diff --git a/Cargo.toml b/Cargo.toml index 984ba19ec7..e977254a2a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ default-members = [ "crates/walrus-upload-relay", "crates/walrus-utils", ] +exclude = ["crates/rocksdb"] members = ["crates/*"] resolver = "2" diff --git a/crates/walrus-service/src/node/dbtool.rs b/crates/walrus-service/src/node/dbtool.rs index ea6475fc91..cf5299f99e 100644 --- a/crates/walrus-service/src/node/dbtool.rs +++ b/crates/walrus-service/src/node/dbtool.rs @@ -3,7 +3,12 @@ //! Tools for inspecting and maintaining the RocksDB database. -use std::{collections::BTreeMap, path::PathBuf, thread::sleep, time::Duration}; +use std::{ + collections::{BTreeMap, BTreeSet}, + path::{Path, PathBuf}, + thread::sleep, + time::Duration, +}; use anyhow::{Context, Result, bail}; use bincode::Options; @@ -14,9 +19,11 @@ use rocksdb::{ CompactOptions, DB, DBRecoveryMode, + FlushOptions, Options as RocksdbOptions, ReadOptions, WaitForCompactOptions, + WriteBatch, properties, }; use serde::{Deserialize, Serialize}; @@ -115,6 +122,29 @@ impl std::fmt::Display for ProbeWalRecoveryMode { } } +/// A mapping from an input RocksDB column family to an output Walrus DB column family. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ColumnFamilyMapping { + input: String, + output: String, +} + +fn parse_column_family_mapping(value: &str) -> std::result::Result { + let (input, output) = value + .split_once('=') + .ok_or_else(|| "expected mapping in the form input_cf=output_cf".to_string())?; + let input = input.trim(); + let output = output.trim(); + if input.is_empty() || output.is_empty() { + return Err("column family mapping cannot contain an empty side".to_string()); + } + + Ok(ColumnFamilyMapping { + input: input.to_owned(), + output: output.to_owned(), + }) +} + /// Database inspection and maintenance tools. #[derive(Subcommand, Debug, Clone, Serialize, Deserialize)] #[serde_as] @@ -199,6 +229,21 @@ pub enum DbToolCommands { column_family_names: Vec, }, + /// Replace column families in a Walrus RocksDB database by copying them from another RocksDB + /// database. This can only be called when the storage node is stopped. + RestoreColumnFamilies { + /// Path to the Walrus RocksDB database directory to mutate. + #[arg(long)] + db_path: PathBuf, + /// Path to the input RocksDB database directory to copy from. + #[arg(long)] + input_db_path: PathBuf, + /// Mapping from input column family to output column family, in the form + /// `input_cf=output_cf`. Can be passed multiple times. + #[arg(long = "cf-mapping", value_parser = parse_column_family_mapping, num_args = 1..)] + cf_mappings: Vec, + }, + /// List all column families in the RocksDB database. ListColumnFamilies { /// Path to the RocksDB database directory. @@ -380,6 +425,11 @@ impl DbToolCommands { db_path, column_family_names, } => drop_column_families(db_path, column_family_names), + Self::RestoreColumnFamilies { + db_path, + input_db_path, + cf_mappings, + } => restore_column_families(db_path, input_db_path, cf_mappings), Self::ListColumnFamilies { db_path } => list_column_families(db_path), Self::ProbeRecovery { db_path, @@ -815,6 +865,239 @@ fn drop_column_families(db_path: PathBuf, column_family_names: Vec) -> R Ok(()) } +const RESTORE_CF_WRITE_BATCH_SIZE: usize = 10_000; + +fn restore_column_families( + db_path: PathBuf, + input_db_path: PathBuf, + cf_mappings: Vec, +) -> Result<()> { + let db_path = canonicalize_db_path(&db_path, "output Walrus DB")?; + let input_db_path = canonicalize_db_path(&input_db_path, "input DB")?; + if db_path == input_db_path { + bail!("input DB path and output Walrus DB path must be different"); + } + + validate_column_family_mappings(&cf_mappings)?; + + let input_column_families = DB::list_cf(&RocksdbOptions::default(), &input_db_path) + .with_context(|| { + format!( + "failed to list column families in input DB `{}`", + input_db_path.display() + ) + })?; + validate_input_column_families_exist(&cf_mappings, &input_column_families)?; + + let output_column_families = + DB::list_cf(&RocksdbOptions::default(), &db_path).with_context(|| { + format!( + "failed to list column families in output Walrus DB `{}`", + db_path.display() + ) + })?; + let output_db_kind = detect_probe_db_kind(&output_column_families); + let output_db_table_opts_factory = db_table_options_factory_for_kind(output_db_kind); + + let input_db_kind = detect_probe_db_kind(&input_column_families); + let input_db_table_opts_factory = db_table_options_factory_for_kind(input_db_kind); + let mut input_db_opts = RocksdbOptions::from(&input_db_table_opts_factory.global()); + input_db_opts.create_if_missing(false); + input_db_opts.create_missing_column_families(false); + + let input_db = DB::open_cf_with_opts_for_read_only( + &input_db_opts, + &input_db_path, + input_cf_options( + &input_column_families, + input_db_kind, + &input_db_table_opts_factory, + output_db_kind, + &output_db_table_opts_factory, + &cf_mappings, + ), + false, + ) + .with_context(|| format!("failed to open input DB `{}`", input_db_path.display()))?; + + let mut output_db_opts = RocksdbOptions::from(&output_db_table_opts_factory.global()); + output_db_opts.create_if_missing(false); + output_db_opts.create_missing_column_families(false); + + let output_db = DB::open_cf_with_opts( + &output_db_opts, + &db_path, + cf_options( + &output_column_families, + output_db_kind, + &output_db_table_opts_factory, + ), + ) + .inspect_err(|_| { + println!( + "failed to open output Walrus DB; \ + make sure to stop the storage node before attempting to restore column families" + ) + }) + .with_context(|| format!("failed to open output Walrus DB `{}`", db_path.display()))?; + + println!( + "Restoring {} column family mapping(s) from {} into {}", + cf_mappings.len(), + input_db_path.display(), + db_path.display() + ); + + let mut total_entries = 0_u64; + for mapping in cf_mappings { + let copied = restore_column_family( + &input_db, + &output_db, + &mapping, + output_db_kind, + &output_db_table_opts_factory, + )?; + total_entries += copied; + } + + println!("Finished restoring column families; copied {total_entries} entries total"); + Ok(()) +} + +fn canonicalize_db_path(db_path: &Path, label: &str) -> Result { + std::fs::canonicalize(db_path).with_context(|| { + format!( + "failed to canonicalize {label} path `{}`", + db_path.display() + ) + }) +} + +fn validate_column_family_mappings(cf_mappings: &[ColumnFamilyMapping]) -> Result<()> { + if cf_mappings.is_empty() { + bail!("at least one column family mapping is required"); + } + + let mut input_column_families = BTreeSet::new(); + let mut output_column_families = BTreeSet::new(); + for mapping in cf_mappings { + if !input_column_families.insert(mapping.input.as_str()) { + bail!( + "input column family `{}` is listed more than once", + mapping.input + ); + } + if !output_column_families.insert(mapping.output.as_str()) { + bail!( + "output column family `{}` is listed more than once", + mapping.output + ); + } + } + + Ok(()) +} + +fn validate_input_column_families_exist( + cf_mappings: &[ColumnFamilyMapping], + input_column_families: &[String], +) -> Result<()> { + let input_column_family_set = input_column_families + .iter() + .map(String::as_str) + .collect::>(); + + for mapping in cf_mappings { + if !input_column_family_set.contains(mapping.input.as_str()) { + bail!( + "input column family `{}` does not exist in input DB", + mapping.input + ); + } + } + + Ok(()) +} + +fn restore_column_family( + input_db: &DB, + output_db: &DB, + mapping: &ColumnFamilyMapping, + output_db_kind: ProbeDbKind, + output_db_table_opts_factory: &DatabaseTableOptionsFactory, +) -> Result { + let input_cf = input_db + .cf_handle(&mapping.input) + .with_context(|| format!("input column family `{}` should exist", mapping.input))?; + + if output_db.cf_handle(&mapping.output).is_some() { + println!("Dropping output column family: {}", mapping.output); + output_db + .drop_cf(&mapping.output) + .with_context(|| format!("failed to drop output column family `{}`", mapping.output))?; + } else { + println!( + "Output column family {} does not exist; creating it", + mapping.output + ); + } + + let output_cf_options = cf_options_for_name( + &mapping.output, + output_db_kind, + output_db_table_opts_factory, + ); + output_db + .create_cf(&mapping.output, &output_cf_options) + .with_context(|| format!("failed to create output column family `{}`", mapping.output))?; + let output_cf = output_db + .cf_handle(&mapping.output) + .with_context(|| format!("output column family `{}` should exist", mapping.output))?; + + println!( + "Copying input column family {} to output column family {}", + mapping.input, mapping.output + ); + + let mut copied = 0_u64; + let mut batch = WriteBatch::default(); + for entry in input_db.iterator_cf(&input_cf, rocksdb::IteratorMode::Start) { + let (key, value) = entry.with_context(|| { + format!( + "failed while reading input column family `{}`", + mapping.input + ) + })?; + batch.put_cf(&output_cf, key, value); + copied += 1; + + if batch.len() >= RESTORE_CF_WRITE_BATCH_SIZE { + output_db.write(batch).with_context(|| { + format!("failed to write output column family `{}`", mapping.output) + })?; + batch = WriteBatch::default(); + println!("Copied {copied} entries into {}", mapping.output); + } + } + + if !batch.is_empty() { + output_db.write(batch).with_context(|| { + format!("failed to write output column family `{}`", mapping.output) + })?; + } + + output_db + .flush_cf_opt(&output_cf, &FlushOptions::default()) + .with_context(|| format!("failed to flush output column family `{}`", mapping.output))?; + + println!( + "Restored output column family {} from input column family {}; copied {} entries", + mapping.output, mapping.input, copied + ); + + Ok(copied) +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum ProbeDbKind { Storage, @@ -1171,6 +1454,33 @@ fn cf_options<'a>( .collect() } +fn input_cf_options<'a>( + input_column_families: &'a [String], + input_db_kind: ProbeDbKind, + input_factory: &DatabaseTableOptionsFactory, + output_db_kind: ProbeDbKind, + output_factory: &DatabaseTableOptionsFactory, + cf_mappings: &[ColumnFamilyMapping], +) -> Vec<(&'a str, RocksdbOptions)> { + let output_by_input = cf_mappings + .iter() + .map(|mapping| (mapping.input.as_str(), mapping.output.as_str())) + .collect::>(); + + input_column_families + .iter() + .map(|cf_name| { + let options = output_by_input + .get(cf_name.as_str()) + .map(|output_cf_name| { + cf_options_for_name(output_cf_name, output_db_kind, output_factory) + }) + .unwrap_or_else(|| cf_options_for_name(cf_name, input_db_kind, input_factory)); + (cf_name.as_str(), options) + }) + .collect() +} + fn cf_options_for_name( cf_name: &str, db_kind: ProbeDbKind, @@ -1655,6 +1965,119 @@ mod tests { compact_db(db_dir.path().to_path_buf(), CompactDbMode::Drain) } + #[test] + fn restore_column_families_replaces_output_column_family() -> Result<()> { + let input_db_dir = tempdir()?; + let output_db_dir = tempdir()?; + let db_table_opts_factory = + DatabaseTableOptionsFactory::new(DatabaseConfig::default(), true); + let mut db_opts = RocksdbOptions::from(&db_table_opts_factory.global()); + db_opts.create_if_missing(true); + db_opts.create_missing_column_families(true); + + let input_cf_name = "repaired-primary-slivers"; + let output_cf_name = primary_slivers_column_family_name(ShardIndex(7)); + + let input_db = DB::open_cf_descriptors( + &db_opts, + input_db_dir.path(), + vec![ColumnFamilyDescriptor::new( + input_cf_name, + db_table_opts_factory.shard(), + )], + )?; + let input_cf = input_db + .cf_handle(input_cf_name) + .expect("input column family should exist"); + let recovered_large_value = vec![42; (1 << 20) + 1]; + input_db.put_cf(&input_cf, b"blob-1", b"recovered-1")?; + input_db.put_cf(&input_cf, b"blob-2", &recovered_large_value)?; + input_db.flush_cf(&input_cf)?; + drop(input_cf); + drop(input_db); + + let output_db = DB::open_cf_descriptors( + &db_opts, + output_db_dir.path(), + vec![ColumnFamilyDescriptor::new( + output_cf_name.clone(), + db_table_opts_factory.shard(), + )], + )?; + let output_cf = output_db + .cf_handle(&output_cf_name) + .expect("output column family should exist"); + output_db.put_cf(&output_cf, b"blob-1", b"corrupted")?; + output_db.put_cf(&output_cf, b"stale", b"stale")?; + drop(output_cf); + drop(output_db); + + restore_column_families( + output_db_dir.path().to_path_buf(), + input_db_dir.path().to_path_buf(), + vec![ColumnFamilyMapping { + input: input_cf_name.to_owned(), + output: output_cf_name.clone(), + }], + )?; + + let output_db = DB::open_cf_with_opts_for_read_only( + &RocksdbOptions::default(), + output_db_dir.path(), + [(output_cf_name.as_str(), db_table_opts_factory.shard())], + false, + )?; + let output_cf = output_db + .cf_handle(&output_cf_name) + .expect("output column family should exist after restore"); + + assert_eq!( + output_db.get_cf(&output_cf, b"blob-1")?, + Some(b"recovered-1".to_vec()) + ); + assert_eq!( + output_db.get_cf(&output_cf, b"blob-2")?, + Some(recovered_large_value) + ); + assert_eq!(output_db.get_cf(&output_cf, b"stale")?, None); + + Ok(()) + } + + #[test] + fn restore_column_families_command_parses_mapping() { + let args = DbToolArgs::try_parse_from([ + "db-tool", + "restore-column-families", + "--db-path", + "/tmp/walrus-db", + "--input-db-path", + "/tmp/repaired-db", + "--cf-mapping", + "source_cf=shard-7/primary-slivers", + ]) + .expect("restore-column-families command should parse"); + + let DbToolCommands::RestoreColumnFamilies { + db_path, + input_db_path, + cf_mappings, + } = args.command + else { + panic!("expected restore-column-families command"); + }; + + assert_eq!(db_path, PathBuf::from("/tmp/walrus-db")); + assert_eq!(input_db_path, PathBuf::from("/tmp/repaired-db")); + assert_eq!( + cf_mappings, + vec![ColumnFamilyMapping { + input: "source_cf".to_owned(), + output: "shard-7/primary-slivers".to_owned(), + }] + ); + } + #[test] fn detect_storage_probe_db() { let column_families = vec![