Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,733 changes: 940 additions & 793 deletions pool-apps/pool/src/lib/channel_manager/mining_message_handler.rs

Large diffs are not rendered by default.

254 changes: 146 additions & 108 deletions pool-apps/pool/src/lib/channel_manager/mod.rs

Large diffs are not rendered by default.

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions pool-apps/pool/src/lib/downstream/common_message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ impl HandleCommonMessagesFromClientAsync for Downstream {
&self,
_client_id: Option<usize>,
) -> Result<Vec<u16>, Self::Error> {
Ok(self
.downstream_data
.super_safe_lock(|data| data.negotiated_extensions.clone()))
self.negotiated_extensions
.get()
.map_err(PoolError::shutdown)
}

async fn handle_setup_connection(
Expand Down
12 changes: 6 additions & 6 deletions pool-apps/pool/src/lib/downstream/extensions_message_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ impl HandleExtensionsFromClientAsync for Downstream {
&self,
_client_id: Option<usize>,
) -> Result<Vec<u16>, Self::Error> {
Ok(self
.downstream_data
.super_safe_lock(|data| data.negotiated_extensions.clone()))
self.negotiated_extensions
.get()
.map_err(PoolError::shutdown)
}

async fn handle_request_extensions(
Expand Down Expand Up @@ -114,9 +114,9 @@ impl HandleExtensionsFromClientAsync for Downstream {
);

// Store the negotiated extensions in the shared downstream data
self.downstream_data.super_safe_lock(|data| {
data.negotiated_extensions = supported.clone();
});
self.negotiated_extensions
.set(supported.clone())
.map_err(PoolError::shutdown)?;

let success = RequestExtensionsSuccess {
request_id: msg.request_id,
Expand Down
78 changes: 33 additions & 45 deletions pool-apps/pool/src/lib/downstream/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
use std::{
collections::HashMap,
sync::{
atomic::{AtomicBool, AtomicU32},
Arc,
},
use std::sync::{
atomic::{AtomicBool, AtomicU32},
Arc,
};

use async_channel::{unbounded, Receiver, Sender};
use bitcoin_core_sv2::template_distribution_protocol::CancellationToken;
use stratum_apps::{
channel_utils::ReceiverCleanup,
custom_mutex::Mutex,
network_helpers::noise_stream::NoiseTcpStream,
stratum_core::{
channels_sv2::server::{
Expand All @@ -21,6 +17,7 @@ use stratum_apps::{
handlers_sv2::{HandleCommonMessagesFromClientAsync, HandleExtensionsFromClientAsync},
parsers_sv2::{parse_message_frame_with_tlvs, AnyMessage, Mining, Tlv},
},
sync::{SharedLock, SharedMap},
task_manager::TaskManager,
utils::{
protocol_message_type::{protocol_message_type, MessageType},
Expand All @@ -38,25 +35,6 @@ use crate::{
mod common_message_handler;
mod extensions_message_handler;

/// Holds state related to a downstream connection's mining channels.
///
/// This includes:
/// - Whether the downstream requires a standard job (`require_std_job`).
/// - A [`GroupChannel`].
/// - Active [`ExtendedChannel`]s keyed by channel ID.
/// - Active [`StandardChannel`]s keyed by channel ID.
/// - Extensions that have been successfully negotiated with this client
pub struct DownstreamData {
pub group_channel: GroupChannel<'static>,
pub extended_channels: HashMap<ChannelId, ExtendedChannel<'static>>,
pub standard_channels: HashMap<ChannelId, StandardChannel<'static>>,
pub channel_id_factory: AtomicU32,
/// Extensions that have been successfully negotiated with this client
pub negotiated_extensions: Vec<u16>,
/// Payout mode derived from user_identity (None until channel is opened)
pub payout_mode: Option<PayoutMode>,
}

/// Communication layer for a downstream connection.
///
/// Provides the messaging primitives for interacting with the
Expand Down Expand Up @@ -84,7 +62,14 @@ impl DownstreamIo {
/// Represents a downstream client connected to this node.
#[derive(Clone)]
pub struct Downstream {
pub downstream_data: Arc<Mutex<DownstreamData>>,
pub group_channel: SharedLock<GroupChannel<'static>>,
pub extended_channels: SharedMap<ChannelId, ExtendedChannel<'static>>,
pub standard_channels: SharedMap<ChannelId, StandardChannel<'static>>,
pub channel_id_factory: Arc<AtomicU32>,
/// Extensions that have been successfully negotiated with this client
pub negotiated_extensions: SharedLock<Vec<u16>>,
/// Payout mode derived from user_identity (None until channel is opened)
pub payout_mode: SharedLock<Option<PayoutMode>>,
downstream_io: DownstreamIo,
pub downstream_id: usize,
pub requires_standard_jobs: Arc<AtomicBool>,
Expand Down Expand Up @@ -181,18 +166,14 @@ impl Downstream {
downstream_receiver: inbound_rx,
};

let downstream_data = Arc::new(Mutex::new(DownstreamData {
extended_channels: HashMap::new(),
standard_channels: HashMap::new(),
group_channel,
channel_id_factory,
negotiated_extensions: vec![],
payout_mode: None,
}));

Downstream {
downstream_io,
downstream_data,
group_channel: SharedLock::new(group_channel),
extended_channels: SharedMap::new(),
standard_channels: SharedMap::new(),
channel_id_factory: Arc::new(channel_id_factory),
negotiated_extensions: SharedLock::new(Vec::new()),
payout_mode: SharedLock::new(None),
downstream_id,
requires_standard_jobs: Arc::new(AtomicBool::new(false)),
requires_custom_work: Arc::new(AtomicBool::new(false)),
Expand Down Expand Up @@ -284,10 +265,13 @@ impl Downstream {
.recv()
.await
.map_err(|error| PoolError::disconnect(error, self.downstream_id))?;
let header = frame.get_header().ok_or_else(|| {
let Some(header) = frame.get_header() else {
error!("SV2 frame missing header");
PoolError::disconnect(framing_sv2::Error::MissingHeader, self.downstream_id)
})?;
return Err(PoolError::disconnect(
framing_sv2::Error::MissingHeader,
self.downstream_id,
));
};
// The first ever message received on a new downstream connection
// should always be a setup connection message.
if header.msg_type() == MESSAGE_TYPE_SETUP_CONNECTION {
Expand Down Expand Up @@ -347,17 +331,21 @@ impl Downstream {
.recv()
.await
.map_err(|error| PoolError::disconnect(error, self.downstream_id))?;
let header = sv2_frame.get_header().ok_or_else(|| {
let Some(header) = sv2_frame.get_header() else {
error!("SV2 frame missing header");
PoolError::disconnect(framing_sv2::Error::MissingHeader, self.downstream_id)
})?;
return Err(PoolError::disconnect(
framing_sv2::Error::MissingHeader,
self.downstream_id,
));
};

match protocol_message_type(header.ext_type(), header.msg_type()) {
MessageType::Mining => {
debug!("Received mining SV2 frame from downstream.");
let negotiated_extensions = self
.downstream_data
.super_safe_lock(|data| data.negotiated_extensions.clone());
.negotiated_extensions
.get()
.map_err(PoolError::shutdown)?;
let (any_message, tlv_fields) = parse_message_frame_with_tlvs(
header,
sv2_frame.payload(),
Expand Down
161 changes: 79 additions & 82 deletions pool-apps/pool/src/lib/monitoring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,93 +10,94 @@ use stratum_apps::monitoring::client::{
use crate::{channel_manager::ChannelManager, downstream::Downstream};

/// Helper to convert a Downstream to Sv2ClientInfo.
/// Returns None if the lock cannot be acquired (graceful degradation for monitoring).
fn downstream_to_sv2_client_info(client: &Downstream) -> Option<Sv2ClientInfo> {
client
.downstream_data
.safe_lock(|dd| {
let mut extended_channels = Vec::new();
let mut standard_channels = Vec::new();
let mut extended_channels = Vec::new();
let mut standard_channels = Vec::new();

for (_channel_id, extended_channel) in dd.extended_channels.iter() {
let channel_id = extended_channel.get_channel_id();
let target = extended_channel.get_target();
let requested_max_target = extended_channel.get_requested_max_target();
let user_identity = extended_channel.get_user_identity();
let share_accounting = extended_channel.get_share_accounting();
client
.extended_channels
.for_each(|_channel_id, extended_channel| {
let channel_id = extended_channel.get_channel_id();
let target = extended_channel.get_target();
let requested_max_target = extended_channel.get_requested_max_target();
let user_identity = extended_channel.get_user_identity();
let share_accounting = extended_channel.get_share_accounting();

extended_channels.push(ExtendedChannelInfo {
channel_id,
user_identity: user_identity.to_string(),
nominal_hashrate: extended_channel.get_nominal_hashrate(),
stable_hashrate: extended_channel.get_stable_hashrate(),
target_hex: hex::encode(target.to_be_bytes()),
requested_max_target_hex: hex::encode(requested_max_target.to_be_bytes()),
extranonce_prefix_hex: hex::encode(extended_channel.get_extranonce_prefix()),
full_extranonce_size: extended_channel.get_full_extranonce_size(),
rollable_extranonce_size: extended_channel.get_rollable_extranonce_size(),
expected_shares_per_minute: extended_channel.get_shares_per_minute(),
shares_accepted: share_accounting.get_shares_accepted(),
shares_rejected: share_accounting.get_rejected_shares_count(),
shares_rejected_by_reason: share_accounting
.get_rejected_shares()
.map(|(reason, count)| (reason.to_string(), count))
.collect(),
share_work_sum: share_accounting.get_share_work_sum(),
last_share_sequence_number: share_accounting.get_last_share_sequence_number(),
best_diff: share_accounting.get_best_diff(),
last_batch_accepted: share_accounting.get_last_batch_accepted(),
last_batch_work_sum: share_accounting.get_last_batch_work_sum(),
share_batch_size: share_accounting.get_share_batch_size(),
blocks_found: share_accounting.get_blocks_found(),
});
}
extended_channels.push(ExtendedChannelInfo {
channel_id,
user_identity: user_identity.to_string(),
nominal_hashrate: extended_channel.get_nominal_hashrate(),
stable_hashrate: extended_channel.get_stable_hashrate(),
target_hex: hex::encode(target.to_be_bytes()),
requested_max_target_hex: hex::encode(requested_max_target.to_be_bytes()),
extranonce_prefix_hex: hex::encode(extended_channel.get_extranonce_prefix()),
full_extranonce_size: extended_channel.get_full_extranonce_size(),
rollable_extranonce_size: extended_channel.get_rollable_extranonce_size(),
expected_shares_per_minute: extended_channel.get_shares_per_minute(),
shares_accepted: share_accounting.get_shares_accepted(),
shares_rejected: share_accounting.get_rejected_shares_count(),
shares_rejected_by_reason: share_accounting
.get_rejected_shares()
.map(|(reason, count)| (reason.to_string(), count))
.collect(),
share_work_sum: share_accounting.get_share_work_sum(),
last_share_sequence_number: share_accounting.get_last_share_sequence_number(),
best_diff: share_accounting.get_best_diff(),
last_batch_accepted: share_accounting.get_last_batch_accepted(),
last_batch_work_sum: share_accounting.get_last_batch_work_sum(),
share_batch_size: share_accounting.get_share_batch_size(),
blocks_found: share_accounting.get_blocks_found(),
});
});

for (_channel_id, standard_channel) in dd.standard_channels.iter() {
let channel_id = standard_channel.get_channel_id();
let target = standard_channel.get_target();
let requested_max_target = standard_channel.get_requested_max_target();
let user_identity = standard_channel.get_user_identity();
let share_accounting = standard_channel.get_share_accounting();
client
.standard_channels
.for_each(|_channel_id, standard_channel| {
let channel_id = standard_channel.get_channel_id();
let target = standard_channel.get_target();
let requested_max_target = standard_channel.get_requested_max_target();
let user_identity = standard_channel.get_user_identity();
let share_accounting = standard_channel.get_share_accounting();

standard_channels.push(StandardChannelInfo {
channel_id,
user_identity: user_identity.to_string(),
nominal_hashrate: standard_channel.get_nominal_hashrate(),
stable_hashrate: standard_channel.get_stable_hashrate(),
target_hex: hex::encode(target.to_be_bytes()),
requested_max_target_hex: hex::encode(requested_max_target.to_be_bytes()),
extranonce_prefix_hex: hex::encode(standard_channel.get_extranonce_prefix()),
expected_shares_per_minute: standard_channel.get_shares_per_minute(),
shares_accepted: share_accounting.get_shares_accepted(),
shares_rejected: share_accounting.get_rejected_shares_count(),
shares_rejected_by_reason: share_accounting
.get_rejected_shares()
.map(|(reason, count)| (reason.to_string(), count))
.collect(),
share_work_sum: share_accounting.get_share_work_sum(),
last_share_sequence_number: share_accounting.get_last_share_sequence_number(),
best_diff: share_accounting.get_best_diff(),
last_batch_accepted: share_accounting.get_last_batch_accepted(),
last_batch_work_sum: share_accounting.get_last_batch_work_sum(),
share_batch_size: share_accounting.get_share_batch_size(),
blocks_found: share_accounting.get_blocks_found(),
});
}
standard_channels.push(StandardChannelInfo {
channel_id,
user_identity: user_identity.to_string(),
nominal_hashrate: standard_channel.get_nominal_hashrate(),
stable_hashrate: standard_channel.get_stable_hashrate(),
target_hex: hex::encode(target.to_be_bytes()),
requested_max_target_hex: hex::encode(requested_max_target.to_be_bytes()),
extranonce_prefix_hex: hex::encode(standard_channel.get_extranonce_prefix()),
expected_shares_per_minute: standard_channel.get_shares_per_minute(),
shares_accepted: share_accounting.get_shares_accepted(),
shares_rejected: share_accounting.get_rejected_shares_count(),
shares_rejected_by_reason: share_accounting
.get_rejected_shares()
.map(|(reason, count)| (reason.to_string(), count))
.collect(),
share_work_sum: share_accounting.get_share_work_sum(),
last_share_sequence_number: share_accounting.get_last_share_sequence_number(),
best_diff: share_accounting.get_best_diff(),
last_batch_accepted: share_accounting.get_last_batch_accepted(),
last_batch_work_sum: share_accounting.get_last_batch_work_sum(),
share_batch_size: share_accounting.get_share_batch_size(),
blocks_found: share_accounting.get_blocks_found(),
});
});

Sv2ClientInfo::new(client.downstream_id, extended_channels, standard_channels)
})
.ok()
Some(Sv2ClientInfo::new(
client.downstream_id,
extended_channels,
standard_channels,
))
}

impl Sv2ClientsMonitoring for ChannelManager {
fn get_sv2_clients(&self) -> Vec<Sv2ClientInfo> {
// Clone Downstream references and release lock immediately to avoid contention
// with template distribution and message handling
let downstream_refs: Vec<Downstream> = self
.channel_manager_data
.safe_lock(|data| data.downstream.values().cloned().collect())
.unwrap_or_default();
let mut downstream_refs: Vec<Downstream> = Vec::new();
self.downstreams
.for_each(|_, downstream| downstream_refs.push(downstream.clone()));

downstream_refs
.iter()
Expand All @@ -105,12 +106,8 @@ impl Sv2ClientsMonitoring for ChannelManager {
}

fn get_sv2_client_by_id(&self, client_id: usize) -> Option<Sv2ClientInfo> {
self.channel_manager_data
.safe_lock(|d| {
d.downstream
.get(&client_id)
.and_then(downstream_to_sv2_client_info)
})
.unwrap_or(None)
self.downstreams.with(&client_id, |downstream| {
downstream_to_sv2_client_info(downstream)
})?
}
}
Loading
Loading