From 7ff0b5c24663e153476e28150866d0115e16d8f8 Mon Sep 17 00:00:00 2001 From: "Elizabeth B." Date: Sat, 2 May 2026 19:28:24 -0500 Subject: [PATCH 1/2] perf: parallelize blob fetch in dump command Extending #12. PR #106 added get_blob_cached to amortize repeat fetches, but commands::dump::dump still fetched each blob serially. On high-latency object stores the throughput was capped by per-blob round-trip latency. This routes the inner loop through pariter::parallel_map_scoped_custom (the same primitive the archiver and packer already use) so fetches overlap while the writer stays single-threaded and ordered. Output is byte-identical to the sequential loop. A new DumpOptions { num_threads: u32 } and Repository::dump_with_opts are added. Repository::dump(node, w) keeps its existing signature and now defaults to available_parallelism. Single-blob files short-circuit to the sequential path so very small files don't pay the worker setup cost. The rustic CLI's dump --archive tar / targz, mount, and webdav paths go through vfs::OpenFile::read_at rather than commands::dump::dump, so they aren't affected here. A follow-up can apply the same idea to the VFS reader. --- crates/core/src/commands/dump.rs | 109 +++++++++++++++++++++++--- crates/core/src/lib.rs | 1 + crates/core/src/repository.rs | 40 +++++++++- crates/core/tests/integration.rs | 1 + crates/core/tests/integration/dump.rs | 87 ++++++++++++++++++++ 5 files changed, 223 insertions(+), 15 deletions(-) create mode 100644 crates/core/tests/integration/dump.rs diff --git a/crates/core/src/commands/dump.rs b/crates/core/src/commands/dump.rs index c4c7b218..4e5ec165 100644 --- a/crates/core/src/commands/dump.rs +++ b/crates/core/src/commands/dump.rs @@ -1,17 +1,58 @@ -use std::io::Write; +use std::{io::Write, num::NonZeroUsize, thread::scope}; + +use derive_setters::Setters; +use pariter::IteratorExt; use crate::{ backend::node::{Node, NodeType}, - blob::{BlobId, BlobType}, + blob::{BlobId, BlobType, DataId}, error::{ErrorKind, RusticError, RusticResult}, repository::{IndexedFull, Repository}, }; +pub(crate) mod constants { + /// Minimum blob count required to enable parallel fetching. + /// + /// For files that decompose into a single blob there is nothing to overlap, + /// so we stay on the sequential path to avoid the worker-thread setup cost. + pub(crate) const PARALLEL_DUMP_MIN_BLOBS: usize = 2; +} + +/// Options for the `dump` command. +#[cfg_attr(feature = "clap", derive(clap::Parser))] +#[derive(Debug, Copy, Clone, Default, Setters)] +#[setters(into)] +#[non_exhaustive] +pub struct DumpOptions { + /// Number of reader threads used to fetch blobs in parallel. + /// + /// `0` selects the available parallelism reported by the runtime. + /// `1` forces the sequential implementation. + #[cfg_attr(feature = "clap", clap(long, default_value = "0"))] + pub num_threads: u32, +} + +impl DumpOptions { + /// Resolve the configured thread count to a concrete value. + /// + /// Returns `None` for the sequential path and `Some(n)` for `n` worker + /// threads. + fn resolved_threads(self, blob_count: usize) -> Option { + if blob_count < constants::PARALLEL_DUMP_MIN_BLOBS { + return None; + } + let threads = match self.num_threads { + 0 => std::thread::available_parallelism().map_or(1, NonZeroUsize::get), + n => n as usize, + }; + NonZeroUsize::new(threads).filter(|n| n.get() > 1) + } +} + /// Dumps the contents of a file. /// /// # Type Parameters /// -/// * `P` - The progress bar type. /// * `S` - The type of the indexed tree. /// /// # Arguments @@ -19,14 +60,18 @@ use crate::{ /// * `repo` - The repository to read from. /// * `node` - The node to dump. /// * `w` - The writer to write to. +/// * `opts` - The dump options to use. /// /// # Errors /// /// * If the node is not a file. -pub(crate) fn dump( +/// * If a blob cannot be fetched from the backend. +/// * If writing to `w` fails. +pub(crate) fn dump( repo: &Repository, node: &Node, w: &mut impl Write, + opts: DumpOptions, ) -> RusticResult<()> { if node.node_type != NodeType::File { return Err(RusticError::new( @@ -36,15 +81,55 @@ pub(crate) fn dump( .attach_context("node_type", node.node_type.to_string())); } - for id in node.content.as_ref().unwrap() { + let Some(content) = node.content.as_ref() else { + return Ok(()); + }; + + match opts.resolved_threads(content.len()) { + None => dump_sequential(repo, content, w), + Some(threads) => dump_parallel(repo, content, w, threads), + } +} + +fn dump_sequential( + repo: &Repository, + content: &[DataId], + w: &mut impl Write, +) -> RusticResult<()> { + for id in content { let data = repo.get_blob_cached(&BlobId::from(**id), BlobType::Data)?; - w.write_all(&data).map_err(|err| { - RusticError::with_source( - ErrorKind::InputOutput, - "Failed to write data to writer.", - err, - ) - })?; + write_blob(w, &data)?; } Ok(()) } + +fn dump_parallel( + repo: &Repository, + content: &[DataId], + w: &mut impl Write, + threads: NonZeroUsize, +) -> RusticResult<()> { + let threads = threads.get(); + + scope(|s| -> RusticResult<()> { + content + .iter() + .map(|id| BlobId::from(**id)) + .parallel_map_scoped_custom( + s, + |b| b.threads(threads).buffer_size(threads * 2), + |id| repo.get_blob_cached(&id, BlobType::Data), + ) + .try_for_each(|res| write_blob(w, &res?)) + }) +} + +fn write_blob(w: &mut impl Write, data: &[u8]) -> RusticResult<()> { + w.write_all(data).map_err(|err| { + RusticError::with_source( + ErrorKind::InputOutput, + "Failed to write data to writer.", + err, + ) + }) +} diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index d267e7ed..67535011 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -151,6 +151,7 @@ pub use crate::{ check::{CheckOptions, CheckResults, ReadSubsetOption}, config::ConfigOptions, copy::CopySnapshot, + dump::DumpOptions, forget::{ForgetGroup, ForgetGroups, ForgetSnapshot, KeepOptions}, key::KeyOptions, prune::{LimitOption, PruneOptions, PrunePlan, PruneStats}, diff --git a/crates/core/src/repository.rs b/crates/core/src/repository.rs index 260ba697..d4ef7c74 100644 --- a/crates/core/src/repository.rs +++ b/crates/core/src/repository.rs @@ -42,6 +42,7 @@ use crate::{ check::{CheckOptions, CheckResults, check_repository}, config::{ConfigOptions, save_config_hot}, copy::CopySnapshot, + dump::DumpOptions, key::{KeyOptions, add_current_key_to_repo}, prune::{PruneOptions, PrunePlan, prune_repository}, repair::{ @@ -1744,6 +1745,9 @@ impl Repository { /// Dump a [`Node`] using the given writer. /// + /// Equivalent to [`Self::dump_with_opts`] with [`DumpOptions::default`], + /// which fetches blobs in parallel using the available parallelism. + /// /// # Arguments /// /// * `node` - The node to dump @@ -1752,12 +1756,42 @@ impl Repository { /// # Errors /// /// * If the node is not a file. - /// + /// + /// # Note + /// + /// Currently, only regular file nodes are supported. + pub fn dump(&self, node: &Node, w: &mut impl Write) -> RusticResult<()> + where + S: Sync, + { + commands::dump::dump(self, node, w, DumpOptions::default()) + } + + /// Dump a [`Node`] using the given writer and dump options. + /// + /// # Arguments + /// + /// * `node` - The node to dump + /// * `w` - The writer to use + /// * `opts` - The dump options to use + /// + /// # Errors + /// + /// * If the node is not a file. + /// /// # Note /// /// Currently, only regular file nodes are supported. - pub fn dump(&self, node: &Node, w: &mut impl Write) -> RusticResult<()> { - commands::dump::dump(self, node, w) + pub fn dump_with_opts( + &self, + node: &Node, + w: &mut impl Write, + opts: &DumpOptions, + ) -> RusticResult<()> + where + S: Sync, + { + commands::dump::dump(self, node, w, *opts) } /// Prepare the restore. diff --git a/crates/core/tests/integration.rs b/crates/core/tests/integration.rs index 076b3ed8..43840ec7 100644 --- a/crates/core/tests/integration.rs +++ b/crates/core/tests/integration.rs @@ -28,6 +28,7 @@ mod integration { mod check; mod chunker; mod copy; + mod dump; mod find; mod hotcold; mod key; diff --git a/crates/core/tests/integration/dump.rs b/crates/core/tests/integration/dump.rs new file mode 100644 index 00000000..84627684 --- /dev/null +++ b/crates/core/tests/integration/dump.rs @@ -0,0 +1,87 @@ +use std::{fs, io::Write, path::PathBuf, str::FromStr}; + +use anyhow::Result; +use bytesize::ByteSize; +use pretty_assertions::assert_eq; +use rstest::rstest; +use tempfile::tempdir; + +use rustic_core::{ + BackupOptions, ConfigOptions, DumpOptions, IndexedFullStatus, PathList, Repository, + repofile::{Chunker, SnapshotFile}, +}; + +use super::{RepoOpen, set_up_repo}; + +/// Build a deterministic byte payload of the requested length. +fn payload(len: usize) -> Vec { + (0..len) + .map(|i| u8::try_from(i % 251).expect("251 always fits in u8")) + .collect() +} + +/// Backup a single file with the given content into `repo`, configuring the +/// fixed-size chunker so the file reliably splits into multiple blobs. +/// +/// Returns the repository in the [`IndexedFullStatus`] state along with the +/// snapshot path that points at the backed-up file. +fn backup_single_file( + repo: RepoOpen, + name: &str, + data: &[u8], +) -> Result<(Repository, String)> { + let dir = tempdir()?; + let file_path = dir.path().join(name); + fs::File::create(&file_path)?.write_all(data)?; + + let mut repo = repo.to_indexed_ids()?; + let config = ConfigOptions::default() + .set_chunker(Chunker::FixedSize) + .set_chunk_size(ByteSize(4096)); + assert!(repo.apply_config(&config)?); + + let paths = PathList::from_iter([file_path]); + let opts = BackupOptions::default().as_path(PathBuf::from_str(name)?); + let _snapshot = repo.backup(&opts, &paths, SnapshotFile::default())?; + + Ok((repo.to_indexed()?, format!("latest:{name}"))) +} + +#[rstest] +fn test_dump_parallel_matches_sequential(set_up_repo: Result) -> Result<()> { + let (repo, snapshot_path) = backup_single_file(set_up_repo?, "file.bin", &payload(64 * 1024))?; + let node = repo.node_from_snapshot_path(&snapshot_path, |_| true)?; + + // Sanity: the configured chunker must have produced more than one blob, + // otherwise the parallel path is never taken. + let blob_count = node.content.as_ref().map_or(0, Vec::len); + assert!( + blob_count > 1, + "expected the test file to span multiple blobs, got {blob_count}", + ); + + let dump = |opts: DumpOptions| -> Result> { + let mut out = Vec::new(); + repo.dump_with_opts(&node, &mut out, &opts)?; + Ok(out) + }; + + let sequential = dump(DumpOptions::default().num_threads(1u32))?; + let parallel = dump(DumpOptions::default().num_threads(8u32))?; + + assert_eq!(sequential, payload(64 * 1024)); + assert_eq!(parallel, sequential); + Ok(()) +} + +#[rstest] +fn test_dump_default_options_match_source(set_up_repo: Result) -> Result<()> { + let data = payload(32 * 1024); + let (repo, snapshot_path) = backup_single_file(set_up_repo?, "file.bin", &data)?; + let node = repo.node_from_snapshot_path(&snapshot_path, |_| true)?; + + let mut out = Vec::new(); + repo.dump(&node, &mut out)?; + assert_eq!(out, data); + Ok(()) +} From 4409311da206bb63a7b3237f4a7da5c712307ecf Mon Sep 17 00:00:00 2001 From: "Elizabeth B." Date: Wed, 6 May 2026 16:35:16 -0500 Subject: [PATCH 2/2] perf: hard-code parallelism for dump blob fetch --- crates/core/src/commands/dump.rs | 79 ++++----------------------- crates/core/src/lib.rs | 1 - crates/core/src/repository.rs | 33 +---------- crates/core/tests/integration/dump.rs | 21 +++---- 4 files changed, 20 insertions(+), 114 deletions(-) diff --git a/crates/core/src/commands/dump.rs b/crates/core/src/commands/dump.rs index 4e5ec165..9d52744d 100644 --- a/crates/core/src/commands/dump.rs +++ b/crates/core/src/commands/dump.rs @@ -1,6 +1,5 @@ -use std::{io::Write, num::NonZeroUsize, thread::scope}; +use std::{io::Write, thread::scope}; -use derive_setters::Setters; use pariter::IteratorExt; use crate::{ @@ -10,45 +9,6 @@ use crate::{ repository::{IndexedFull, Repository}, }; -pub(crate) mod constants { - /// Minimum blob count required to enable parallel fetching. - /// - /// For files that decompose into a single blob there is nothing to overlap, - /// so we stay on the sequential path to avoid the worker-thread setup cost. - pub(crate) const PARALLEL_DUMP_MIN_BLOBS: usize = 2; -} - -/// Options for the `dump` command. -#[cfg_attr(feature = "clap", derive(clap::Parser))] -#[derive(Debug, Copy, Clone, Default, Setters)] -#[setters(into)] -#[non_exhaustive] -pub struct DumpOptions { - /// Number of reader threads used to fetch blobs in parallel. - /// - /// `0` selects the available parallelism reported by the runtime. - /// `1` forces the sequential implementation. - #[cfg_attr(feature = "clap", clap(long, default_value = "0"))] - pub num_threads: u32, -} - -impl DumpOptions { - /// Resolve the configured thread count to a concrete value. - /// - /// Returns `None` for the sequential path and `Some(n)` for `n` worker - /// threads. - fn resolved_threads(self, blob_count: usize) -> Option { - if blob_count < constants::PARALLEL_DUMP_MIN_BLOBS { - return None; - } - let threads = match self.num_threads { - 0 => std::thread::available_parallelism().map_or(1, NonZeroUsize::get), - n => n as usize, - }; - NonZeroUsize::new(threads).filter(|n| n.get() > 1) - } -} - /// Dumps the contents of a file. /// /// # Type Parameters @@ -60,7 +20,6 @@ impl DumpOptions { /// * `repo` - The repository to read from. /// * `node` - The node to dump. /// * `w` - The writer to write to. -/// * `opts` - The dump options to use. /// /// # Errors /// @@ -71,7 +30,6 @@ pub(crate) fn dump( repo: &Repository, node: &Node, w: &mut impl Write, - opts: DumpOptions, ) -> RusticResult<()> { if node.node_type != NodeType::File { return Err(RusticError::new( @@ -85,10 +43,18 @@ pub(crate) fn dump( return Ok(()); }; - match opts.resolved_threads(content.len()) { - None => dump_sequential(repo, content, w), - Some(threads) => dump_parallel(repo, content, w, threads), + // Single-blob files have nothing to overlap, so skip the worker setup. + if content.len() < 2 { + return dump_sequential(repo, content, w); } + + scope(|s| -> RusticResult<()> { + content + .iter() + .map(|id| BlobId::from(**id)) + .parallel_map_scoped(s, |id| repo.get_blob_cached(&id, BlobType::Data)) + .try_for_each(|res| write_blob(w, &res?)) + }) } fn dump_sequential( @@ -103,27 +69,6 @@ fn dump_sequential( Ok(()) } -fn dump_parallel( - repo: &Repository, - content: &[DataId], - w: &mut impl Write, - threads: NonZeroUsize, -) -> RusticResult<()> { - let threads = threads.get(); - - scope(|s| -> RusticResult<()> { - content - .iter() - .map(|id| BlobId::from(**id)) - .parallel_map_scoped_custom( - s, - |b| b.threads(threads).buffer_size(threads * 2), - |id| repo.get_blob_cached(&id, BlobType::Data), - ) - .try_for_each(|res| write_blob(w, &res?)) - }) -} - fn write_blob(w: &mut impl Write, data: &[u8]) -> RusticResult<()> { w.write_all(data).map_err(|err| { RusticError::with_source( diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 67535011..d267e7ed 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -151,7 +151,6 @@ pub use crate::{ check::{CheckOptions, CheckResults, ReadSubsetOption}, config::ConfigOptions, copy::CopySnapshot, - dump::DumpOptions, forget::{ForgetGroup, ForgetGroups, ForgetSnapshot, KeepOptions}, key::KeyOptions, prune::{LimitOption, PruneOptions, PrunePlan, PruneStats}, diff --git a/crates/core/src/repository.rs b/crates/core/src/repository.rs index d4ef7c74..fc09dd54 100644 --- a/crates/core/src/repository.rs +++ b/crates/core/src/repository.rs @@ -42,7 +42,6 @@ use crate::{ check::{CheckOptions, CheckResults, check_repository}, config::{ConfigOptions, save_config_hot}, copy::CopySnapshot, - dump::DumpOptions, key::{KeyOptions, add_current_key_to_repo}, prune::{PruneOptions, PrunePlan, prune_repository}, repair::{ @@ -1745,9 +1744,6 @@ impl Repository { /// Dump a [`Node`] using the given writer. /// - /// Equivalent to [`Self::dump_with_opts`] with [`DumpOptions::default`], - /// which fetches blobs in parallel using the available parallelism. - /// /// # Arguments /// /// * `node` - The node to dump @@ -1764,34 +1760,7 @@ impl Repository { where S: Sync, { - commands::dump::dump(self, node, w, DumpOptions::default()) - } - - /// Dump a [`Node`] using the given writer and dump options. - /// - /// # Arguments - /// - /// * `node` - The node to dump - /// * `w` - The writer to use - /// * `opts` - The dump options to use - /// - /// # Errors - /// - /// * If the node is not a file. - /// - /// # Note - /// - /// Currently, only regular file nodes are supported. - pub fn dump_with_opts( - &self, - node: &Node, - w: &mut impl Write, - opts: &DumpOptions, - ) -> RusticResult<()> - where - S: Sync, - { - commands::dump::dump(self, node, w, *opts) + commands::dump::dump(self, node, w) } /// Prepare the restore. diff --git a/crates/core/tests/integration/dump.rs b/crates/core/tests/integration/dump.rs index 84627684..42a9a4e5 100644 --- a/crates/core/tests/integration/dump.rs +++ b/crates/core/tests/integration/dump.rs @@ -7,7 +7,7 @@ use rstest::rstest; use tempfile::tempdir; use rustic_core::{ - BackupOptions, ConfigOptions, DumpOptions, IndexedFullStatus, PathList, Repository, + BackupOptions, ConfigOptions, IndexedFullStatus, PathList, Repository, repofile::{Chunker, SnapshotFile}, }; @@ -48,8 +48,9 @@ fn backup_single_file( } #[rstest] -fn test_dump_parallel_matches_sequential(set_up_repo: Result) -> Result<()> { - let (repo, snapshot_path) = backup_single_file(set_up_repo?, "file.bin", &payload(64 * 1024))?; +fn test_dump_multi_blob_matches_source(set_up_repo: Result) -> Result<()> { + let data = payload(64 * 1024); + let (repo, snapshot_path) = backup_single_file(set_up_repo?, "file.bin", &data)?; let node = repo.node_from_snapshot_path(&snapshot_path, |_| true)?; // Sanity: the configured chunker must have produced more than one blob, @@ -60,17 +61,9 @@ fn test_dump_parallel_matches_sequential(set_up_repo: Result) -> Resul "expected the test file to span multiple blobs, got {blob_count}", ); - let dump = |opts: DumpOptions| -> Result> { - let mut out = Vec::new(); - repo.dump_with_opts(&node, &mut out, &opts)?; - Ok(out) - }; - - let sequential = dump(DumpOptions::default().num_threads(1u32))?; - let parallel = dump(DumpOptions::default().num_threads(8u32))?; - - assert_eq!(sequential, payload(64 * 1024)); - assert_eq!(parallel, sequential); + let mut out = Vec::new(); + repo.dump(&node, &mut out)?; + assert_eq!(out, data); Ok(()) }