diff options
| author | Jack O'Connor <[email protected]> | 2020-02-03 11:35:50 -0500 |
|---|---|---|
| committer | Jack O'Connor <[email protected]> | 2020-02-06 15:07:15 -0500 |
| commit | fc219f4f8d92f721d6444bb7420d42d88ee4b43c (patch) | |
| tree | 0601cbc527e4c681e4847f959c099052c0993d9b /src | |
| parent | 24071db3463f29a6ad6173e3aea62b0f1497b5bc (diff) | |
Hasher::update_with_join
This is a new interface that allows the caller to provide a
multi-threading implementation. It's defined in terms of a new `Join`
trait, for which we provide two implementations, `SerialJoin` and
`RayonJoin`. This lets the caller control when multi-threading is used,
rather than the previous all-or-nothing design of the "rayon" feature.
Although existing callers should keep working, this is a compatibility
break, because callers who were relying on automatic multi-threading
before will now be single-threaded. Thus the next release of this crate
will need to be version 0.2.
See https://github.com/BLAKE3-team/BLAKE3/issues/25 and
https://github.com/BLAKE3-team/BLAKE3/issues/54.
Diffstat (limited to 'src')
| -rw-r--r-- | src/join.rs | 114 | ||||
| -rw-r--r-- | src/lib.rs | 112 | ||||
| -rw-r--r-- | src/test.rs | 63 |
3 files changed, 253 insertions, 36 deletions
diff --git a/src/join.rs b/src/join.rs new file mode 100644 index 0000000..8442172 --- /dev/null +++ b/src/join.rs @@ -0,0 +1,114 @@ +//! The multi-threading abstractions used by [`Hasher::update_with_join`]. +//! +//! Different implementations of the `Join` trait determine whether +//! [`Hasher::update_with_join`] performs multi-threading on sufficiently large +//! inputs. The `SerialJoin` implementation is single-threaded, and the +//! `RayonJoin` implementation (gated by the `rayon` feature) is +//! multi-threaded. Interfaces other than [`Hasher::update_with_join`], like +//! [`hash`] and [`Hasher::update`], always use `SerialJoin` internally. +//! +//! The `Join` trait is an almost exact copy of the [`rayon::join`] API, and +//! `RayonJoin` is the only non-trivial implementation provided. The only +//! difference between the function signature in the `Join` trait and the +//! underlying one in Rayon, is that the trait method includes two length +//! parameters. This gives an implementation the option of e.g. setting a +//! subtree size threshold below which it keeps splits on the same thread. +//! However, neither of the two provided implementations currently makes use of +//! those parameters. Note that in Rayon, the very first `join` call is more +//! expensive than subsequent calls, because it moves work from the calling +//! thread into the thread pool. That makes a coarse-grained input length +//! threshold in the caller more effective than a fine-grained subtree size +//! threshold after the implementation has already started recursing. +//! +//! # Example +//! +//! ``` +//! // Hash a large input using multi-threading. Note that multi-threading +//! // comes with some overhead, and it can actually hurt performance for small +//! // inputs. The meaning of "small" varies, however, depending on the +//! // platform and the number of threads. (On x86_64, the cutoff tends to be +//! // around 128 KiB.) You should benchmark your own use case to see whether +//! 
// multi-threading helps. +//! # #[cfg(feature = "rayon")] +//! # { +//! # fn some_large_input() -> &'static [u8] { b"foo" } +//! let input: &[u8] = some_large_input(); +//! let mut hasher = blake3::Hasher::new(); +//! hasher.update_with_join::<blake3::join::RayonJoin>(input); +//! let hash = hasher.finalize(); +//! # } +//! ``` +//! +//! [`Hasher::update_with_join`]: ../struct.Hasher.html#method.update_with_join +//! [`Hasher::update`]: ../struct.Hasher.html#method.update +//! [`hash`]: ../fn.hash.html +//! [`rayon::join`]: https://docs.rs/rayon/1.3.0/rayon/fn.join.html + +/// The trait that abstracts over single-threaded and multi-threaded recursion. +pub trait Join { + fn join<A, B, RA, RB>(oper_a: A, oper_b: B, len_a: usize, len_b: usize) -> (RA, RB) + where + A: FnOnce() -> RA + Send, + B: FnOnce() -> RB + Send, + RA: Send, + RB: Send; +} + +/// The trivial, serial implementation of `Join`. The left and right sides are +/// executed one after the other, on the calling thread. The standalone hashing +/// functions and the `Hasher::update` method use this implementation +/// internally. +pub enum SerialJoin {} + +impl Join for SerialJoin { + #[inline] + fn join<A, B, RA, RB>(oper_a: A, oper_b: B, _len_a: usize, _len_b: usize) -> (RA, RB) + where + A: FnOnce() -> RA + Send, + B: FnOnce() -> RB + Send, + RA: Send, + RB: Send, + { + (oper_a(), oper_b()) + } +} + +/// The Rayon-based implementation of `Join`. The left and right sides are +/// executed on the Rayon thread pool, potentially in parallel. This +/// implementation is gated by the `rayon` feature, which is off by default. 
+#[cfg(feature = "rayon")] +pub enum RayonJoin {} + +#[cfg(feature = "rayon")] +impl Join for RayonJoin { + #[inline] + fn join<A, B, RA, RB>(oper_a: A, oper_b: B, _len_a: usize, _len_b: usize) -> (RA, RB) + where + A: FnOnce() -> RA + Send, + B: FnOnce() -> RB + Send, + RA: Send, + RB: Send, + { + rayon::join(oper_a, oper_b) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_serial_join() { + let oper_a = || 1 + 1; + let oper_b = || 2 + 2; + assert_eq!((2, 4), SerialJoin::join(oper_a, oper_b, 3, 4)); + } + + #[test] + #[cfg(feature = "rayon")] + fn test_rayon_join() { + let oper_a = || 1 + 1; + let oper_b = || 2 + 2; + assert_eq!((2, 4), RayonJoin::join(oper_a, oper_b, 3, 4)); + } +} @@ -60,10 +60,13 @@ pub mod sse41; pub mod traits; +pub mod join; + use arrayref::{array_mut_ref, array_ref}; use arrayvec::{ArrayString, ArrayVec}; use core::cmp; use core::fmt; +use join::{Join, SerialJoin}; use platform::{Platform, MAX_SIMD_DEGREE, MAX_SIMD_DEGREE_OR_2}; /// The number of bytes in a [`Hash`](struct.Hash.html), 32. @@ -414,25 +417,6 @@ fn left_len(content_len: usize) -> usize { largest_power_of_two_leq(full_chunks) * CHUNK_LEN } -// Recurse in parallel with rayon::join() if the "rayon" feature is active. -// Rayon uses a global thread pool and a work-stealing algorithm to hand the -// right side off to another thread, if idle threads are available. If the -// "rayon" feature is disabled, just make ordinary function calls for the left -// and the right. -#[inline] -fn join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB) -where - A: FnOnce() -> RA + Send, - B: FnOnce() -> RB + Send, - RA: Send, - RB: Send, -{ - #[cfg(feature = "rayon")] - return rayon::join(oper_a, oper_b); - #[cfg(not(feature = "rayon"))] - return (oper_a(), oper_b()); -} - // Use SIMD parallelism to hash up to MAX_SIMD_DEGREE chunks at the same time // on a single thread. Write out the chunk chaining values and return the // number of chunks hashed. 
These chunks are never the root and never empty; @@ -541,7 +525,7 @@ fn compress_parents_parallel( // Why not just have the caller split the input on the first update(), instead // of implementing this special rule? Because we don't want to limit SIMD or // multi-threading parallelism for that update(). -fn compress_subtree_wide( +fn compress_subtree_wide<J: Join>( input: &[u8], key: &CVWords, chunk_counter: u64, @@ -578,9 +562,11 @@ fn compress_subtree_wide( let (left_out, right_out) = cv_array.split_at_mut(degree * OUT_LEN); // Recurse! This uses multiple threads if the "rayon" feature is enabled. - let (left_n, right_n) = join( - || compress_subtree_wide(left, key, chunk_counter, flags, platform, left_out), - || compress_subtree_wide(right, key, right_chunk_counter, flags, platform, right_out), + let (left_n, right_n) = J::join( + || compress_subtree_wide::<J>(left, key, chunk_counter, flags, platform, left_out), + || compress_subtree_wide::<J>(right, key, right_chunk_counter, flags, platform, right_out), + left.len(), + right.len(), ); // The special case again. If simd_degree=1, then we'll have left_n=1 and @@ -614,7 +600,7 @@ fn compress_subtree_wide( // // As with compress_subtree_wide(), this function is not used on inputs of 1 // chunk or less. That's a different codepath. -fn compress_subtree_to_parent_node( +fn compress_subtree_to_parent_node<J: Join>( input: &[u8], key: &CVWords, chunk_counter: u64, @@ -624,7 +610,7 @@ fn compress_subtree_to_parent_node( debug_assert!(input.len() > CHUNK_LEN); let mut cv_array = [0; 2 * MAX_SIMD_DEGREE_OR_2 * OUT_LEN]; let mut num_cvs = - compress_subtree_wide(input, &key, chunk_counter, flags, platform, &mut cv_array); + compress_subtree_wide::<J>(input, &key, chunk_counter, flags, platform, &mut cv_array); debug_assert!(num_cvs >= 2); // If MAX_SIMD_DEGREE is greater than 2 and there's enough input, @@ -641,6 +627,7 @@ fn compress_subtree_to_parent_node( // Hash a complete input all at once. 
Unlike compress_subtree_wide() and // compress_subtree_to_parent_node(), this function handles the 1 chunk case. +// Note that we use SerialJoin here, so this is always single-threaded. fn hash_all_at_once(input: &[u8], key: &CVWords, flags: u8) -> Output { let platform = Platform::detect(); @@ -655,7 +642,7 @@ fn hash_all_at_once(input: &[u8], key: &CVWords, flags: u8) -> Output { // compress_subtree_to_parent_node(). Output { input_chaining_value: *key, - block: compress_subtree_to_parent_node(input, key, 0, flags, platform), + block: compress_subtree_to_parent_node::<SerialJoin>(input, key, 0, flags, platform), block_len: BLOCK_LEN as u8, counter: 0, flags: flags | PARENT, @@ -665,9 +652,13 @@ /// The default hash function. /// -/// For an incremental version that accepts multiple writes, see [`Hasher`]. +/// For an incremental version that accepts multiple writes, see [`Hasher::update`]. /// -/// [`Hasher`]: struct.Hasher.html +/// This function is always single-threaded. For multi-threading support, see +/// [`Hasher::update_with_join`]. +/// +/// [`Hasher::update`]: struct.Hasher.html#method.update +/// [`Hasher::update_with_join`]: struct.Hasher.html#method.update_with_join pub fn hash(input: &[u8]) -> Hash { hash_all_at_once(input, IV, 0).root_hash() } @@ -679,6 +670,11 @@ pub fn hash(input: &[u8]) -> Hash { /// In that use case, the constant-time equality checking provided by /// [`Hash`](struct.Hash.html) is almost always a security requirement, and /// callers need to be careful not to compare MACs as raw bytes. +/// +/// This function is always single-threaded. For multi-threading support, see +/// [`Hasher::update_with_join`]. 
+/// +/// [`Hasher::update_with_join`]: struct.Hasher.html#method.update_with_join pub fn keyed_hash(key: &[u8; KEY_LEN], input: &[u8]) -> Hash { let key_words = platform::words_from_le_bytes_32(key); hash_all_at_once(input, &key_words, KEYED_HASH).root_hash() @@ -710,9 +706,13 @@ pub fn keyed_hash(key: &[u8; KEY_LEN], input: &[u8]) -> Hash { /// [Argon2]. Password hashes are entirely different from generic hash /// functions, with opposite design requirements. /// +/// This function is always single-threaded. For multi-threading support, see +/// [`Hasher::update_with_join`]. +/// /// [`Hasher::new_derive_key`]: struct.Hasher.html#method.new_derive_key /// [`Hasher::finalize_xof`]: struct.Hasher.html#method.finalize_xof /// [Argon2]: https://en.wikipedia.org/wiki/Argon2 +/// [`Hasher::update_with_join`]: struct.Hasher.html#method.update_with_join pub fn derive_key(context: &str, key_material: &[u8], output: &mut [u8]) { let context_key = hash_all_at_once(context.as_bytes(), IV, DERIVE_KEY_CONTEXT).root_hash(); let context_key_words = platform::words_from_le_bytes_32(context_key.as_bytes()); @@ -877,15 +877,55 @@ impl Hasher { /// Add input bytes to the hash state. You can call this any number of /// times. /// - /// Note that the degree of SIMD and multi-threading parallelism that - /// `Hasher` can use is limited by the size of this input buffer. The 8 KiB - /// buffer currently used by [`std::io::copy`] is enough to leverage AVX2, - /// for example, but not enough to leverage AVX-512. If multi-threading is - /// enabled (the `rayon` feature), the optimal input buffer size will vary - /// considerably across different CPUs, and it may be several mebibytes. + /// This method is always single-threaded. For multi-threading support, see + /// `update_with_join` below. + /// + /// Note that the degree of SIMD parallelism that `update` can use is + /// limited by the size of this input buffer. 
The 8 KiB buffer currently + /// used by [`std::io::copy`] is enough to leverage AVX2, for example, but + /// not enough to leverage AVX-512. A 16 KiB buffer is large enough to + /// leverage all currently supported SIMD instruction sets. /// /// [`std::io::copy`]: https://doc.rust-lang.org/std/io/fn.copy.html - pub fn update(&mut self, mut input: &[u8]) -> &mut Self { + pub fn update(&mut self, input: &[u8]) -> &mut Self { + self.update_with_join::<SerialJoin>(input) + } + + /// Add input bytes to the hash state, as with `update`, but potentially + /// using multi-threading. See the example below, and the + /// [`join`](join/index.html) module for a more detailed explanation. + /// + /// To get any performance benefit from multi-threading, the input buffer + /// size needs to be very large. As a rule of thumb on x86_64, there is no + /// benefit to multi-threading inputs less than 128 KiB. Other platforms + /// have different thresholds, and in general you need to benchmark your + /// specific use case. Where possible, memory mapping an entire input file + /// is recommended, to take maximum advantage of multi-threading without + /// needing to tune a specific buffer size. Where memory mapping is not + /// possible, good multi-threading performance requires doing IO on a + /// background thread, to avoid sleeping all your worker threads while the + /// input buffer is (serially) refilled. This is quite complicated compared + /// to memory mapping. + /// + /// # Example + /// + /// ``` + /// // Hash a large input using multi-threading. Note that multi-threading + /// // comes with some overhead, and it can actually hurt performance for small + /// // inputs. The meaning of "small" varies, however, depending on the + /// // platform and the number of threads. (On x86_64, the cutoff tends to be + /// // around 128 KiB.) You should benchmark your own use case to see whether + /// // multi-threading helps. 
+ /// # #[cfg(feature = "rayon")] + /// # { + /// # fn some_large_input() -> &'static [u8] { b"foo" } + /// let input: &[u8] = some_large_input(); + /// let mut hasher = blake3::Hasher::new(); + /// hasher.update_with_join::<blake3::join::RayonJoin>(input); + /// let hash = hasher.finalize(); + /// # } + /// ``` + pub fn update_with_join<J: Join>(&mut self, mut input: &[u8]) -> &mut Self { // If we have some partial chunk bytes in the internal chunk_state, we // need to finish that chunk first. if self.chunk_state.len() > 0 { @@ -963,7 +1003,7 @@ impl Hasher { } else { // This is the high-performance happy path, though getting here // depends on the caller giving us a long enough input. - let cv_pair = compress_subtree_to_parent_node( + let cv_pair = compress_subtree_to_parent_node::<J>( &input[..subtree_len], &self.key, self.chunk_state.chunk_counter, diff --git a/src/test.rs b/src/test.rs index 83fd3ae..bc6f136 100644 --- a/src/test.rs +++ b/src/test.rs @@ -1,6 +1,7 @@ use crate::{CVBytes, CVWords, IncrementCounter, BLOCK_LEN, CHUNK_LEN, OUT_LEN}; use arrayref::array_ref; use arrayvec::ArrayVec; +use core::sync::atomic::{AtomicUsize, Ordering}; use core::usize; use rand::prelude::*; @@ -469,3 +470,65 @@ fn test_reset() { hasher.update(&[42; CHUNK_LEN + 3]); assert_eq!(hasher.finalize(), crate::hash(&[42; CHUNK_LEN + 3])); } + +#[test] +#[cfg(feature = "rayon")] +fn test_update_with_rayon_join() { + let mut input = [0; TEST_CASES_MAX]; + paint_test_input(&mut input); + let rayon_hash = crate::Hasher::new() + .update_with_join::<crate::join::RayonJoin>(&input) + .finalize(); + assert_eq!(crate::hash(&input), rayon_hash); +} + +// Test that the length values given to Join::join are what they're supposed to +// be. +#[test] +fn test_join_lengths() { + // Use static atomics to let us safely get a couple of values in and out of + // CustomJoin. 
This avoids depending on std, though it assumes that this + // thread will only run once in the lifetime of the runner process. + static SINGLE_THREAD_LEN: AtomicUsize = AtomicUsize::new(0); + static CUSTOM_JOIN_CALLS: AtomicUsize = AtomicUsize::new(0); + + // Use an input that's exactly (simd_degree * CHUNK_LEN) + 1. That should + // guarantee that compress_subtree_wide does exactly one split, with the + // last byte on the right side. Note that if we used + // Hasher::update_with_join, we would end up buffering that last byte, + // rather than splitting and joining it. + let single_thread_len = crate::platform::Platform::detect().simd_degree() * CHUNK_LEN; + SINGLE_THREAD_LEN.store(single_thread_len, Ordering::SeqCst); + let mut input_buf = [0; 2 * crate::platform::MAX_SIMD_DEGREE * CHUNK_LEN]; + paint_test_input(&mut input_buf); + let input = &input_buf[..single_thread_len + 1]; + + enum CustomJoin {} + + impl crate::join::Join for CustomJoin { + fn join<A, B, RA, RB>(oper_a: A, oper_b: B, len_a: usize, len_b: usize) -> (RA, RB) + where + A: FnOnce() -> RA + Send, + B: FnOnce() -> RB + Send, + RA: Send, + RB: Send, + { + let prev_calls = CUSTOM_JOIN_CALLS.fetch_add(1, Ordering::SeqCst); + assert_eq!(prev_calls, 0); + assert_eq!(len_a, SINGLE_THREAD_LEN.load(Ordering::SeqCst)); + assert_eq!(len_b, 1); + (oper_a(), oper_b()) + } + } + + let mut out_buf = [0; crate::platform::MAX_SIMD_DEGREE_OR_2 * CHUNK_LEN]; + crate::compress_subtree_wide::<CustomJoin>( + input, + crate::IV, + 0, + 0, + crate::platform::Platform::detect(), + &mut out_buf, + ); + assert_eq!(CUSTOM_JOIN_CALLS.load(Ordering::SeqCst), 1); +} |
