diff options
Diffstat (limited to 'b3sum/src/main.rs')
| -rw-r--r-- | b3sum/src/main.rs | 85 |
1 files changed, 72 insertions, 13 deletions
diff --git a/b3sum/src/main.rs b/b3sum/src/main.rs index 3fc074c..5cca729 100644 --- a/b3sum/src/main.rs +++ b/b3sum/src/main.rs @@ -1,5 +1,6 @@ use anyhow::{bail, Context, Result}; use clap::{App, Arg}; +use once_cell::sync::Lazy; use std::cmp; use std::convert::TryInto; use std::fs::File; @@ -11,6 +12,71 @@ const KEYED_ARG: &str = "keyed"; const DERIVE_KEY_ARG: &str = "derive-key"; const NO_NAMES_ARG: &str = "no-names"; +const BUF_SIZE: usize = 1 << 20; + +struct BufChannelPair { + sender: crossbeam::channel::Sender<Vec<u8>>, + receiver: crossbeam::channel::Receiver<Vec<u8>>, +} + +static BUF_CHANNEL_PAIR: Lazy<BufChannelPair> = Lazy::new(|| { + let (sender, receiver) = crossbeam::channel::unbounded(); + BufChannelPair { sender, receiver } +}); + +type BgReadRequest = ( + Box<dyn Read + Send>, + crossbeam::channel::Sender<std::io::Result<Vec<u8>>>, +); + +static BG_READER_THREAD: Lazy<crossbeam::channel::Sender<BgReadRequest>> = Lazy::new(|| { + let (sender, receiver) = crossbeam::channel::unbounded::<BgReadRequest>(); + std::thread::spawn(move || { + for request in receiver { + let (mut reader, sender) = request; + loop { + let mut buf = if let Ok(mut buf) = BUF_CHANNEL_PAIR.receiver.try_recv() { + buf.clear(); + buf + } else { + Vec::with_capacity(BUF_SIZE) + }; + let read_result = reader.by_ref().take(BUF_SIZE as u64).read_to_end(&mut buf); + match read_result { + Ok(n) => { + sender.send(Ok(buf)).unwrap(); + if n == 0 { + break; + } + } + Err(e) => { + BUF_CHANNEL_PAIR.sender.send(buf).unwrap(); + sender.send(Err(e)).unwrap(); + break; + } + } + } + } + }); + sender +}); + +fn bg_read<F>(reader: impl Read + Send + 'static, mut f: F) -> std::io::Result<u64> +where + F: FnMut(&[u8]) -> std::io::Result<()>, +{ + let (sender, receiver) = crossbeam::channel::bounded(0); + BG_READER_THREAD.send((Box::new(reader), sender)).unwrap(); + let mut total = 0; + for read_result in receiver { + let vec = read_result?; + f(&vec)?; + total += vec.len() as u64; + BUF_CHANNEL_PAIR.sender.send(vec).unwrap(); + } + Ok(total) +} + fn clap_parse_argv() -> clap::ArgMatches<'static> { App::new("b3sum") .version(env!("CARGO_PKG_VERSION")) @@ -49,18 +115,13 @@ fn clap_parse_argv() -> clap::ArgMatches<'static> { // The slow path, for inputs that we can't memmap. fn hash_reader( base_hasher: &blake3::Hasher, - mut reader: impl Read, + reader: impl Read + Send + 'static, ) -> Result<blake3::OutputReader> { let mut hasher = base_hasher.clone(); - // TODO: This is a narrow copy, so it might not take advantage of SIMD or - // threads. With a larger buffer size, most of that performance can be - // recovered. However, this requires some platform-specific tuning, based - // on both the SIMD degree and the number of cores. A double-buffering - // strategy is also helpful, where a dedicated background thread reads - // input into one buffer while another thread is calling update() on a - // second buffer. Since this is the slow path anyway, do the simple thing - // for now. - std::io::copy(&mut reader, &mut hasher)?; + bg_read(reader, |input| { + hasher.update(input); + Ok(()) + })?; Ok(hasher.finalize_xof()) } @@ -187,9 +248,7 @@ fn main() -> Result<()> { } } } else { - let stdin = std::io::stdin(); - let stdin = stdin.lock(); - let output = hash_reader(&base_hasher, stdin)?; + let output = hash_reader(&base_hasher, std::io::stdin())?; write_output(output, len)?; println!(); } |
