Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/walrus-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ test-utils = [
[dependencies]
anyhow.workspace = true
base64.workspace = true
bcs.workspace = true
bimap.workspace = true
bytes.workspace = true
chrono.workspace = true
Expand Down
171 changes: 170 additions & 1 deletion crates/walrus-sdk/src/node_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use bimap::BiMap;
use futures::{
Future,
FutureExt,
future::{Either, select},
future::{Either, join_all, select},
};
use indicatif::MultiProgress;
use rand::{RngCore as _, rngs::ThreadRng};
Expand All @@ -44,6 +44,7 @@ use walrus_core::{
ShardIndex,
Sliver,
SliverIndex,
SliverPairIndex,
SliverType,
bft,
by_axis::ByAxis,
Expand Down Expand Up @@ -203,6 +204,7 @@ struct UploadOptions<'a> {
target_nodes: Option<(Epoch, &'a [NodeIndex])>,
upload_intent: UploadIntent,
initial_completed_weight: Option<&'a HashMap<BlobId, usize>>,
skip_systematic_primary_pairs: usize,
stop_scheduling: Option<CancellationToken>,
cancellation: Option<CancellationToken>,
metrics: Option<Arc<metrics::ClientMetrics>>,
Expand All @@ -216,6 +218,7 @@ struct SendBlobOptions<'a> {
tail_handle_collector: Option<Arc<tokio::sync::Mutex<Vec<JoinHandle<()>>>>>,
target_nodes: Option<(Epoch, &'a [NodeIndex])>,
initial_completed_weight: Option<&'a HashMap<BlobId, usize>>,
skip_systematic_primary_pairs: usize,
metrics: Option<Arc<metrics::ClientMetrics>>,
}

Expand Down Expand Up @@ -1265,6 +1268,7 @@ impl<T: ReadClient> WalrusNodeClient<T> {
.expect("there are shards for each node"),
UploadIntent::Immediate,
None,
0,
)
})
.collect();
Expand Down Expand Up @@ -1489,6 +1493,38 @@ impl<T> WalrusNodeClient<T> {
tail_handle_collector: Option<Arc<tokio::sync::Mutex<Vec<tokio::task::JoinHandle<()>>>>>,
target_nodes: Option<(Epoch, &[NodeIndex])>,
initial_completed_weight: Option<&HashMap<BlobId, usize>>,
) -> ClientResult<ConfirmationCertificate> {
self.send_blob_data_and_get_certificate_skipping_systematic_primaries(
metadata,
pairs,
0,
blob_persistence_type,
multi_pb,
tail_handling,
quorum_forwarder,
tail_handle_collector,
target_nodes,
initial_completed_weight,
)
.await
}

/// Sends blob data, skipping the final upload of the first `skip_systematic_primary_pairs`
/// primary slivers while still uploading all secondary slivers.
#[tracing::instrument(skip_all)]
#[allow(clippy::too_many_arguments)]
pub async fn send_blob_data_and_get_certificate_skipping_systematic_primaries(
&self,
metadata: &VerifiedBlobMetadataWithId,
pairs: Arc<Vec<SliverPair>>,
skip_systematic_primary_pairs: usize,
blob_persistence_type: &BlobPersistenceType,
multi_pb: Option<&MultiProgress>,
tail_handling: TailHandling,
quorum_forwarder: Option<tokio::sync::mpsc::Sender<UploaderEvent>>,
tail_handle_collector: Option<Arc<tokio::sync::Mutex<Vec<tokio::task::JoinHandle<()>>>>>,
target_nodes: Option<(Epoch, &[NodeIndex])>,
initial_completed_weight: Option<&HashMap<BlobId, usize>>,
) -> ClientResult<ConfirmationCertificate> {
let send_options = SendBlobOptions {
blob_persistence_type,
Expand All @@ -1498,6 +1534,7 @@ impl<T> WalrusNodeClient<T> {
tail_handle_collector,
target_nodes,
initial_completed_weight,
skip_systematic_primary_pairs,
metrics: None,
};

Expand All @@ -1509,6 +1546,132 @@ impl<T> WalrusNodeClient<T> {
certificate
}

/// Pushes the blob metadata to the write nodes with the given upload intent, on a best-effort
/// basis.
///
/// Upload relays that receive slivers progressively use this to store metadata early. A failure
/// on an individual node is only logged at debug level and never surfaced to the caller, because
/// the final upload path still performs the normal checked upload.
///
/// # Errors
///
/// Returns an error only if the committees cannot be fetched or the per-node write
/// communications cannot be created.
#[tracing::instrument(
    skip_all,
    fields(walrus.blob_id = %metadata.blob_id(), walrus.intent = ?intent)
)]
pub async fn store_metadata(
    &self,
    metadata: &VerifiedBlobMetadataWithId,
    intent: UploadIntent,
) -> ClientResult<()> {
    let committees = self.get_committees().await?;

    let (sliver_write_semaphore, auto_tune_handle) = self.build_sliver_write_throttle(
        metadata.metadata().unencoded_length(),
        metadata.metadata().encoding_type(),
    );
    // No target-node subset is passed here (`None`).
    let comms = self.node_write_communications_for_upload(
        &committees,
        sliver_write_semaphore,
        auto_tune_handle,
        None,
    )?;

    // One request future per node communication; all of them are awaited together.
    let uploads = comms.into_iter().map(|comm| async move {
        let outcome = comm
            .client
            .store_metadata_with_intent(metadata, intent)
            .await;
        (comm.node_index, outcome)
    });

    for (node_index, outcome) in join_all(uploads).await {
        // Successful uploads need no further handling.
        let Err(error) = outcome else {
            continue;
        };
        tracing::debug!(
            blob_id = %metadata.blob_id(),
            node = node_index,
            ?intent,
            %error,
            "metadata upload failed",
        );
    }

    Ok(())
}

/// Pushes a single sliver to the write nodes that own the shard of `pair_index`, on a
/// best-effort basis.
///
/// If no write-committee member owns the target shard, nothing is sent and `Ok(())` is returned.
/// A failure on an individual node is only logged at debug level and never surfaced to the
/// caller, because the final upload path still performs the normal checked upload before
/// collecting confirmations.
///
/// # Errors
///
/// Returns an error only if the committees cannot be fetched or the per-node write
/// communications cannot be created.
#[tracing::instrument(
    skip_all,
    fields(
        walrus.blob_id = %metadata.blob_id(),
        walrus.sliver.pair_index = %pair_index,
        walrus.sliver.type_ = %A::NAME,
        walrus.intent = ?intent,
    )
)]
pub async fn store_sliver<A: EncodingAxis>(
    &self,
    metadata: &VerifiedBlobMetadataWithId,
    pair_index: SliverPairIndex,
    sliver: &SliverData<A>,
    intent: UploadIntent,
) -> ClientResult<()> {
    let committees = self.get_committees().await?;
    let shard_index = pair_index.to_shard_index(committees.n_shards(), metadata.blob_id());

    // Collect the committee positions of all write nodes that hold the target shard.
    let mut target_nodes = Vec::new();
    for (node_index, member) in committees.write_committee().members().iter().enumerate() {
        if member.shard_ids.contains(&shard_index) {
            target_nodes.push(node_index);
        }
    }

    if target_nodes.is_empty() {
        tracing::debug!(
            blob_id = %metadata.blob_id(),
            %pair_index,
            ?shard_index,
            "no write nodes own the target shard; skipping sliver upload"
        );
        return Ok(());
    }

    let (sliver_write_semaphore, auto_tune_handle) = self.build_sliver_write_throttle(
        metadata.metadata().unencoded_length(),
        metadata.metadata().encoding_type(),
    );
    let comms = self.node_write_communications_for_upload(
        &committees,
        sliver_write_semaphore,
        auto_tune_handle,
        Some(target_nodes.as_slice()),
    )?;

    // One request future per node communication; all of them are awaited together.
    let uploads = comms.into_iter().map(|comm| async move {
        let outcome = comm
            .client
            .store_sliver(metadata.blob_id(), pair_index, sliver, intent)
            .await;
        (comm.node_index, outcome)
    });

    for (node_index, outcome) in join_all(uploads).await {
        // Successful uploads need no further handling.
        let Err(error) = outcome else {
            continue;
        };
        tracing::debug!(
            blob_id = %metadata.blob_id(),
            node = node_index,
            ?pair_index,
            sliver_type = %A::NAME,
            ?intent,
            %error,
            "sliver upload failed",
        );
    }

    Ok(())
}

/// Uploads slivers (optionally to a target node subset) and then collects confirmations from
/// the full write committee; missing confirmations from non-targeted nodes still count as
/// missing and can trigger retries.
Expand All @@ -1526,6 +1689,7 @@ impl<T> WalrusNodeClient<T> {
tail_handle_collector,
target_nodes,
initial_completed_weight,
skip_systematic_primary_pairs,
metrics,
} = options;
tracing::info!(blob_id = %metadata.blob_id(), "starting to send data to storage nodes");
Expand All @@ -1548,6 +1712,7 @@ impl<T> WalrusNodeClient<T> {
target_nodes,
upload_intent: UploadIntent::Immediate,
initial_completed_weight,
skip_systematic_primary_pairs,
stop_scheduling: None,
cancellation: None,
metrics,
Expand Down Expand Up @@ -1669,6 +1834,7 @@ impl<T> WalrusNodeClient<T> {
target_nodes,
upload_intent,
initial_completed_weight,
skip_systematic_primary_pairs,
stop_scheduling,
cancellation,
metrics,
Expand Down Expand Up @@ -1757,6 +1923,7 @@ impl<T> WalrusNodeClient<T> {
item.pair_indices.iter().map(|&i| &item.pairs[i]),
upload_intent,
metrics.as_deref(),
skip_systematic_primary_pairs,
)
.await;

Expand Down Expand Up @@ -1857,6 +2024,7 @@ impl<T> WalrusNodeClient<T> {
tail_handle_collector: store_args.tail_handle_collector.clone(),
target_nodes,
initial_completed_weight: initial_completed_weight.as_ref(),
skip_systematic_primary_pairs: 0,
metrics: store_args.metrics.clone(),
};

Expand Down Expand Up @@ -1983,6 +2151,7 @@ impl<T> WalrusNodeClient<T> {
target_nodes: Some((certified_epoch, &missing_nodes)),
upload_intent: UploadIntent::Immediate,
initial_completed_weight: initial_weight.as_ref(),
skip_systematic_primary_pairs: 0,
stop_scheduling: None,
cancellation: None,
metrics: store_args.metrics.clone(),
Expand Down
6 changes: 3 additions & 3 deletions crates/walrus-sdk/src/node_client/client_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ impl UnencodedBlob {
/// Encodes the blob using the given encoding config.
///
/// If no upload relay client is provided, the encoded blob contains all sliver pairs.
/// Otherwise, the encoded blob contains the unencoded blob data for the upload relay client.
pub fn encode(
self,
encoding_config: EncodingConfigEnum,
Expand Down Expand Up @@ -625,12 +625,12 @@ impl UnencodedBlob {
/// The data of the blob to be stored.
///
/// This can either be sliver pairs to be sent directly to storage nodes, or the raw blob data to be
/// used by the upload relay client.
#[derive(Clone, PartialEq)]
pub enum BlobData {
/// The encoded sliver pairs generated from the blob to be sent directly to storage nodes.
SliverPairs(Arc<Vec<SliverPair>>),
/// The raw blob data and upload relay client used to upload through the relay.
BlobForUploadRelay(Arc<Vec<u8>>, Arc<UploadRelayClient>),
}

Expand Down
Loading
Loading