aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--b3sum/Cargo.toml2
-rw-r--r--b3sum/src/main.rs85
2 files changed, 74 insertions, 13 deletions
diff --git a/b3sum/Cargo.toml b/b3sum/Cargo.toml
index c8e0c15..fcd1fa3 100644
--- a/b3sum/Cargo.toml
+++ b/b3sum/Cargo.toml
@@ -17,6 +17,8 @@ blake3 = { version = "0.1", path = ".." }
clap = { version = "2.33.0", default-features = false }
hex = "0.4.0"
memmap = { version = "0.7.0", optional = true }
+once_cell = "1.2.0"
+crossbeam = "0.7.3"
[dev-dependencies]
assert_cmd = "0.12.0"
diff --git a/b3sum/src/main.rs b/b3sum/src/main.rs
index 3fc074c..5cca729 100644
--- a/b3sum/src/main.rs
+++ b/b3sum/src/main.rs
@@ -1,5 +1,6 @@
use anyhow::{bail, Context, Result};
use clap::{App, Arg};
+use once_cell::sync::Lazy;
use std::cmp;
use std::convert::TryInto;
use std::fs::File;
@@ -11,6 +12,71 @@ const KEYED_ARG: &str = "keyed";
const DERIVE_KEY_ARG: &str = "derive-key";
const NO_NAMES_ARG: &str = "no-names";
+const BUF_SIZE: usize = 1 << 20;
+
+struct BufChannelPair {
+ sender: crossbeam::channel::Sender<Vec<u8>>,
+ receiver: crossbeam::channel::Receiver<Vec<u8>>,
+}
+
+static BUF_CHANNEL_PAIR: Lazy<BufChannelPair> = Lazy::new(|| {
+ let (sender, receiver) = crossbeam::channel::unbounded();
+ BufChannelPair { sender, receiver }
+});
+
+type BgReadRequest = (
+ Box<dyn Read + Send>,
+ crossbeam::channel::Sender<std::io::Result<Vec<u8>>>,
+);
+
+static BG_READER_THREAD: Lazy<crossbeam::channel::Sender<BgReadRequest>> = Lazy::new(|| {
+ let (sender, receiver) = crossbeam::channel::unbounded::<BgReadRequest>();
+ std::thread::spawn(move || {
+ for request in receiver {
+ let (mut reader, sender) = request;
+ loop {
+ let mut buf = if let Ok(mut buf) = BUF_CHANNEL_PAIR.receiver.try_recv() {
+ buf.clear();
+ buf
+ } else {
+ Vec::with_capacity(BUF_SIZE)
+ };
+ let read_result = reader.by_ref().take(BUF_SIZE as u64).read_to_end(&mut buf);
+ match read_result {
+ Ok(n) => {
+ sender.send(Ok(buf)).unwrap();
+ if n == 0 {
+ break;
+ }
+ }
+ Err(e) => {
+ BUF_CHANNEL_PAIR.sender.send(buf).unwrap();
+ sender.send(Err(e)).unwrap();
+ break;
+ }
+ }
+ }
+ }
+ });
+ sender
+});
+
+fn bg_read<F>(reader: impl Read + Send + 'static, mut f: F) -> std::io::Result<u64>
+where
+ F: FnMut(&[u8]) -> std::io::Result<()>,
+{
+ let (sender, receiver) = crossbeam::channel::bounded(0);
+ BG_READER_THREAD.send((Box::new(reader), sender)).unwrap();
+ let mut total = 0;
+ for read_result in receiver {
+ let vec = read_result?;
+ f(&vec)?;
+ total += vec.len() as u64;
+ BUF_CHANNEL_PAIR.sender.send(vec).unwrap();
+ }
+ Ok(total)
+}
+
fn clap_parse_argv() -> clap::ArgMatches<'static> {
App::new("b3sum")
.version(env!("CARGO_PKG_VERSION"))
@@ -49,18 +115,13 @@ fn clap_parse_argv() -> clap::ArgMatches<'static> {
// The slow path, for inputs that we can't memmap.
fn hash_reader(
base_hasher: &blake3::Hasher,
- mut reader: impl Read,
+ reader: impl Read + Send + 'static,
) -> Result<blake3::OutputReader> {
let mut hasher = base_hasher.clone();
- // TODO: This is a narrow copy, so it might not take advantage of SIMD or
- // threads. With a larger buffer size, most of that performance can be
- // recovered. However, this requires some platform-specific tuning, based
- // on both the SIMD degree and the number of cores. A double-buffering
- // strategy is also helpful, where a dedicated background thread reads
- // input into one buffer while another thread is calling update() on a
- // second buffer. Since this is the slow path anyway, do the simple thing
- // for now.
- std::io::copy(&mut reader, &mut hasher)?;
+ bg_read(reader, |input| {
+ hasher.update(input);
+ Ok(())
+ })?;
Ok(hasher.finalize_xof())
}
@@ -187,9 +248,7 @@ fn main() -> Result<()> {
}
}
} else {
- let stdin = std::io::stdin();
- let stdin = stdin.lock();
- let output = hash_reader(&base_hasher, stdin)?;
+ let output = hash_reader(&base_hasher, std::io::stdin())?;
write_output(output, len)?;
println!();
}