diff --git a/pool-apps/pool/src/lib/channel_manager/mining_message_handler.rs b/pool-apps/pool/src/lib/channel_manager/mining_message_handler.rs index f9f2168ef..c4f33ee23 100644 --- a/pool-apps/pool/src/lib/channel_manager/mining_message_handler.rs +++ b/pool-apps/pool/src/lib/channel_manager/mining_message_handler.rs @@ -56,16 +56,11 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager { ) -> Result, Self::Error> { let downstream_id = client_id.expect("client_id must be present for downstream_id extraction"); - self.channel_manager_data.super_safe_lock(|data| { - let Some(downstream) = data.downstream.get(&downstream_id) else { - return Err(PoolError::disconnect( - PoolErrorKind::DownstreamNotFound(downstream_id), - downstream_id, - )); - }; + self.with_registered_downstream(downstream_id, |downstream| { downstream - .downstream_data - .super_safe_lock(|data| Ok(data.negotiated_extensions.clone())) + .negotiated_extensions + .get() + .map_err(PoolError::shutdown) }) } @@ -78,27 +73,21 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager { info!("Received Close Channel: {msg}"); let downstream_id = client_id.expect("client_id must be present for downstream_id extraction"); - self.channel_manager_data - .super_safe_lock(|channel_manager_data| { - let Some(downstream) = channel_manager_data.downstream.get_mut(&downstream_id) - else { - return Err(PoolError::disconnect( - PoolErrorKind::DownstreamNotFound(downstream_id), - downstream_id, - )); - }; - - downstream - .downstream_data - .super_safe_lock(|downstream_data| { - downstream_data.standard_channels.remove(&msg.channel_id); - downstream_data.extended_channels.remove(&msg.channel_id); - }); - channel_manager_data - .vardiff - .remove(&(downstream_id, msg.channel_id).into()); - Ok(()) - }) + self.with_registered_downstream(downstream_id, |downstream| { + downstream + .group_channel + .with(|group_channel| { + if group_channel.has_channel_id(msg.channel_id) { + group_channel.remove_channel_id(msg.channel_id); + } + }) + .map_err(PoolError::shutdown)?; + downstream.standard_channels.remove(&msg.channel_id); + downstream.extended_channels.remove(&msg.channel_id); + Ok(()) + })?; + self.vardiff.remove(&(downstream_id, msg.channel_id).into()); + Ok(()) } async fn handle_open_standard_mining_channel( @@ -114,62 +103,97 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager { info!("Received OpenStandardMiningChannel: {}", msg); - let messages = self.channel_manager_data.super_safe_lock(|channel_manager_data| { - let Some(downstream) = channel_manager_data.downstream.get_mut(&downstream_id) else { - return Err(PoolError::disconnect(PoolErrorKind::DownstreamIdNotFound, downstream_id)); - }; - - if downstream.requires_custom_work.load(Ordering::SeqCst) { - error!("OpenStandardMiningChannel: Standard Channels are not supported for this connection"); - let open_standard_mining_channel_error = OpenMiningChannelError { - request_id, - error_code: ERROR_CODE_OPEN_MINING_CHANNEL_STANDARD_CHANNELS_NOT_SUPPORTED_FOR_CUSTOM_WORK - .to_string() - .try_into() - .expect("error code must be valid string"), - }; - return Ok(vec![(downstream_id, Mining::OpenMiningChannelError(open_standard_mining_channel_error)).into()]); - } - - let Some(last_future_template) = channel_manager_data.last_future_template.clone() else { - return Err(PoolError::disconnect(PoolErrorKind::FutureTemplateNotPresent, downstream_id)); - }; - - let Some(last_set_new_prev_hash_tdp) = channel_manager_data.last_new_prev_hash.clone() else { - return Err(PoolError::disconnect(PoolErrorKind::LastNewPrevhashNotFound, downstream_id)); - }; - - let payout_mode = match PayoutMode::try_from(user_identity.as_str()) { - Ok(mode) => mode, - Err(PayoutModeError::NoPayoutMode(_)) => PayoutMode::FullDonation, - Err(_) => { - error!("Invalid user_identity '{}': does not match any supported identity format", user_identity); + let messages = self.with_registered_downstream(downstream_id, |downstream| { + if downstream.requires_custom_work.load(Ordering::SeqCst) { + error!("OpenStandardMiningChannel: Standard Channels are not supported for this connection"); let open_standard_mining_channel_error = OpenMiningChannelError { request_id, - error_code: ERROR_CODE_OPEN_MINING_CHANNEL_INVALID_USER_IDENTITY + error_code: ERROR_CODE_OPEN_MINING_CHANNEL_STANDARD_CHANNELS_NOT_SUPPORTED_FOR_CUSTOM_WORK .to_string() .try_into() .expect("error code must be valid string"), }; - return Ok(vec![(downstream_id, Mining::OpenMiningChannelError(open_standard_mining_channel_error)).into()]); + return Ok(vec![( + downstream_id, + Mining::OpenMiningChannelError(open_standard_mining_channel_error), + ) + .into()]); } - }; - let coinbase_outputs = payout_mode.coinbase_outputs( - last_future_template.coinbase_tx_value_remaining, - &self.coinbase_reward_script, - ); + let Some(last_future_template) = self + .last_future_template + .get() + .map_err(PoolError::shutdown)? + else { + return Err(PoolError::disconnect( + PoolErrorKind::FutureTemplateNotPresent, + downstream_id, + )); + }; - downstream.downstream_data.super_safe_lock(|downstream_data| { - downstream_data.payout_mode = Some(payout_mode); + let Some(last_set_new_prev_hash_tdp) = + self.last_new_prev_hash.get().map_err(PoolError::shutdown)? + else { + return Err(PoolError::disconnect( + PoolErrorKind::LastNewPrevhashNotFound, + downstream_id, + )); + }; + + let payout_mode = match PayoutMode::try_from(user_identity.as_str()) { + Ok(mode) => mode, + Err(PayoutModeError::NoPayoutMode(_)) => PayoutMode::FullDonation, + Err(_) => { + error!( + "Invalid user_identity '{}': does not match any supported identity format", + user_identity + ); + let open_standard_mining_channel_error = OpenMiningChannelError { + request_id, + error_code: ERROR_CODE_OPEN_MINING_CHANNEL_INVALID_USER_IDENTITY + .to_string() + .try_into() + .expect("error code must be valid string"), + }; + return Ok(vec![( + downstream_id, + Mining::OpenMiningChannelError(open_standard_mining_channel_error), + ) + .into()]); + } + }; + + let coinbase_outputs = payout_mode.coinbase_outputs( + last_future_template.coinbase_tx_value_remaining, + &self.coinbase_reward_script, + ); + + downstream + .payout_mode + .set(Some(payout_mode)) + .map_err(PoolError::shutdown)?; let nominal_hash_rate = msg.nominal_hash_rate; - let requested_max_target = Target::from_le_bytes(msg.max_target.inner_as_ref().try_into().unwrap()); - let extranonce_prefix = channel_manager_data.extranonce_allocator.allocate_standard().map_err(PoolError::shutdown)?; + let requested_max_target = + Target::from_le_bytes(msg.max_target.inner_as_ref().try_into().unwrap()); + let extranonce_prefix = self + .extranonce_allocator + .with(|allocator| allocator.allocate_standard()) + .map_err(PoolError::shutdown)? + .map_err(PoolError::shutdown)?; - let channel_id = downstream_data.channel_id_factory.fetch_add(1, Ordering::SeqCst); + let channel_id = downstream.channel_id_factory.fetch_add(1, Ordering::SeqCst); - let mut standard_channel = match StandardChannel::new_for_pool(channel_id, user_identity.to_string(), extranonce_prefix, requested_max_target, nominal_hash_rate, self.share_batch_size, self.shares_per_minute, self.pool_tag_string.clone()) { + let mut standard_channel = match StandardChannel::new_for_pool( + channel_id, + user_identity.to_string(), + extranonce_prefix, + requested_max_target, + nominal_hash_rate, + self.share_batch_size, + self.shares_per_minute, + self.pool_tag_string.clone(), + ) { Ok(channel) => channel, Err(e) => match e { StandardChannelError::OpenChannelInvalidNominalHashrate(code) => { @@ -181,34 +205,58 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager { .try_into() .expect("error code must be valid string"), }; - return Ok(vec![(downstream_id, Mining::OpenMiningChannelError(open_standard_mining_channel_error)).into()]); + return Ok(vec![( + downstream_id, + Mining::OpenMiningChannelError(open_standard_mining_channel_error), + ) + .into()]); } _ => { error!("error in handle_open_standard_mining_channel: {:?}", e); - return Err(PoolError::disconnect(PoolErrorKind::ChannelErrorSender, downstream_id) ); + return Err(PoolError::disconnect( + PoolErrorKind::ChannelErrorSender, + downstream_id, + )); } }, }; - let group_channel_id = downstream_data.group_channel.get_group_channel_id(); + let group_channel_id = downstream + .group_channel + .with(|channel| channel.get_group_channel_id()) + .map_err(PoolError::shutdown)?; let extranonce_prefix_size = standard_channel.get_extranonce_prefix().len(); let open_standard_mining_channel_success = OpenStandardMiningChannelSuccess { request_id: msg.request_id, channel_id, target: standard_channel.get_target().to_le_bytes().into(), - extranonce_prefix: standard_channel.get_extranonce_prefix().to_vec().try_into().expect("Extranonce_prefix must be valid"), - group_channel_id - }.into_static(); + extranonce_prefix: standard_channel + .get_extranonce_prefix() + .to_vec() + .try_into() + .expect("Extranonce_prefix must be valid"), + group_channel_id, + } + .into_static(); - let mut messages: Vec = Vec::new(); + let mut messages: Vec = Vec::new(); - messages.push((downstream_id, Mining::OpenStandardMiningChannelSuccess(open_standard_mining_channel_success)).into()); + messages.push( + ( + downstream_id, + Mining::OpenStandardMiningChannelSuccess( + open_standard_mining_channel_success, + ), + ) + .into(), + ); let template_id = last_future_template.template_id; - // create a future standard job based on the last future template - standard_channel.on_new_template(last_future_template, coinbase_outputs.clone()).map_err(PoolError::shutdown)?; + standard_channel + .on_new_template(last_future_template, coinbase_outputs.clone()) + .map_err(PoolError::shutdown)?; let future_standard_job_id = standard_channel .get_future_job_id_from_template_id(template_id) .expect("future job id must exist"); @@ -218,7 +266,13 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager { let future_standard_job_message = future_standard_job.get_job_message().clone().into_static(); - messages.push((downstream_id, Mining::NewMiningJob(future_standard_job_message)).into()); + messages.push( + ( + downstream_id, + Mining::NewMiningJob(future_standard_job_message), + ) + .into(), + ); let prev_hash = last_set_new_prev_hash_tdp.prev_hash.clone(); let header_timestamp = last_set_new_prev_hash_tdp.header_timestamp; let n_bits = last_set_new_prev_hash_tdp.n_bits; @@ -231,23 +285,36 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager { }; standard_channel - .on_set_new_prev_hash(last_set_new_prev_hash_tdp.clone()).map_err(PoolError::shutdown)?; + .on_set_new_prev_hash(last_set_new_prev_hash_tdp.clone()) + .map_err(PoolError::shutdown)?; - messages.push((downstream_id, Mining::SetNewPrevHash(set_new_prev_hash_mining)).into()); + messages.push( + ( + downstream_id, + Mining::SetNewPrevHash(set_new_prev_hash_mining), + ) + .into(), + ); - downstream_data.standard_channels.insert(channel_id, standard_channel); + downstream + .standard_channels + .insert(channel_id, standard_channel); if !downstream.requires_standard_jobs.load(Ordering::SeqCst) { - downstream_data.group_channel.add_channel_id(channel_id, extranonce_prefix_size).map_err(|e| { - error!("Failed to add channel id to group channel: {:?}", e); - PoolError::shutdown(e) - })?; + downstream + .group_channel + .with(|channel| channel.add_channel_id(channel_id, extranonce_prefix_size)) + .map_err(PoolError::shutdown)? + .map_err(|e| { + error!("Failed to add channel id to group channel: {:?}", e); + PoolError::shutdown(e) + })?; } let vardiff = VardiffState::new().map_err(PoolError::shutdown)?; - channel_manager_data.vardiff.insert((downstream_id, channel_id).into(), vardiff); + self.vardiff + .insert((downstream_id, channel_id).into(), vardiff); Ok(messages) - }) - })?; + })?; for message in messages { // A send can only fail if the receiver side of the channel is closed. @@ -278,265 +345,264 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager { Target::from_le_bytes(msg.max_target.inner_as_ref().try_into().unwrap()); let requested_min_rollable_extranonce_size = msg.min_extranonce_size; - let messages = self - .channel_manager_data - .super_safe_lock(|channel_manager_data| { - let Some(downstream) = channel_manager_data.downstream.get_mut(&downstream_id) - else { - return Err(PoolError::disconnect(PoolErrorKind::DownstreamIdNotFound, downstream_id)); - }; - + let messages = self.with_registered_downstream(downstream_id, |downstream| { if downstream.requires_standard_jobs.load(Ordering::SeqCst) { let open_extended_mining_channel_error = OpenMiningChannelError { - request_id, - error_code: ERROR_CODE_OPEN_MINING_CHANNEL_EXTENDED_CHANNELS_NOT_SUPPORTED_FOR_STANDARD_JOBS - .to_string() - .try_into() - .expect("error code must be valid string"), - }; + request_id, + error_code: ERROR_CODE_OPEN_MINING_CHANNEL_EXTENDED_CHANNELS_NOT_SUPPORTED_FOR_STANDARD_JOBS + .to_string() + .try_into() + .expect("error code must be valid string"), + }; return Ok(vec![( downstream_id, - Mining::OpenMiningChannelError( - open_extended_mining_channel_error, - ), + Mining::OpenMiningChannelError(open_extended_mining_channel_error), ) .into()]); } - downstream - .downstream_data - .super_safe_lock(|downstream_data| { - let mut messages: Vec = Vec::new(); - - let extranonce_prefix = match channel_manager_data - .extranonce_allocator - .allocate_extended(requested_min_rollable_extranonce_size.into()) - { - Ok(prefix) => prefix, - Err(_) => { - error!("OpenMiningChannelError: min-extranonce-size-too-large"); - let open_extended_mining_channel_error = OpenMiningChannelError { - request_id, - error_code: ERROR_CODE_OPEN_MINING_CHANNEL_MIN_EXTRANONCE_SIZE_TOO_LARGE - .to_string() - .try_into() - .expect("error code must be valid string"), - }; - return Ok(vec![( - downstream_id, - Mining::OpenMiningChannelError( - open_extended_mining_channel_error, - ), - ) - .into()]); - } - }; + let mut messages: Vec = Vec::new(); - let payout_mode = match PayoutMode::try_from(user_identity.as_str()) { - Ok(mode) => mode, - Err(PayoutModeError::NoPayoutMode(_)) => PayoutMode::FullDonation, - Err(_) => { - error!("Invalid user_identity '{}': does not match any supported identity format", user_identity); - let open_extended_mining_channel_error = OpenMiningChannelError { - request_id, - error_code: ERROR_CODE_OPEN_MINING_CHANNEL_INVALID_USER_IDENTITY - .to_string() - .try_into() - .expect("error code must be valid string"), - }; - return Ok(vec![( - downstream_id, - Mining::OpenMiningChannelError( - open_extended_mining_channel_error, - ), - ) - .into()]); - } + let extranonce_prefix = match self + .extranonce_allocator + .with(|allocator| { + allocator.allocate_extended(requested_min_rollable_extranonce_size.into()) + }) + .map_err(PoolError::shutdown)? + { + Ok(prefix) => prefix, + Err(_) => { + error!("OpenMiningChannelError: min-extranonce-size-too-large"); + let open_extended_mining_channel_error = OpenMiningChannelError { + request_id, + error_code: ERROR_CODE_OPEN_MINING_CHANNEL_MIN_EXTRANONCE_SIZE_TOO_LARGE + .to_string() + .try_into() + .expect("error code must be valid string"), }; + return Ok(vec![( + downstream_id, + Mining::OpenMiningChannelError(open_extended_mining_channel_error), + ) + .into()]); + } + }; - downstream_data.payout_mode = Some(payout_mode.clone()); - - let channel_id = downstream_data - .channel_id_factory - .fetch_add(1, Ordering::SeqCst); - - let mut extended_channel = match ExtendedChannel::new_for_pool( - channel_id, - user_identity.to_string(), - extranonce_prefix, - requested_max_target, - nominal_hash_rate, - true, // version rolling always allowed - CLIENT_SEARCH_SPACE_BYTES as u16, - self.share_batch_size, - self.shares_per_minute, - self.pool_tag_string.clone(), - ) { - Ok(channel) => channel, - Err(e) => { - match e { - ExtendedChannelError::OpenChannelInvalidNominalHashrate(code) => { - error!("OpenMiningChannelError: {}", code); - let open_extended_mining_channel_error = - OpenMiningChannelError { - request_id, - error_code: code - .to_string() - .try_into() - .expect("error code must be valid string"), - }; - return Ok(vec![( - downstream_id, - Mining::OpenMiningChannelError( - open_extended_mining_channel_error, - ), - ) - .into()]); - } - ExtendedChannelError::RequestedMinExtranonceSizeTooLarge(code) => { - error!("OpenMiningChannelError: {}", code); - let open_extended_mining_channel_error = - OpenMiningChannelError { - request_id, - error_code: code - .to_string() - .try_into() - .expect("error code must be valid string"), - }; - return Ok(vec![( - downstream_id, - Mining::OpenMiningChannelError( - open_extended_mining_channel_error, - ), - ) - .into()]); - } - e => { - error!("error in handle_open_extended_mining_channel: {:?}", e); - return Err(PoolError::disconnect(e, downstream_id))?; - } - } - }, + let payout_mode = match PayoutMode::try_from(user_identity.as_str()) { + Ok(mode) => mode, + Err(PayoutModeError::NoPayoutMode(_)) => PayoutMode::FullDonation, + Err(_) => { + error!( + "Invalid user_identity '{}': does not match any supported identity format", + user_identity + ); + let open_extended_mining_channel_error = OpenMiningChannelError { + request_id, + error_code: ERROR_CODE_OPEN_MINING_CHANNEL_INVALID_USER_IDENTITY + .to_string() + .try_into() + .expect("error code must be valid string"), }; + return Ok(vec![( + downstream_id, + Mining::OpenMiningChannelError(open_extended_mining_channel_error), + ) + .into()]); + } + }; - let group_channel_id = downstream_data.group_channel.get_group_channel_id(); + downstream + .payout_mode + .set(Some(payout_mode.clone())) + .map_err(PoolError::shutdown)?; - let open_extended_mining_channel_success = - OpenExtendedMiningChannelSuccess { - request_id, - channel_id, - target: extended_channel.get_target().to_le_bytes().into(), - extranonce_prefix: extended_channel - .get_extranonce_prefix() - .to_vec() - .try_into().map_err(PoolError::shutdown)?, - extranonce_size: extended_channel.get_rollable_extranonce_size(), - group_channel_id, - } - .into_static(); - info!("Sending OpenExtendedMiningChannel.Success (downstream_id: {downstream_id}): {open_extended_mining_channel_success}"); + let channel_id = downstream.channel_id_factory.fetch_add(1, Ordering::SeqCst); - messages.push( - ( + let mut extended_channel = match ExtendedChannel::new_for_pool( + channel_id, + user_identity.to_string(), + extranonce_prefix, + requested_max_target, + nominal_hash_rate, + true, // version rolling always allowed + CLIENT_SEARCH_SPACE_BYTES as u16, + self.share_batch_size, + self.shares_per_minute, + self.pool_tag_string.clone(), + ) { + Ok(channel) => channel, + Err(e) => match e { + ExtendedChannelError::OpenChannelInvalidNominalHashrate(code) => { + error!("OpenMiningChannelError: {}", code); + let open_extended_mining_channel_error = OpenMiningChannelError { + request_id, + error_code: code + .to_string() + .try_into() + .expect("error code must be valid string"), + }; + return Ok(vec![( downstream_id, - Mining::OpenExtendedMiningChannelSuccess( - open_extended_mining_channel_success, - ), + Mining::OpenMiningChannelError(open_extended_mining_channel_error), ) - .into(), - ); + .into()]); + } + ExtendedChannelError::RequestedMinExtranonceSizeTooLarge(code) => { + error!("OpenMiningChannelError: {}", code); + let open_extended_mining_channel_error = OpenMiningChannelError { + request_id, + error_code: code + .to_string() + .try_into() + .expect("error code must be valid string"), + }; + return Ok(vec![( + downstream_id, + Mining::OpenMiningChannelError(open_extended_mining_channel_error), + ) + .into()]); + } + e => { + error!("error in handle_open_extended_mining_channel: {:?}", e); + return Err(PoolError::disconnect(e, downstream_id)); + } + }, + }; - let Some(last_set_new_prev_hash_tdp) = - channel_manager_data.last_new_prev_hash.clone() - else { - return Err(PoolError::disconnect(PoolErrorKind::LastNewPrevhashNotFound, downstream_id)); - }; + let group_channel_id = downstream + .group_channel + .with(|channel| channel.get_group_channel_id()) + .map_err(PoolError::shutdown)?; - let Some(last_future_template) = - channel_manager_data.last_future_template.clone() - else { - return Err(PoolError::disconnect(PoolErrorKind::FutureTemplateNotPresent,downstream_id)); - }; + let open_extended_mining_channel_success = OpenExtendedMiningChannelSuccess { + request_id, + channel_id, + target: extended_channel.get_target().to_le_bytes().into(), + extranonce_prefix: extended_channel + .get_extranonce_prefix() + .to_vec() + .try_into() + .map_err(PoolError::shutdown)?, + extranonce_size: extended_channel.get_rollable_extranonce_size(), + group_channel_id, + } + .into_static(); + info!("Sending OpenExtendedMiningChannel.Success (downstream_id: {downstream_id}): {open_extended_mining_channel_success}"); - // if the client requires custom work, we don't need to send any extended - // jobs so we just process the SetNewPrevHash - // message - if downstream.requires_custom_work.load(Ordering::SeqCst) { - extended_channel.on_set_new_prev_hash(last_set_new_prev_hash_tdp).map_err(PoolError::shutdown)?; - // if the client does not require custom work, we need to send the - // future extended job - // and the SetNewPrevHash message - } else { - let coinbase_outputs = payout_mode.coinbase_outputs( - last_future_template.coinbase_tx_value_remaining, - &self.coinbase_reward_script, - ); - - extended_channel.on_new_template( - last_future_template.clone(), - coinbase_outputs, - ).map_err(PoolError::shutdown)?; - - let future_extended_job_id = extended_channel - .get_future_job_id_from_template_id(last_future_template.template_id) - .expect("future job id must exist"); - let future_extended_job = extended_channel - .get_future_job(future_extended_job_id) - .expect("future job must exist"); - - let future_extended_job_message = - future_extended_job.get_job_message().clone().into_static(); - - // send this future job as new job message - // to be immediately activated with the subsequent SetNewPrevHash - // message - messages.push( - ( - downstream_id, - Mining::NewExtendedMiningJob(future_extended_job_message), - ) - .into(), - ); - - // SetNewPrevHash message activates the future job - let prev_hash = last_set_new_prev_hash_tdp.prev_hash.clone(); - let header_timestamp = last_set_new_prev_hash_tdp.header_timestamp; - let n_bits = last_set_new_prev_hash_tdp.n_bits; - let set_new_prev_hash_mining = SetNewPrevHash { - channel_id, - job_id: future_extended_job_id, - prev_hash, - min_ntime: header_timestamp, - nbits: n_bits, - }; + messages.push( + ( + downstream_id, + Mining::OpenExtendedMiningChannelSuccess( + open_extended_mining_channel_success, + ), + ) + .into(), + ); - extended_channel.on_set_new_prev_hash(last_set_new_prev_hash_tdp).map_err(PoolError::shutdown)?; - - messages.push( - ( - downstream_id, - Mining::SetNewPrevHash(set_new_prev_hash_mining), - ) - .into(), - ); - - let full_extranonce_size = extended_channel.get_full_extranonce_size(); - downstream_data.group_channel.add_channel_id(channel_id, full_extranonce_size).map_err(|e| { - error!("Failed to add channel id to group channel: {:?}", e); - PoolError::shutdown(e) - })?; - } + let Some(last_set_new_prev_hash_tdp) = + self.last_new_prev_hash.get().map_err(PoolError::shutdown)? + else { + return Err(PoolError::disconnect( + PoolErrorKind::LastNewPrevhashNotFound, + downstream_id, + )); + }; - downstream_data - .extended_channels - .insert(channel_id, extended_channel); - let vardiff = VardiffState::new().map_err(PoolError::shutdown)?; - channel_manager_data - .vardiff - .insert((downstream_id, channel_id).into(), vardiff); + let Some(last_future_template) = self + .last_future_template + .get() + .map_err(PoolError::shutdown)? + else { + return Err(PoolError::disconnect( + PoolErrorKind::FutureTemplateNotPresent, + downstream_id, + )); + }; - Ok(messages) - }) + // if the client requires custom work, we don't need to send any extended + // jobs so we just process the SetNewPrevHash + // message + if downstream.requires_custom_work.load(Ordering::SeqCst) { + extended_channel + .on_set_new_prev_hash(last_set_new_prev_hash_tdp) + .map_err(PoolError::shutdown)?; + // if the client does not require custom work, we need to send the + // future extended job + // and the SetNewPrevHash message + } else { + let coinbase_outputs = payout_mode.coinbase_outputs( + last_future_template.coinbase_tx_value_remaining, + &self.coinbase_reward_script, + ); + + extended_channel + .on_new_template(last_future_template.clone(), coinbase_outputs) + .map_err(PoolError::shutdown)?; + + let future_extended_job_id = extended_channel + .get_future_job_id_from_template_id(last_future_template.template_id) + .expect("future job id must exist"); + let future_extended_job = extended_channel + .get_future_job(future_extended_job_id) + .expect("future job must exist"); + + let future_extended_job_message = + future_extended_job.get_job_message().clone().into_static(); + + // send this future job as new job message + // to be immediately activated with the subsequent SetNewPrevHash + // message + messages.push( + ( + downstream_id, + Mining::NewExtendedMiningJob(future_extended_job_message), + ) + .into(), + ); + + // SetNewPrevHash message activates the future job + let prev_hash = last_set_new_prev_hash_tdp.prev_hash.clone(); + let header_timestamp = last_set_new_prev_hash_tdp.header_timestamp; + let n_bits = last_set_new_prev_hash_tdp.n_bits; + let set_new_prev_hash_mining = SetNewPrevHash { + channel_id, + job_id: future_extended_job_id, + prev_hash, + min_ntime: header_timestamp, + nbits: n_bits, + }; + + extended_channel + .on_set_new_prev_hash(last_set_new_prev_hash_tdp) + .map_err(PoolError::shutdown)?; + + messages.push( + ( + downstream_id, + Mining::SetNewPrevHash(set_new_prev_hash_mining), + ) + .into(), + ); + + let full_extranonce_size = extended_channel.get_full_extranonce_size(); + downstream + .group_channel + .with(|channel| channel.add_channel_id(channel_id, full_extranonce_size)) + .map_err(PoolError::shutdown)? + .map_err(|e| { + error!("Failed to add channel id to group channel: {:?}", e); + PoolError::shutdown(e) + })?; + } + + downstream + .extended_channels + .insert(channel_id, extended_channel); + let vardiff = VardiffState::new().map_err(PoolError::shutdown)?; + self.vardiff + .insert((downstream_id, channel_id).into(), vardiff); + + Ok(messages) })?; for message in messages { @@ -560,17 +626,11 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager { let downstream_id = client_id.expect("client_id must be present for downstream_id extraction"); - let messages = self.channel_manager_data.super_safe_lock(|channel_manager_data| { - let channel_id = msg.channel_id; - - let Some(downstream) = channel_manager_data.downstream.get(&downstream_id) else { - return Err(PoolError::disconnect(PoolErrorKind::DownstreamNotFound(downstream_id), downstream_id)); - }; - - downstream.downstream_data.super_safe_lock(|downstream_data| { - let mut messages: Vec = Vec::new(); - let Some(standard_channel) = downstream_data.standard_channels.get_mut(&channel_id) else { - let submit_shares_error = SubmitSharesError { + let channel_id = msg.channel_id; + let vardiff_key = (downstream_id, channel_id).into(); + let messages = self.with_registered_downstream(downstream_id, |downstream| { + let messages = if !downstream.standard_channels.contains_key(&channel_id) { + let error = SubmitSharesError { channel_id, sequence_number: msg.sequence_number, error_code: ERROR_CODE_SUBMIT_SHARES_INVALID_CHANNEL_ID @@ -579,143 +639,209 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager { .expect("error code must be valid string"), }; error!("SubmitSharesError: downstream_id: {}, channel_id: {}, sequence_number: {}, error_code: {} ❌", downstream_id, channel_id, msg.sequence_number, ERROR_CODE_SUBMIT_SHARES_INVALID_CHANNEL_ID); - return Ok(vec![(downstream_id, Mining::SubmitSharesError(submit_shares_error)).into()]); - }; - - let Some(vardiff) = channel_manager_data.vardiff.get_mut(&(downstream_id, channel_id).into()) else { - return Ok(vec![(downstream_id, Mining::CloseChannel(create_close_channel_msg(channel_id, ERROR_CODE_SUBMIT_SHARES_INVALID_CHANNEL_ID))).into()]); - }; - - let res = standard_channel.validate_share(msg.clone()); - vardiff.increment_shares_since_last_update(); + vec![(downstream_id, Mining::SubmitSharesError(error)).into()] + } else if !self.vardiff.contains_key(&vardiff_key) { + vec![( + downstream_id, + Mining::CloseChannel(create_close_channel_msg( + channel_id, + ERROR_CODE_SUBMIT_SHARES_INVALID_CHANNEL_ID, + )), + ) + .into()] + } else { + let validation = + downstream + .standard_channels + .with_mut(&channel_id, |standard_channel| { + let mut messages: Vec = Vec::new(); + let res = standard_channel.validate_share(msg.clone()); + match res { + Ok(ShareValidationResult::Valid(share_hash)) => { + let share_accounting = standard_channel.get_share_accounting(); + if share_accounting.should_acknowledge() { + let success = SubmitSharesSuccess { + channel_id, + last_sequence_number: share_accounting + .get_last_share_sequence_number(), + new_submits_accepted_count: share_accounting + .get_last_batch_accepted(), + new_shares_sum: share_accounting + .get_last_batch_work_sum(), + }; + info!("SubmitSharesStandard: {} ✅", success); + messages.push( + (downstream_id, Mining::SubmitSharesSuccess(success)) + .into(), + ); + } else { + let share_work = + standard_channel.get_target().difficulty_float(); + info!( + "SubmitSharesStandard: valid share | downstream_id: {}, channel_id: {}, sequence_number: {}, share_hash: {}, share_work: {} ✅", + downstream_id, channel_id, msg.sequence_number, share_hash, share_work + ); + } + } + Ok(ShareValidationResult::BlockFound( + share_hash, + template_id, + coinbase, + )) => { + info!("SubmitSharesStandard: 💰 Block Found!!! 💰{share_hash}"); + // if we have a template id (i.e.: this was not a custom job) + // we can propagate the solution to the TP + if let Some(template_id) = template_id { + info!("SubmitSharesStandard: Propagating solution to the Template Provider."); + let solution = SubmitSolution { + template_id, + version: msg.version, + header_timestamp: msg.ntime, + header_nonce: msg.nonce, + coinbase_tx: coinbase + .try_into() + .map_err(PoolError::shutdown)?, + }; + messages.push( + TemplateDistribution::SubmitSolution(solution) + .into(), + ); + } + let share_accounting = standard_channel.get_share_accounting(); + let success = SubmitSharesSuccess { + channel_id, + last_sequence_number: share_accounting + .get_last_share_sequence_number(), + new_submits_accepted_count: share_accounting + .get_last_batch_accepted(), + new_shares_sum: share_accounting + .get_last_batch_work_sum(), + }; + messages.push( + (downstream_id, Mining::SubmitSharesSuccess(success)) + .into(), + ); + } + Err(ShareValidationError::Invalid(code)) => { + error!("SubmitSharesError: downstream_id: {}, channel_id: {}, sequence_number: {}, error_code: {} ❌", downstream_id, channel_id, msg.sequence_number, code); + let error = SubmitSharesError { + channel_id: msg.channel_id, + sequence_number: msg.sequence_number, + error_code: code + .to_string() + .try_into() + .expect("error code must be valid string"), + }; + messages.push( + (downstream_id, Mining::SubmitSharesError(error)).into(), + ); + } + Err(ShareValidationError::Stale(code)) => { + error!("SubmitSharesError: downstream_id: {}, channel_id: {}, sequence_number: {}, error_code: {} ❌", downstream_id, channel_id, msg.sequence_number, code); + let error = SubmitSharesError { + channel_id: msg.channel_id, + sequence_number: msg.sequence_number, + error_code: code + .to_string() + .try_into() + .expect("error code must be valid string"), + }; + messages.push( + (downstream_id, Mining::SubmitSharesError(error)).into(), + ); + } + Err(ShareValidationError::InvalidJobId(code)) => { + error!("SubmitSharesError: downstream_id: {}, channel_id: {}, sequence_number: {}, error_code: {} ❌", downstream_id, channel_id, msg.sequence_number, code); + let error = SubmitSharesError { + channel_id: msg.channel_id, + sequence_number: msg.sequence_number, + error_code: code + .to_string() + .try_into() + .expect("error code must be valid string"), + }; + messages.push( + (downstream_id, Mining::SubmitSharesError(error)).into(), + ); + } + Err(ShareValidationError::DoesNotMeetTarget(code)) => { + error!("SubmitSharesError: downstream_id: {}, channel_id: {}, sequence_number: {}, error_code: {} ❌", downstream_id, channel_id, msg.sequence_number, code); + let error = SubmitSharesError { + channel_id: msg.channel_id, + sequence_number: msg.sequence_number, + error_code: code + .to_string() + .try_into() + .expect("error code must be valid string"), + }; + messages.push( + (downstream_id, Mining::SubmitSharesError(error)).into(), + ); + } + Err(ShareValidationError::DuplicateShare(code)) => { + error!("SubmitSharesError: downstream_id: {}, channel_id: {}, sequence_number: {}, error_code: {} ❌", downstream_id, channel_id, msg.sequence_number, code); + let error = SubmitSharesError { + channel_id: msg.channel_id, + sequence_number: msg.sequence_number, + error_code: code + .to_string() + .try_into() + .expect("error code must be valid string"), + }; + messages.push( + (downstream_id, Mining::SubmitSharesError(error)).into(), + ); + } + Err(ShareValidationError::VersionRollingNotAllowed(code)) => { + error!("SubmitSharesError: downstream_id: {}, channel_id: {}, sequence_number: {}, error_code: {} ❌", downstream_id, channel_id, msg.sequence_number, code); + let error = SubmitSharesError { + channel_id: msg.channel_id, + sequence_number: msg.sequence_number, + error_code: code + .to_string() + .try_into() + .expect("error code must be valid string"), + }; + messages.push( + (downstream_id, Mining::SubmitSharesError(error)).into(), + ); + } + Err(e) => { + return Err(PoolError::disconnect(e, downstream_id)); + } + } - match res { - Ok(ShareValidationResult::Valid(share_hash)) => { - let share_accounting = standard_channel.get_share_accounting(); - if share_accounting.should_acknowledge() { - let success = SubmitSharesSuccess { - channel_id, - last_sequence_number: share_accounting.get_last_share_sequence_number(), - new_submits_accepted_count: share_accounting.get_last_batch_accepted(), - new_shares_sum: share_accounting.get_last_batch_work_sum(), - }; - info!("SubmitSharesStandard: {} ✅", success); - messages.push((downstream_id, Mining::SubmitSharesSuccess(success)).into()); - } else { - let share_work = standard_channel.get_target().difficulty_float(); - info!( - "SubmitSharesStandard: valid share | downstream_id: {}, channel_id: {}, sequence_number: {}, share_hash: {}, share_work: {} ✅", - downstream_id, channel_id, msg.sequence_number, share_hash, share_work - ); + Ok(messages) + }); + match validation { + Some(validation) => { + self.vardiff.with_mut(&vardiff_key, |vardiff| { + vardiff.increment_shares_since_last_update() + }); + validation? } - - } - Ok(ShareValidationResult::BlockFound(share_hash, template_id, coinbase)) => { - info!("SubmitSharesStandard: 💰 Block Found!!! 💰{share_hash}"); - // if we have a template id (i.e.: this was not a custom job) - // we can propagate the solution to the TP - if let Some(template_id) = template_id { - info!("SubmitSharesStandard: Propagating solution to the Template Provider."); - let solution = SubmitSolution { - template_id, - version: msg.version, - header_timestamp: msg.ntime, - header_nonce: msg.nonce, - coinbase_tx: coinbase.try_into().map_err(PoolError::shutdown)?, + None => { + let submit_shares_error = SubmitSharesError { + channel_id, + sequence_number: msg.sequence_number, + error_code: ERROR_CODE_SUBMIT_SHARES_INVALID_CHANNEL_ID + .to_string() + .try_into() + .expect("error code must be valid string"), }; - messages.push(TemplateDistribution::SubmitSolution(solution).into()); + error!("SubmitSharesError: downstream_id: {}, channel_id: {}, sequence_number: {}, error_code: {} ❌", downstream_id, channel_id, msg.sequence_number, ERROR_CODE_SUBMIT_SHARES_INVALID_CHANNEL_ID); + vec![( + downstream_id, + Mining::SubmitSharesError(submit_shares_error), + ) + .into()] } - let share_accounting = standard_channel.get_share_accounting(); - let success = SubmitSharesSuccess { - channel_id, - last_sequence_number: share_accounting.get_last_share_sequence_number(), - new_submits_accepted_count: share_accounting.get_last_batch_accepted(), - new_shares_sum: share_accounting.get_last_batch_work_sum(), - }; - messages.push((downstream_id, Mining::SubmitSharesSuccess(success)).into()); - } - Err(ShareValidationError::Invalid(code)) => { - error!("SubmitSharesError: downstream_id: {}, channel_id: {}, sequence_number: {}, error_code: {} ❌", downstream_id, channel_id, msg.sequence_number, code); - let error = SubmitSharesError { - channel_id: msg.channel_id, - sequence_number: msg.sequence_number, - error_code: code - .to_string() - .try_into() - .expect("error code must be valid string"), - }; - - messages.push((downstream_id, Mining::SubmitSharesError(error)).into()); - } - Err(ShareValidationError::Stale(code)) => { - error!("SubmitSharesError: downstream_id: {}, channel_id: {}, sequence_number: {}, error_code: {} ❌", downstream_id, channel_id, msg.sequence_number, code); - let error = SubmitSharesError { - channel_id: msg.channel_id, - sequence_number: msg.sequence_number, - error_code: code - .to_string() - .try_into() - .expect("error code must be valid string"), - }; - messages.push((downstream_id, Mining::SubmitSharesError(error)).into()); - } - Err(ShareValidationError::InvalidJobId(code)) => { - error!("SubmitSharesError: downstream_id: {}, channel_id: {}, sequence_number: {}, error_code: {} ❌", downstream_id, channel_id, msg.sequence_number, code); - let error = SubmitSharesError { - channel_id: msg.channel_id, - sequence_number: msg.sequence_number, - error_code: code - .to_string() - .try_into() - .expect("error code must be valid string"), - }; - messages.push((downstream_id, Mining::SubmitSharesError(error)).into()); - } - Err(ShareValidationError::DoesNotMeetTarget(code)) => { - error!("SubmitSharesError: downstream_id: {}, channel_id: {}, sequence_number: {}, error_code: {} ❌", downstream_id, channel_id, msg.sequence_number, code); - let error = SubmitSharesError { - channel_id: msg.channel_id, - sequence_number: msg.sequence_number, - error_code: code - .to_string() - .try_into() - .expect("error code must be valid string"), - }; - messages.push((downstream_id, Mining::SubmitSharesError(error)).into()); - } - Err(ShareValidationError::DuplicateShare(code)) => { - error!("SubmitSharesError: downstream_id: {}, channel_id: {}, sequence_number: {}, error_code: {} ❌", downstream_id, channel_id, msg.sequence_number, code); - let error = SubmitSharesError { - channel_id: msg.channel_id, - sequence_number: msg.sequence_number, - error_code: code - .to_string() - .try_into() - .expect("error code must be valid string"), - }; - messages.push((downstream_id, Mining::SubmitSharesError(error)).into()); - } - Err(ShareValidationError::VersionRollingNotAllowed(code)) => { - error!("SubmitSharesError: downstream_id: {}, channel_id: {}, sequence_number: {}, error_code: {} ❌", downstream_id, channel_id, msg.sequence_number, code); - let error = SubmitSharesError { - channel_id: msg.channel_id, - sequence_number: msg.sequence_number, - error_code: code - .to_string() - .try_into() - .expect("error code must be valid string"), - }; - messages.push((downstream_id, Mining::SubmitSharesError(error)).into()); - } - Err(e) => { - return Err(PoolError::disconnect(e, downstream_id))?; } - } + }; Ok(messages) - }) - })?; + })?; for message in messages { // A send can only fail if the receiver side of the channel is closed. @@ -757,15 +883,10 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager { None }; - let messages = self.channel_manager_data.super_safe_lock(|channel_manager_data| { - let channel_id = msg.channel_id; - let Some(downstream) = channel_manager_data.downstream.get(&downstream_id) else { - return Err(PoolError::disconnect(PoolErrorKind::DownstreamNotFound(downstream_id), downstream_id)); - }; - - downstream.downstream_data.super_safe_lock(|downstream_data| { - let mut messages: Vec = Vec::new(); - let Some(extended_channel) = downstream_data.extended_channels.get_mut(&channel_id) else { + let channel_id = msg.channel_id; + let vardiff_key = (downstream_id, channel_id).into(); + let messages = self.with_registered_downstream(downstream_id, |downstream| { + let messages = if !downstream.extended_channels.contains_key(&channel_id) { let error = SubmitSharesError { channel_id, sequence_number: msg.sequence_number, @@ -775,156 +896,220 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager { .expect("error code must be valid string"), }; error!("SubmitSharesError: downstream_id: {}, channel_id: {}, sequence_number: {}, error_code: {} ❌", downstream_id, channel_id, msg.sequence_number, ERROR_CODE_SUBMIT_SHARES_INVALID_CHANNEL_ID); - return Ok(vec![(downstream_id, Mining::SubmitSharesError(error)).into()]); - }; - - if let Some(_user_identity) = user_identity { - // here we have the UserIdentity TLV, so we can use it to enhance monitoring of individual miners in the future - } - - let Some(vardiff) = channel_manager_data.vardiff.get_mut(&(downstream_id, channel_id).into()) else { - return Ok(vec![(downstream_id, Mining::CloseChannel(create_close_channel_msg(channel_id, ERROR_CODE_SUBMIT_SHARES_INVALID_CHANNEL_ID))).into()]); - }; - - let res = extended_channel.validate_share(msg.clone()); - vardiff.increment_shares_since_last_update(); + vec![(downstream_id, Mining::SubmitSharesError(error)).into()] + } else if !self.vardiff.contains_key(&vardiff_key) { + vec![( + downstream_id, + Mining::CloseChannel(create_close_channel_msg( + channel_id, + ERROR_CODE_SUBMIT_SHARES_INVALID_CHANNEL_ID, + )), + ) + .into()] + } else { + if let Some(_user_identity) = user_identity { + // here we have the UserIdentity TLV, so we can use it to enhance monitoring of + // individual miners in the future + } + let validation = + downstream + .extended_channels + .with_mut(&channel_id, |extended_channel| { + let mut messages: Vec = Vec::new(); + let res = extended_channel.validate_share(msg.clone()); + match res { + Ok(ShareValidationResult::Valid(share_hash)) => { + let share_accounting = extended_channel.get_share_accounting(); + if share_accounting.should_acknowledge() { + let success = SubmitSharesSuccess { + channel_id, + last_sequence_number: share_accounting + .get_last_share_sequence_number(), + new_submits_accepted_count: share_accounting + .get_last_batch_accepted(), + new_shares_sum: share_accounting + .get_last_batch_work_sum(), + }; + info!("SubmitSharesExtended: {} ✅", success); + messages.push( + (downstream_id, Mining::SubmitSharesSuccess(success)) + .into(), + ); + } else { + let share_work = + extended_channel.get_target().difficulty_float(); + info!( + "SubmitSharesExtended: valid share | downstream_id: {}, channel_id: {}, sequence_number: {}, share_hash: {}, share_work: {} ✅", + downstream_id, channel_id, msg.sequence_number, share_hash, share_work + ); + } + } + Ok(ShareValidationResult::BlockFound( + share_hash, + template_id, + coinbase, + )) => { + info!("SubmitSharesExtended: 💰 Block Found!!! 💰{share_hash}"); + if let Some(template_id) = template_id { + info!("SubmitSharesExtended: Propagating solution to the Template Provider."); + let solution = SubmitSolution { + template_id, + version: msg.version, + header_timestamp: msg.ntime, + header_nonce: msg.nonce, + coinbase_tx: coinbase + .try_into() + .map_err(PoolError::shutdown)?, + }; + messages.push( + TemplateDistribution::SubmitSolution(solution) + .into(), + ); + } + let share_accounting = extended_channel.get_share_accounting(); + let success = SubmitSharesSuccess { + channel_id, + last_sequence_number: share_accounting + .get_last_share_sequence_number(), + new_submits_accepted_count: share_accounting + .get_last_batch_accepted(), + new_shares_sum: share_accounting + .get_last_batch_work_sum(), + }; + messages.push( + (downstream_id, Mining::SubmitSharesSuccess(success)) + .into(), + ); + } + Err(ShareValidationError::Invalid(code)) => { + error!("SubmitSharesError: downstream_id: {}, channel_id: {}, sequence_number: {}, error_code: {} ❌", downstream_id, channel_id, msg.sequence_number, code); + let error = SubmitSharesError { + channel_id: msg.channel_id, + sequence_number: msg.sequence_number, + error_code: code + .to_string() + .try_into() + .expect("error code must be valid string"), + }; + messages.push( + (downstream_id, Mining::SubmitSharesError(error)).into(), + ); + } + Err(ShareValidationError::Stale(code)) => { + error!("SubmitSharesError: downstream_id: {}, channel_id: {}, sequence_number: {}, error_code: {} ❌", downstream_id, channel_id, msg.sequence_number, code); + let error = SubmitSharesError { + channel_id: msg.channel_id, + sequence_number: msg.sequence_number, + error_code: code + .to_string() + .try_into() + .expect("error code must be valid string"), + }; + messages.push( + (downstream_id, Mining::SubmitSharesError(error)).into(), + ); + } + Err(ShareValidationError::InvalidJobId(code)) => { + error!("SubmitSharesError: downstream_id: {}, channel_id: {}, sequence_number: {}, error_code: {} ❌", downstream_id, channel_id, msg.sequence_number, code); + let error = SubmitSharesError { + channel_id: msg.channel_id, + sequence_number: msg.sequence_number, + error_code: code + .to_string() + .try_into() + .expect("error code must be valid string"), + }; + messages.push( + (downstream_id, Mining::SubmitSharesError(error)).into(), + ); + } + Err(ShareValidationError::DoesNotMeetTarget(code)) => { + error!("SubmitSharesError: downstream_id: {}, channel_id: {}, sequence_number: {}, error_code: {} ❌", downstream_id, channel_id, msg.sequence_number, code); + let error = SubmitSharesError { + channel_id: msg.channel_id, + sequence_number: msg.sequence_number, + error_code: code + .to_string() + .try_into() + .expect("error code must be valid string"), + }; + messages.push( + (downstream_id, Mining::SubmitSharesError(error)).into(), + ); + } + Err(ShareValidationError::DuplicateShare(code)) => { + error!("SubmitSharesError: downstream_id: {}, channel_id: {}, sequence_number: {}, error_code: {} ❌", downstream_id, channel_id, msg.sequence_number, code); + let error = SubmitSharesError { + channel_id: msg.channel_id, + sequence_number: msg.sequence_number, + error_code: code + .to_string() + .try_into() + .expect("error code must be valid string"), + }; + messages.push( + (downstream_id, Mining::SubmitSharesError(error)).into(), + ); + } + Err(ShareValidationError::BadExtranonceSize(code)) => { + error!("SubmitSharesError: downstream_id: {}, channel_id: {}, sequence_number: {}, error_code: {} ❌", downstream_id, channel_id, msg.sequence_number, code); + let error = SubmitSharesError { + channel_id: msg.channel_id, + sequence_number: msg.sequence_number, + error_code: code + .to_string() + .try_into() + .expect("error code must be valid string"), + }; + messages.push( + (downstream_id, Mining::SubmitSharesError(error)).into(), + ); + } + Err(ShareValidationError::VersionRollingNotAllowed(code)) => { + error!("SubmitSharesError: downstream_id: {}, channel_id: {}, sequence_number: {}, error_code: {} ❌", downstream_id, channel_id, msg.sequence_number, code); + let error = SubmitSharesError { + channel_id: msg.channel_id, + sequence_number: msg.sequence_number, + error_code: code + .to_string() + .try_into() + .expect("error code must be valid string"), + }; + messages.push( + (downstream_id, Mining::SubmitSharesError(error)).into(), + ); + } + Err(e) => { + return Err(PoolError::disconnect(e, downstream_id)); + } + } - match res { - Ok(ShareValidationResult::Valid(share_hash)) => { - let share_accounting = extended_channel.get_share_accounting(); - if share_accounting.should_acknowledge() { - let success = SubmitSharesSuccess { - channel_id, - last_sequence_number: share_accounting.get_last_share_sequence_number(), - new_submits_accepted_count: share_accounting.get_last_batch_accepted(), - new_shares_sum: share_accounting.get_last_batch_work_sum(), - }; - info!("SubmitSharesExtended: {} ✅", success); - messages.push((downstream_id, Mining::SubmitSharesSuccess(success)).into()); - } else { - let share_work = extended_channel.get_target().difficulty_float(); - info!( - "SubmitSharesExtended: valid share | downstream_id: {}, channel_id: {}, sequence_number: {}, share_hash: {}, share_work: {} ✅", - downstream_id, channel_id, msg.sequence_number, share_hash, share_work - ); + Ok(messages) + }); + match validation { + Some(validation) => { + self.vardiff.with_mut(&vardiff_key, |vardiff| { + vardiff.increment_shares_since_last_update() + }); + validation? } - } - Ok(ShareValidationResult::BlockFound(share_hash, template_id, coinbase)) => { - info!("SubmitSharesExtended: 💰 Block Found!!! 💰{share_hash}"); - // if we have a template id (i.e.: this was not a custom job) - // we can propagate the solution to the TP - if let Some(template_id) = template_id { - info!("SubmitSharesExtended: Propagating solution to the Template Provider."); - let solution = SubmitSolution { - template_id, - version: msg.version, - header_timestamp: msg.ntime, - header_nonce: msg.nonce, - coinbase_tx: coinbase.try_into().map_err(PoolError::shutdown)?, + None => { + let error = SubmitSharesError { + channel_id, + sequence_number: msg.sequence_number, + error_code: ERROR_CODE_SUBMIT_SHARES_INVALID_CHANNEL_ID + .to_string() + .try_into() + .expect("error code must be valid string"), }; - messages.push(TemplateDistribution::SubmitSolution(solution).into()); + error!("SubmitSharesError: downstream_id: {}, channel_id: {}, sequence_number: {}, error_code: {} ❌", downstream_id, channel_id, msg.sequence_number, ERROR_CODE_SUBMIT_SHARES_INVALID_CHANNEL_ID); + vec![(downstream_id, Mining::SubmitSharesError(error)).into()] } - let share_accounting = extended_channel.get_share_accounting(); - let success = SubmitSharesSuccess { - channel_id, - last_sequence_number: share_accounting.get_last_share_sequence_number(), - new_submits_accepted_count: share_accounting.get_last_batch_accepted(), - new_shares_sum: share_accounting.get_last_batch_work_sum(), - }; - messages.push((downstream_id, Mining::SubmitSharesSuccess(success)).into()); - } - Err(ShareValidationError::Invalid(code)) => { - error!("SubmitSharesError: downstream_id: {}, channel_id: {}, sequence_number: {}, error_code: {} ❌", downstream_id, channel_id, msg.sequence_number, code); - let error = SubmitSharesError { - channel_id: msg.channel_id, - sequence_number: msg.sequence_number, - error_code: code - .to_string() - .try_into() - .expect("error code must be valid string"), - }; - messages.push((downstream_id, Mining::SubmitSharesError(error)).into()); - } - Err(ShareValidationError::Stale(code)) => { - error!("SubmitSharesError: downstream_id: {}, channel_id: {}, sequence_number: {}, error_code: {} ❌", downstream_id, channel_id, msg.sequence_number, code); - let error = SubmitSharesError { - channel_id: msg.channel_id, - sequence_number: msg.sequence_number, - error_code: code - .to_string() - .try_into() - .expect("error code must be valid string"), - }; - messages.push((downstream_id, Mining::SubmitSharesError(error)).into()); - } - Err(ShareValidationError::InvalidJobId(code)) => { - error!("SubmitSharesError: downstream_id: {}, channel_id: {}, sequence_number: {}, error_code: {} ❌", downstream_id, channel_id, msg.sequence_number, code); - let error = SubmitSharesError { - channel_id: msg.channel_id, - sequence_number: msg.sequence_number, - error_code: code - .to_string() - .try_into() - .expect("error code must be valid string"), - }; - messages.push((downstream_id, Mining::SubmitSharesError(error)).into()); } - Err(ShareValidationError::DoesNotMeetTarget(code)) => { - error!("SubmitSharesError: downstream_id: {}, channel_id: {}, sequence_number: {}, error_code: {} ❌", downstream_id, channel_id, msg.sequence_number, code); - let error = SubmitSharesError { - channel_id: msg.channel_id, - sequence_number: msg.sequence_number, - error_code: code - .to_string() - .try_into() - .expect("error code must be valid string"), - }; - messages.push((downstream_id, Mining::SubmitSharesError(error)).into()); - } - Err(ShareValidationError::DuplicateShare(code)) => { - error!("SubmitSharesError: downstream_id: {}, channel_id: {}, sequence_number: {}, error_code: {} ❌", downstream_id, channel_id, msg.sequence_number, code); - let error = SubmitSharesError { - channel_id: msg.channel_id, - sequence_number: msg.sequence_number, - error_code: code - .to_string() - .try_into() - .expect("error code must be valid string"), - }; - messages.push((downstream_id, Mining::SubmitSharesError(error)).into()); - } - Err(ShareValidationError::BadExtranonceSize(code)) => { - error!("SubmitSharesError: downstream_id: {}, channel_id: {}, sequence_number: {}, error_code: {} ❌", downstream_id, channel_id, msg.sequence_number, code); - let error = SubmitSharesError { - channel_id: msg.channel_id, - sequence_number: msg.sequence_number, - error_code: code - .to_string() - .try_into() - .expect("error code must be valid string"), - }; - messages.push((downstream_id, Mining::SubmitSharesError(error)).into()); - } - Err(ShareValidationError::VersionRollingNotAllowed(code)) => { - error!("SubmitSharesError: downstream_id: {}, channel_id: {}, sequence_number: {}, error_code: {} ❌", downstream_id, channel_id, msg.sequence_number, code); - let error = SubmitSharesError { - channel_id: msg.channel_id, - sequence_number: msg.sequence_number, - error_code: code - .to_string() - .try_into() - .expect("error code must be valid string"), - }; - messages.push((downstream_id, Mining::SubmitSharesError(error)).into()); - } - Err(e) => { - return Err(PoolError::disconnect(e, downstream_id))?; - } - } + }; Ok(messages) - }) - })?; + })?; for message in messages { // A send can only fail if the receiver side of the channel is closed. @@ -948,139 +1133,113 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager { let downstream_id = client_id.expect("client_id must be present for downstream_id extraction"); - - let messages: Vec = - self.channel_manager_data - .super_safe_lock(|channel_manager_data| { - let Some(downstream) = channel_manager_data.downstream.get(&downstream_id) - else { - return Err(PoolError::disconnect( - PoolErrorKind::DownstreamNotFound(downstream_id), - downstream_id, - )); - }; - - downstream - .downstream_data - .super_safe_lock(|downstream_data| { - let mut messages = Vec::new(); - let channel_id = msg.channel_id; - let new_nominal_hash_rate = msg.nominal_hash_rate; - let requested_maximum_target = Target::from_le_bytes( - msg.maximum_target.inner_as_ref().try_into().unwrap(), - ); - - if let Some(standard_channel) = - downstream_data.standard_channels.get_mut(&channel_id) - { - let res = standard_channel.update_channel( - new_nominal_hash_rate, - Some(requested_maximum_target), - ); - match res { - Ok(_) => {} - Err(e) => { - error!("UpdateChannelError: {:?}", e); - match e { - StandardChannelError::UpdateChannelInvalidNominalHashrate(code) => { - error!("UpdateChannelError: {}", code); - let update_channel_error = UpdateChannelError { - channel_id, - error_code: code - .to_string() - .try_into() - .expect("error code must be valid string"), - }; - messages.push( - ( - downstream_id, - Mining::UpdateChannelError( - update_channel_error, - ), - ) - .into(), - ); - } - // We don't care about other variants as they are not - // associated to Update channel, and we will never - // encounter it. - _ => unreachable!(), - } - } + let channel_id = msg.channel_id; + let new_nominal_hash_rate = msg.nominal_hash_rate; + let requested_maximum_target = + Target::from_le_bytes(msg.maximum_target.inner_as_ref().try_into().unwrap()); + let messages = self.with_registered_downstream(downstream_id, |downstream| { + let mut messages: Vec = Vec::new(); + + if downstream + .standard_channels + .with_mut(&channel_id, |standard_channel| { + let res = standard_channel + .update_channel(new_nominal_hash_rate, Some(requested_maximum_target)); + match res { + Ok(_) => {} + Err(e) => { + error!("UpdateChannelError: {:?}", e); + match e { + StandardChannelError::UpdateChannelInvalidNominalHashrate(code) => { + error!("UpdateChannelError: {}", code); + let update_channel_error = UpdateChannelError { + channel_id, + error_code: code + .to_string() + .try_into() + .expect("error code must be valid string"), + }; + messages.push( + ( + downstream_id, + Mining::UpdateChannelError(update_channel_error), + ) + .into(), + ); } - let new_target = standard_channel.get_target(); - let set_target = SetTarget { - channel_id, - maximum_target: new_target.to_le_bytes().into(), - }; - messages - .push((downstream_id, Mining::SetTarget(set_target)).into()); - } else if let Some(extended_channel) = - downstream_data.extended_channels.get_mut(&channel_id) - { - let res = extended_channel.update_channel( - new_nominal_hash_rate, - Some(requested_maximum_target), - ); - match res { - Ok(_) => {} - Err(e) => { - error!("UpdateChannelError: {:?}", e); - match e { - ExtendedChannelError::UpdateChannelInvalidNominalHashrate(code) => { - error!("UpdateChannelError: {}", code); - let update_channel_error = UpdateChannelError { - channel_id, - error_code: code - .to_string() - .try_into() - .expect("error code must be valid string"), - }; - messages.push( - ( - downstream_id, - Mining::UpdateChannelError( - update_channel_error, - ), - ) - .into(), - ); - } - // We don't care about other variants as they are not - // associated to Update channel, and we will never - // encounter it. - _ => unreachable!(), - } + _ => unreachable!(), + } + } + } + let new_target = standard_channel.get_target(); + let set_target = SetTarget { + channel_id, + maximum_target: new_target.to_le_bytes().into(), + }; + messages.push((downstream_id, Mining::SetTarget(set_target)).into()); + }) + .is_none() + && downstream + .extended_channels + .with_mut(&channel_id, |extended_channel| { + let res = extended_channel + .update_channel(new_nominal_hash_rate, Some(requested_maximum_target)); + match res { + Ok(_) => {} + Err(e) => { + error!("UpdateChannelError: {:?}", e); + match e { + ExtendedChannelError::UpdateChannelInvalidNominalHashrate( + code, + ) => { + error!("UpdateChannelError: {}", code); + let update_channel_error = UpdateChannelError { + channel_id, + error_code: code + .to_string() + .try_into() + .expect("error code must be valid string"), + }; + messages.push( + ( + downstream_id, + Mining::UpdateChannelError(update_channel_error), + ) + .into(), + ); } + _ => unreachable!(), } - let new_target = extended_channel.get_target(); - let set_target = SetTarget { - channel_id, - maximum_target: new_target.to_le_bytes().into(), - }; - messages - .push((downstream_id, Mining::SetTarget(set_target)).into()); - } else { - error!("UpdateChannelError: invalid-channel-id"); - let update_channel_error = UpdateChannelError { - channel_id, - error_code: ERROR_CODE_UPDATE_CHANNEL_INVALID_CHANNEL_ID - .to_string() - .try_into() - .expect("error code must be valid string"), - }; - messages.push( - ( - downstream_id, - Mining::UpdateChannelError(update_channel_error), - ) - .into(), - ); } + } + let new_target = extended_channel.get_target(); + let set_target = SetTarget { + channel_id, + maximum_target: new_target.to_le_bytes().into(), + }; + messages.push((downstream_id, Mining::SetTarget(set_target)).into()); + }) + .is_none() + { + error!("UpdateChannelError: invalid-channel-id"); + let update_channel_error = UpdateChannelError { + channel_id, + error_code: ERROR_CODE_UPDATE_CHANNEL_INVALID_CHANNEL_ID + .to_string() + .try_into() + .expect("error code must be valid string"), + }; + messages.push( + ( + downstream_id, + Mining::UpdateChannelError(update_channel_error), + ) + .into(), + ); + } - Ok(messages) - }) - })?; + Ok(messages) + })?; for message in messages { // A send can only fail if the receiver side of the channel is closed. @@ -1145,49 +1304,37 @@ impl HandleMiningMessagesFromClientAsync for ChannelManager { // Step 2: JDS validated successfully — commit the job to the extended channel. let message: RouteMessageTo = - self.channel_manager_data - .super_safe_lock(|channel_manager_data| { - let Some(downstream) = channel_manager_data.downstream.get_mut(&downstream_id) - else { - return Err(PoolError::disconnect( - PoolErrorKind::DownstreamNotFound(downstream_id), - downstream_id, - )); - }; - - downstream - .downstream_data - .super_safe_lock(|downstream_data| { - let Some(extended_channel) = downstream_data - .extended_channels - .get_mut(&msg_static.channel_id) - else { - error!("SetCustomMiningJobError: invalid-channel-id"); - let error = SetCustomMiningJobError { - request_id: msg_static.request_id, - channel_id: msg_static.channel_id, - error_code: ERROR_CODE_SET_CUSTOM_MINING_JOB_INVALID_CHANNEL_ID - .to_string() - .try_into() - .expect("error code must be valid string"), - }; - return Ok( - (downstream_id, Mining::SetCustomMiningJobError(error)).into() - ); - }; - - let job_id = extended_channel - .on_set_custom_mining_job(msg_static.clone()) - .map_err(|error| PoolError::disconnect(error, downstream_id))?; - - let success = SetCustomMiningJobSuccess { - channel_id: msg_static.channel_id, - request_id: msg_static.request_id, - job_id, - }; - Ok((downstream_id, Mining::SetCustomMiningJobSuccess(success)).into()) - }) - })?; + self.with_registered_downstream(downstream_id, |downstream| { + match downstream.extended_channels.with_mut( + &msg_static.channel_id, + |extended_channel| { + let job_id = extended_channel + .on_set_custom_mining_job(msg_static.clone()) + .map_err(|error| PoolError::disconnect(error, downstream_id))?; + + let success = SetCustomMiningJobSuccess { + channel_id: msg_static.channel_id, + request_id: msg_static.request_id, + job_id, + }; + Ok((downstream_id, Mining::SetCustomMiningJobSuccess(success)).into()) + }, + ) { + Some(message) => message, + None => { + error!("SetCustomMiningJobError: invalid-channel-id"); + let error = SetCustomMiningJobError { + request_id: msg_static.request_id, + channel_id: msg_static.channel_id, + error_code: ERROR_CODE_SET_CUSTOM_MINING_JOB_INVALID_CHANNEL_ID + .to_string() + .try_into() + .expect("error code must be valid string"), + }; + Ok((downstream_id, Mining::SetCustomMiningJobError(error)).into()) + } + } + })?; message .forward(&self.channel_manager_io) diff --git a/pool-apps/pool/src/lib/channel_manager/mod.rs b/pool-apps/pool/src/lib/channel_manager/mod.rs index b18c7442e..ed60ca250 100644 --- a/pool-apps/pool/src/lib/channel_manager/mod.rs +++ b/pool-apps/pool/src/lib/channel_manager/mod.rs @@ -1,5 +1,4 @@ use std::{ - collections::HashMap, net::SocketAddr, sync::{ atomic::{AtomicU32, AtomicUsize}, @@ -14,7 +13,6 @@ use stratum_apps::{ channel_utils::ReceiverCleanup, coinbase_output_constraints::coinbase_output_constraints_message_with_offset, config_helpers::CoinbaseRewardScript, - custom_mutex::Mutex, key_utils::{Secp256k1PublicKey, Secp256k1SecretKey}, network_helpers::accept_noise_connection, stratum_core::{ @@ -31,6 +29,7 @@ use stratum_apps::{ parsers_sv2::{Mining, TemplateDistribution, Tlv}, template_distribution_sv2::{NewTemplate, SetNewPrevHash}, }, + sync::{SharedLock, SharedMap}, task_manager::TaskManager, utils::types::{ChannelId, DownstreamId, SharesPerMinute, VardiffKey}, }; @@ -63,32 +62,11 @@ const POOL_ALLOCATION_BYTES: u8 = POOL_SERVER_BYTES + POOL_LOCAL_PREFIX_BYTES; const CLIENT_SEARCH_SPACE_BYTES: u8 = 16; pub const FULL_EXTRANONCE_SIZE: u8 = POOL_ALLOCATION_BYTES + CLIENT_SEARCH_SPACE_BYTES; -pub struct ChannelManagerData { - // Mapping of `downstream_id` → `Downstream` object, - // used by the channel manager to locate and interact with downstream clients. - pub(crate) downstream: HashMap, - // Unified extranonce prefix allocator, shared by standard and extended - // downstream channels. The allocated [`ExtranoncePrefix`] is stored on the - // channel itself, so dropping the channel automatically releases the slot. - extranonce_allocator: ExtranonceAllocator, - // Factory that assigns a unique ID to each new **downstream connection**. - downstream_id_factory: AtomicUsize, - // Mapping of `(downstream_id, channel_id)` → vardiff controller. - // Each entry manages variable difficulty for a specific downstream channel. - vardiff: HashMap, - // Coinbase outputs - coinbase_outputs: Vec, - // Last new prevhash - last_new_prev_hash: Option>, - // Last future template - last_future_template: Option>, -} - #[derive(Clone)] pub struct ChannelManagerIo { tp_sender: Sender>, tp_receiver: Receiver>, - downstream_sender: Arc>>>, + downstream_sender: SharedMap>, downstream_receiver: Receiver<(usize, Mining<'static>, Option>)>, } @@ -97,12 +75,8 @@ impl ChannelManagerIo { self.tp_sender.close(); self.tp_receiver.close_and_drain(); self.downstream_receiver.close_and_drain(); - self.downstream_sender.super_safe_lock(|downstreams| { - for sender in downstreams.values() { - sender.close(); - } - downstreams.clear(); - }); + self.downstream_sender.for_each(|_, sender| sender.close()); + self.downstream_sender.clear(); } } @@ -111,7 +85,24 @@ impl ChannelManagerIo { /// to perform message traversal. #[derive(Clone)] pub struct ChannelManager { - pub(crate) channel_manager_data: Arc>, + // Mapping of `downstream_id` -> `Downstream` object, + // used by the channel manager to locate and interact with downstream clients. + pub(crate) downstreams: SharedMap, + // Unified extranonce prefix allocator, shared by standard and extended + // downstream channels. The allocated [`ExtranoncePrefix`] is stored on the + // channel itself, so dropping the channel automatically releases the slot. + pub(crate) extranonce_allocator: SharedLock, + // Factory that assigns a unique ID to each new downstream connection. + downstream_id_factory: Arc, + // Mapping of `(downstream_id, channel_id)` -> vardiff controller. + // Each entry manages variable difficulty for a specific downstream channel. + pub(crate) vardiff: SharedMap, + // Coinbase outputs. + pub(crate) coinbase_outputs: Vec, + // Last new prevhash. + pub(crate) last_new_prev_hash: SharedLock>>, + // Last future template. + pub(crate) last_future_template: SharedLock>>, channel_manager_io: ChannelManagerIo, pool_tag_string: String, share_batch_size: usize, @@ -181,25 +172,21 @@ impl ChannelManager { ExtranonceAllocator::new(local_prefix_bytes, FULL_EXTRANONCE_SIZE, POOL_MAX_CHANNELS) .map_err(PoolError::shutdown)?; - let channel_manager_data = Arc::new(Mutex::new(ChannelManagerData { - downstream: HashMap::new(), - extranonce_allocator, - downstream_id_factory: AtomicUsize::new(1), - vardiff: HashMap::new(), - coinbase_outputs, - last_future_template: None, - last_new_prev_hash: None, - })); - let channel_manager_io = ChannelManagerIo { tp_sender, tp_receiver, - downstream_sender: Arc::new(Mutex::new(HashMap::new())), + downstream_sender: SharedMap::new(), downstream_receiver, }; let channel_manager = ChannelManager { - channel_manager_data, + downstreams: SharedMap::new(), + extranonce_allocator: SharedLock::new(extranonce_allocator), + downstream_id_factory: Arc::new(AtomicUsize::new(1)), + vardiff: SharedMap::new(), + coinbase_outputs, + last_future_template: SharedLock::new(None), + last_new_prev_hash: SharedLock::new(None), channel_manager_io, share_batch_size: config.share_batch_size(), shares_per_minute: config.shares_per_minute(), @@ -217,18 +204,23 @@ impl ChannelManager { // Returns a `GroupChannel` if successful, otherwise returns `None`. // // To be called before calling Downstream::new. - fn bootstrap_group_channel(&self, channel_id: ChannelId) -> Option> { - let (last_future_template, last_set_new_prev_hash) = - self.channel_manager_data.super_safe_lock(|data| { - ( - data.last_future_template - .clone() - .expect("No future template found after readiness check"), - data.last_new_prev_hash - .clone() - .expect("No new prevhash found after readiness check"), - ) - }); + #[allow(clippy::result_large_err)] + fn bootstrap_group_channel( + &self, + channel_id: ChannelId, + ) -> PoolResult>, error::ChannelManager> { + let last_future_template = self + .last_future_template + .get() + .map_err(PoolError::shutdown)? + .expect("No future template found after readiness check"); + + let last_set_new_prev_hash = self + .last_new_prev_hash + .get() + .map_err(PoolError::shutdown)? + .expect("No new prevhash found after readiness check"); + let mut group_channel = match GroupChannel::new_for_pool( channel_id, FULL_EXTRANONCE_SIZE as usize, @@ -237,7 +229,7 @@ impl ChannelManager { Ok(channel) => channel, Err(e) => { error!(error = ?e, "Failed to bootstrap group channel"); - return None; + return Ok(None); } }; @@ -248,15 +240,15 @@ impl ChannelManager { if let Err(e) = group_channel.on_new_template(last_future_template, vec![coinbase_output]) { error!(error = ?e, "Failed to add template to group channel"); - return None; + return Ok(None); } if let Err(e) = group_channel.on_set_new_prev_hash(last_set_new_prev_hash) { error!(error = ?e, "Failed to set new prevhash for group channel"); - return None; + return Ok(None); } - Some(group_channel) + Ok(Some(group_channel)) } /// Starts the downstream server, and accepts new connection request. @@ -276,9 +268,14 @@ impl ChannelManager { // Wait for initial template and prevhash before accepting connections loop { - let has_required_data = this.channel_manager_data.super_safe_lock(|data| { - data.last_future_template.is_some() && data.last_new_prev_hash.is_some() - }); + let has_required_data = this + .last_future_template + .with(|template| template.is_some()) + .map_err(PoolError::shutdown)? + && this + .last_new_prev_hash + .with(|prevhash| prevhash.is_some()) + .map_err(PoolError::shutdown)?; if has_required_data { info!("Required template data received, ready to accept connections"); @@ -345,13 +342,20 @@ impl ChannelManager { } }; - let downstream_id = this.channel_manager_data - .super_safe_lock(|data| data.downstream_id_factory.fetch_add(1, Ordering::SeqCst)); + let downstream_id = this + .downstream_id_factory + .fetch_add(1, Ordering::SeqCst); let channel_id_factory = AtomicU32::new(1); let group_channel_id = channel_id_factory.fetch_add(1, Ordering::SeqCst); - let group_channel = match this.bootstrap_group_channel(group_channel_id) { + let Ok(group_channel) = this.bootstrap_group_channel(group_channel_id) else { + error!("Failed to bootstrap group channel - disconnecting downstream {downstream_id}"); + cancellation_token_clone.cancel(); + return; + }; + + let group_channel = match group_channel { Some(group_channel) => group_channel, None => { error!("Failed to bootstrap group channel - disconnecting downstream {downstream_id}"); @@ -375,11 +379,11 @@ impl ChannelManager { this.required_extensions.clone(), ); - this.channel_manager_io.downstream_sender.super_safe_lock(|map| map.insert(downstream_id, channel_manager_sender)); + this.channel_manager_io + .downstream_sender + .insert(downstream_id, channel_manager_sender); - this.channel_manager_data.super_safe_lock(|data| { - data.downstream.insert(downstream_id, downstream.clone()); - }); + this.downstreams.insert(downstream_id, downstream.clone()); downstream .start( @@ -469,15 +473,12 @@ impl ChannelManager { // 1. Removes the corresponding Downstream from the `downstream` map. // 2. Removes the channels of the corresponding Downstream from `vardiff` map. pub fn remove_downstream(&self, downstream_id: DownstreamId) { - self.channel_manager_data.super_safe_lock(|cm_data| { - cm_data.downstream.remove(&downstream_id); - cm_data - .vardiff - .retain(|key, _| key.downstream_id != downstream_id); - }); + self.downstreams.remove(&downstream_id); + self.vardiff + .retain(|key, _| key.downstream_id != downstream_id); self.channel_manager_io .downstream_sender - .super_safe_lock(|map| map.remove(&downstream_id)); + .remove(&downstream_id); } // Handles messages received from the TP subsystem. @@ -627,38 +628,43 @@ impl ChannelManager { // upstream if applicable. async fn run_vardiff(&self) -> PoolResult<(), error::ChannelManager> { let mut messages: Vec = vec![]; - self.channel_manager_data - .super_safe_lock(|channel_manager_data| { - for (vardiff_key, vardiff_state) in channel_manager_data.vardiff.iter_mut() { - let downstream_id = &vardiff_key.downstream_id; - let channel_id = &vardiff_key.channel_id; - - let Some(downstream) = channel_manager_data.downstream.get_mut(downstream_id) - else { - continue; - }; - downstream.downstream_data.super_safe_lock(|data| { - if let Some(standard_channel) = data.standard_channels.get_mut(channel_id) { - Self::run_vardiff_on_standard_channel( - *downstream_id, - *channel_id, - standard_channel, - vardiff_state, - &mut messages, - ); - } - if let Some(extended_channel) = data.extended_channels.get_mut(channel_id) { - Self::run_vardiff_on_extended_channel( - *downstream_id, - *channel_id, - extended_channel, - vardiff_state, - &mut messages, - ); - } + for vardiff_key in self.vardiff.keys() { + let downstream_id = vardiff_key.downstream_id; + let channel_id = vardiff_key.channel_id; + if self + .downstreams + .with(&downstream_id, |downstream| { + self.vardiff.with_mut(&vardiff_key, |vardiff_state| { + downstream + .standard_channels + .with_mut(&channel_id, |standard_channel| { + Self::run_vardiff_on_standard_channel( + downstream_id, + channel_id, + standard_channel, + vardiff_state, + &mut messages, + ); + }); + downstream + .extended_channels + .with_mut(&channel_id, |extended_channel| { + Self::run_vardiff_on_extended_channel( + downstream_id, + channel_id, + extended_channel, + vardiff_state, + &mut messages, + ); + }); }); - } - }); + }) + .is_none() + { + self.vardiff.remove(&vardiff_key); + continue; + } + } for message in messages { // A send can only fail if the receiver side of the channel is closed. @@ -699,6 +705,38 @@ impl ChannelManager { Ok(()) } + + /// Runs `f` while holding the downstream map entry guard. + /// + /// Use this when mutations must only happen if the downstream is still + /// registered in the ChannelManager. Keep `f` short: do not perform blocking + /// work, send/forward messages, or re-enter `self.downstreams` inside it. + /// + /// Returns the closure result if the downstream is registered. Returns + /// `DownstreamNotFound` with a disconnect action if the downstream is no + /// longer registered. + #[allow(clippy::result_large_err)] + fn with_registered_downstream( + &self, + downstream_id: DownstreamId, + f: F, + ) -> PoolResult + where + F: FnOnce(&Downstream) -> PoolResult, + { + match self + .downstreams + .with(&downstream_id, |downstream| f(downstream)) + { + Some(result) => result, + None => Err({ + PoolError::disconnect( + PoolErrorKind::DownstreamNotFound(downstream_id), + downstream_id, + ) + }), + } + } } #[derive(Debug, Clone)] @@ -727,7 +765,7 @@ impl RouteMessageTo<'_> { RouteMessageTo::Downstream((downstream_id, message)) => { let sender = channel_manager_io .downstream_sender - .super_safe_lock(|map| map.get(&downstream_id).cloned()); + .get_cloned(&downstream_id); if let Some(sender) = sender { sender.send((message.into_static(), None)).await?; diff --git a/pool-apps/pool/src/lib/channel_manager/template_distribution_message_handler.rs b/pool-apps/pool/src/lib/channel_manager/template_distribution_message_handler.rs index cb8141616..e551ba28e 100644 --- a/pool-apps/pool/src/lib/channel_manager/template_distribution_message_handler.rs +++ b/pool-apps/pool/src/lib/channel_manager/template_distribution_message_handler.rs @@ -34,103 +34,137 @@ impl HandleTemplateDistributionMessagesFromServerAsync for ChannelManager { ) -> Result<(), Self::Error> { info!("Received: {}", msg); - let messages = self.channel_manager_data.super_safe_lock(|channel_manager_data| { - if msg.future_template { - channel_manager_data.last_future_template = Some(msg.clone().into_static()); - } + if msg.future_template { + self.last_future_template + .set(Some(msg.clone().into_static())) + .map_err(PoolError::shutdown)?; + } - let mut messages: Vec = Vec::new(); - let mut coinbase_output = deserialize_outputs(channel_manager_data.coinbase_outputs.clone()).expect("deserialization failed"); - coinbase_output[0].value = Amount::from_sat(msg.coinbase_tx_value_remaining); + let mut messages: Vec = Vec::new(); + let mut coinbase_output = + deserialize_outputs(self.coinbase_outputs.clone()).expect("deserialization failed"); + coinbase_output[0].value = Amount::from_sat(msg.coinbase_tx_value_remaining); - for (downstream_id, downstream) in channel_manager_data.downstream.iter_mut() { - // If REQUIRES_CUSTOM_WORK is set, skip template handling entirely (see https://github.com/stratum-mining/sv2-apps/issues/55) - let requires_custom_work = downstream.requires_custom_work.load(Ordering::SeqCst); - if requires_custom_work { - continue; - } + self.downstreams.try_for_each(|downstream_id, downstream| { + // If REQUIRES_CUSTOM_WORK is set, skip template handling entirely + // (see https://github.com/stratum-mining/sv2-apps/issues/55). + if downstream.requires_custom_work.load(Ordering::SeqCst) { + return Ok(()); + } - let messages_: Vec> = downstream.downstream_data.super_safe_lock(|data| { - let downstream_coinbase_outputs = if let Some(ref payout_mode) = data.payout_mode { - payout_mode.coinbase_outputs(msg.coinbase_tx_value_remaining, &self.coinbase_reward_script) + let downstream_coinbase_outputs = downstream + .payout_mode + .with(|payout_mode| match payout_mode.as_ref() { + Some(mode) => mode.coinbase_outputs( + msg.coinbase_tx_value_remaining, + &self.coinbase_reward_script, + ), + None => coinbase_output.clone(), + }) + .map_err(PoolError::shutdown)?; + + let requires_standard_jobs = downstream.requires_standard_jobs.load(Ordering::SeqCst); + let mut downstream_messages = Vec::new(); + + let group_channel_job = downstream + .group_channel + .with(|group_channel| { + group_channel + .on_new_template( + msg.clone().into_static(), + downstream_coinbase_outputs.clone(), + ) + .map_err(PoolError::shutdown)?; + let group_job = if msg.future_template { + let Some(future_job_id) = + group_channel.get_future_job_id_from_template_id(msg.template_id) + else { + return Err(PoolError::shutdown(PoolErrorKind::JobNotFound)); + }; + let Some(future_job) = group_channel.get_future_job(future_job_id) else { + return Err(PoolError::shutdown(PoolErrorKind::JobNotFound)); + }; + future_job } else { - coinbase_output.clone() - }; - - data.group_channel.on_new_template(msg.clone().into_static(), downstream_coinbase_outputs.clone()).map_err(|e| { - tracing::error!("Error while adding template to group channel"); - PoolError::shutdown(e) - })?; - - let group_channel_job = match msg.future_template { - true => { - let future_job_id = data.group_channel.get_future_job_id_from_template_id(msg.template_id).ok_or( - PoolError::shutdown(PoolErrorKind::JobNotFound) - )?; - data.group_channel.get_future_job(future_job_id).ok_or( - PoolError::shutdown(PoolErrorKind::JobNotFound) - )? - }, - false => { - data.group_channel.get_active_job().ok_or( - PoolError::shutdown(PoolErrorKind::JobNotFound) - )? - }, + let Some(active_job) = group_channel.get_active_job() else { + return Err(PoolError::shutdown(PoolErrorKind::JobNotFound)); + }; + active_job }; - - let mut messages: Vec = vec![]; - - // if REQUIRES_STANDARD_JOBS is not set and the group channel is not empty - // we need to send the NewExtendedMiningJob message to the group channel - let requires_standard_jobs = downstream.requires_standard_jobs.load(Ordering::SeqCst); - let empty_group_channel = data.group_channel.is_empty(); - if !requires_standard_jobs && !empty_group_channel { - messages.push((*downstream_id, Mining::NewExtendedMiningJob(group_channel_job.get_job_message().clone())).into()); - } - - // loop over every standard channel - // if REQUIRES_STANDARD_JOBS is not set, we need to call on_group_channel_job on each standard channel - // if REQUIRES_STANDARD_JOBS is set, we need to call on_new_template, and send individual NewMiningJob messages for each standard channel - for (channel_id, standard_channel) in data.standard_channels.iter_mut() { - if !requires_standard_jobs { - standard_channel.on_group_channel_job(group_channel_job.clone()).map_err(|e| { - tracing::error!("Error while adding group channel job to standard channel with id: {channel_id:?}"); - PoolError::shutdown(e) - })?; - } else { - standard_channel.on_new_template(msg.clone().into_static(), downstream_coinbase_outputs.clone()).map_err(|e| { - tracing::error!("Error while adding template to standard channel"); - PoolError::shutdown(e) - })?; - - match msg.future_template { - true => { - let standard_job_id = standard_channel.get_future_job_id_from_template_id(msg.template_id).expect("future job id must exist"); - let standard_job = standard_channel.get_future_job(standard_job_id).expect("future job must exist"); - messages.push((*downstream_id, Mining::NewMiningJob(standard_job.get_job_message().clone())).into()); - }, - false => { - let standard_job = standard_channel.get_active_job().expect("active job must exist"); - messages.push((*downstream_id, Mining::NewMiningJob(standard_job.get_job_message().clone())).into()); - }, - } - } + // If REQUIRES_STANDARD_JOBS is not set and the group channel is not + // empty we need to send the NewExtendedMiningJob message to the group + // channel. + if !requires_standard_jobs && !group_channel.is_empty() { + downstream_messages.push( + ( + downstream_id, + Mining::NewExtendedMiningJob(group_job.get_job_message().clone()), + ) + .into(), + ); } - - // loop over every extended channel, and call on_group_channel_job on each extended channel - for (channel_id, extended_channel) in data.extended_channels.iter_mut() { - extended_channel.on_group_channel_job(group_channel_job.clone()).map_err(|e| { - tracing::error!("Error while adding group channel job to extended channel with id: {channel_id:?}"); + Ok::<_, Self::Error>(group_job.clone()) + }) + .map_err(PoolError::shutdown)??; + + // Loop over every standard channel. + // If REQUIRES_STANDARD_JOBS is not set, we need to call + // on_group_channel_job on each standard channel. + // If REQUIRES_STANDARD_JOBS is set, we need to call on_new_template and send + // individual NewMiningJob messages for each standard channel. + downstream.standard_channels.try_for_each_mut(|channel_id, standard_channel| { + if !requires_standard_jobs { + standard_channel + .on_group_channel_job(group_channel_job.clone()) + .map_err(|e| { + tracing::error!("Error while adding group channel job to standard channel with id: {channel_id:?}"); PoolError::shutdown(e) })?; - } - - Ok::>, Self::Error>(messages) - })?; + } else { + standard_channel + .on_new_template( + msg.clone().into_static(), + downstream_coinbase_outputs.clone(), + ) + .map_err(|e| { + tracing::error!("Error while adding template to standard channel"); + PoolError::shutdown(e) + })?; + let standard_job = if msg.future_template { + let job_id = standard_channel + .get_future_job_id_from_template_id(msg.template_id) + .expect("future job id must exist"); + standard_channel + .get_future_job(job_id) + .expect("future job must exist") + } else { + standard_channel + .get_active_job() + .expect("active job must exist") + }; + downstream_messages.push( + ( + downstream_id, + Mining::NewMiningJob(standard_job.get_job_message().clone()), + ) + .into(), + ); + } + Ok::<(), Self::Error>(()) + })?; + + // Loop over every extended channel and call on_group_channel_job on each one. + downstream.extended_channels.try_for_each_mut(|channel_id, channel| { + channel + .on_group_channel_job(group_channel_job.clone()) + .map_err(|e| { + tracing::error!("Error while adding group channel job to extended channel with id: {channel_id:?}"); + PoolError::shutdown(e) + }) + })?; - messages.extend(messages_); - } - Ok::>, Self::Error>(messages) + messages.extend(downstream_messages); + Ok(()) })?; for message in messages { @@ -173,86 +207,107 @@ impl HandleTemplateDistributionMessagesFromServerAsync for ChannelManager { ) -> Result<(), Self::Error> { info!("Received: {}", msg); - let messages = self.channel_manager_data.super_safe_lock(|data| { - data.last_new_prev_hash = Some(msg.clone().into_static()); - - let mut messages: Vec = vec![]; - - for (downstream_id, downstream) in data.downstream.iter_mut() { - // If downstream requires custom work, skip template handling entirely (see https://github.com/stratum-mining/sv2-apps/issues/55) - let requires_custom_work = downstream.requires_custom_work.load(Ordering::SeqCst); - if requires_custom_work { - continue; - } + self.last_new_prev_hash + .set(Some(msg.clone().into_static())) + .map_err(PoolError::shutdown)?; - let downstream_messages = downstream.downstream_data.super_safe_lock(|data| { - let mut messages: Vec = vec![]; - - // call on_set_new_prev_hash on the group channel to update the channel state - data.group_channel.on_set_new_prev_hash(msg.clone().into_static()).map_err(|e| { - tracing::error!("Error while adding new prev hash to group channel"); - PoolError::shutdown(e) - })?; - - // did SetupConnection have the REQUIRES_STANDARD_JOBS flag set? - // if no, and the group channel is not empty, we need to send the SetNewPrevHashMp to the group channel - let requires_standard_jobs = downstream.requires_standard_jobs.load(Ordering::SeqCst); - let empty_group_channel = data.group_channel.is_empty(); - if !requires_standard_jobs && !empty_group_channel { - let group_channel_id = data.group_channel.get_group_channel_id(); - let activated_group_job_id = data.group_channel.get_active_job().expect("active job must exist").get_job_id(); - let group_set_new_prev_hash_message = SetNewPrevHashMp { - channel_id: group_channel_id, - job_id: activated_group_job_id, - prev_hash: msg.prev_hash.clone(), - min_ntime: msg.header_timestamp, - nbits: msg.n_bits, - }; - - // send the SetNewPrevHash message to the group channel - messages.push((*downstream_id, Mining::SetNewPrevHash(group_set_new_prev_hash_message)).into()); - } + let mut messages: Vec = vec![]; + self.downstreams.try_for_each(|downstream_id, downstream| { + // If downstream requires custom work, skip template handling entirely + // (see https://github.com/stratum-mining/sv2-apps/issues/55). + if downstream.requires_custom_work.load(Ordering::SeqCst) { + return Ok(()); + } - // loop over every extended channel, and call on_set_new_prev_hash on each extended channel to update the channel state - for (channel_id, extended_channel) in data.extended_channels.iter_mut() { - extended_channel.on_set_new_prev_hash(msg.clone().into_static()).map_err(|e| { - tracing::error!("Error while adding new prev hash to extended channel: {channel_id:?} {e:?}"); + let requires_standard_jobs = downstream.requires_standard_jobs.load(Ordering::SeqCst); + let mut downstream_messages = Vec::new(); + + downstream + .group_channel + .with(|group_channel| { + // Call on_set_new_prev_hash on the group channel to update the channel + // state. + group_channel + .on_set_new_prev_hash(msg.clone().into_static()) + .map_err(|e| { + tracing::error!("Error while adding new prev hash to group channel"); PoolError::shutdown(e) })?; + // Did SetupConnection have the REQUIRES_STANDARD_JOBS flag set? + // If not, and the group channel is not empty, we need to send the + // SetNewPrevHash message to the group channel. + if !requires_standard_jobs && !group_channel.is_empty() { + let active_job_id = group_channel + .get_active_job() + .expect("active job must exist") + .get_job_id(); + downstream_messages.push( + ( + downstream_id, + Mining::SetNewPrevHash(SetNewPrevHashMp { + channel_id: group_channel.get_group_channel_id(), + job_id: active_job_id, + prev_hash: msg.prev_hash.clone(), + min_ntime: msg.header_timestamp, + nbits: msg.n_bits, + }), + ) + .into(), + ); } - - // loop over every standard channel, and call on_set_new_prev_hash on each standard channel to update the channel state - for (channel_id, standard_channel) in data.standard_channels.iter_mut() { - // call on_set_new_prev_hash on the standard channel to update the channel state - standard_channel.on_set_new_prev_hash(msg.clone().into_static()).map_err(|e| { - tracing::error!("Error while adding new prev hash to standard channel: {channel_id:?} {e:?}"); - PoolError::shutdown(e) - })?; - - // did SetupConnection have the REQUIRES_STANDARD_JOBS flag set? - // if yes, we need to send the SetNewPrevHashMp to each standard channel - if downstream.requires_standard_jobs.load(Ordering::SeqCst) { - let activated_standard_job_id = standard_channel.get_active_job().ok_or( - PoolError::shutdown(PoolErrorKind::JobNotFound) - )?.get_job_id(); - let standard_set_new_prev_hash_message = SetNewPrevHashMp { - channel_id: *channel_id, - job_id: activated_standard_job_id, + Ok::<(), Self::Error>(()) + }) + .map_err(PoolError::shutdown)??; + + // Loop over every extended channel and call on_set_new_prev_hash on each + // extended channel to update the channel state. + downstream.extended_channels.try_for_each_mut(|channel_id, channel| { + channel + .on_set_new_prev_hash(msg.clone().into_static()) + .map_err(|e| { + tracing::error!("Error while adding new prev hash to extended channel: {channel_id:?} {e:?}"); + PoolError::shutdown(e) + }) + })?; + + // Loop over every standard channel and call on_set_new_prev_hash on each + // standard channel to update the channel state. + downstream.standard_channels.try_for_each_mut(|channel_id, channel| { + // Call on_set_new_prev_hash on the standard channel to update the channel + // state. + channel + .on_set_new_prev_hash(msg.clone().into_static()) + .map_err(|e| { + tracing::error!("Error while adding new prev hash to standard channel: {channel_id:?} {e:?}"); + PoolError::shutdown(e) + })?; + // Did SetupConnection have the REQUIRES_STANDARD_JOBS flag set? + // If yes, we need to send the SetNewPrevHash message to each standard + // channel. + if requires_standard_jobs { + let Some(active_job) = channel.get_active_job() else { + return Err(PoolError::shutdown(PoolErrorKind::JobNotFound)); + }; + let active_job_id = active_job.get_job_id(); + downstream_messages.push( + ( + downstream_id, + Mining::SetNewPrevHash(SetNewPrevHashMp { + channel_id, + job_id: active_job_id, prev_hash: msg.prev_hash.clone(), min_ntime: msg.header_timestamp, nbits: msg.n_bits, - }; - messages.push((*downstream_id, Mining::SetNewPrevHash(standard_set_new_prev_hash_message)).into()); - } - } - - Ok::>, Self::Error>(messages) - })?; - - messages.extend(downstream_messages); - } + }), + ) + .into(), + ); + } + Ok::<(), Self::Error>(()) + })?; - Ok::>, Self::Error>(messages) + messages.extend(downstream_messages); + Ok(()) })?; for message in messages { diff --git a/pool-apps/pool/src/lib/downstream/common_message_handler.rs b/pool-apps/pool/src/lib/downstream/common_message_handler.rs index 6660e44b7..9cb18c935 100644 --- a/pool-apps/pool/src/lib/downstream/common_message_handler.rs +++ b/pool-apps/pool/src/lib/downstream/common_message_handler.rs @@ -25,9 +25,9 @@ impl HandleCommonMessagesFromClientAsync for Downstream { &self, _client_id: Option, ) -> Result, Self::Error> { - Ok(self - .downstream_data - .super_safe_lock(|data| data.negotiated_extensions.clone())) + self.negotiated_extensions + .get() + .map_err(PoolError::shutdown) } async fn handle_setup_connection( diff --git a/pool-apps/pool/src/lib/downstream/extensions_message_handler.rs b/pool-apps/pool/src/lib/downstream/extensions_message_handler.rs index d8f37a906..731d00f42 100644 --- a/pool-apps/pool/src/lib/downstream/extensions_message_handler.rs +++ b/pool-apps/pool/src/lib/downstream/extensions_message_handler.rs @@ -22,9 +22,9 @@ impl HandleExtensionsFromClientAsync for Downstream { &self, _client_id: Option, ) -> Result, Self::Error> { - Ok(self - .downstream_data - .super_safe_lock(|data| data.negotiated_extensions.clone())) + self.negotiated_extensions + .get() + .map_err(PoolError::shutdown) } async fn handle_request_extensions( @@ -114,9 +114,9 @@ impl HandleExtensionsFromClientAsync for Downstream { ); // Store the negotiated extensions in the shared downstream data - self.downstream_data.super_safe_lock(|data| { - data.negotiated_extensions = supported.clone(); - }); + self.negotiated_extensions + .set(supported.clone()) + .map_err(PoolError::shutdown)?; let success = RequestExtensionsSuccess { request_id: msg.request_id, diff --git a/pool-apps/pool/src/lib/downstream/mod.rs b/pool-apps/pool/src/lib/downstream/mod.rs index 07bf35e15..f3ff0864a 100644 --- a/pool-apps/pool/src/lib/downstream/mod.rs +++ b/pool-apps/pool/src/lib/downstream/mod.rs @@ -1,16 +1,12 @@ -use std::{ - collections::HashMap, - sync::{ - atomic::{AtomicBool, AtomicU32}, - Arc, - }, +use std::sync::{ + atomic::{AtomicBool, AtomicU32}, + Arc, }; use async_channel::{unbounded, Receiver, Sender}; use bitcoin_core_sv2::template_distribution_protocol::CancellationToken; use stratum_apps::{ channel_utils::ReceiverCleanup, - custom_mutex::Mutex, network_helpers::noise_stream::NoiseTcpStream, stratum_core::{ channels_sv2::server::{ @@ -21,6 +17,7 @@ use stratum_apps::{ handlers_sv2::{HandleCommonMessagesFromClientAsync, HandleExtensionsFromClientAsync}, parsers_sv2::{parse_message_frame_with_tlvs, AnyMessage, Mining, Tlv}, }, + sync::{SharedLock, SharedMap}, task_manager::TaskManager, utils::{ protocol_message_type::{protocol_message_type, MessageType}, @@ -38,25 +35,6 @@ use crate::{ mod common_message_handler; mod extensions_message_handler; -/// Holds state related to a downstream connection's mining channels. -/// -/// This includes: -/// - Whether the downstream requires a standard job (`require_std_job`). -/// - A [`GroupChannel`]. -/// - Active [`ExtendedChannel`]s keyed by channel ID. -/// - Active [`StandardChannel`]s keyed by channel ID. -/// - Extensions that have been successfully negotiated with this client -pub struct DownstreamData { - pub group_channel: GroupChannel<'static>, - pub extended_channels: HashMap>, - pub standard_channels: HashMap>, - pub channel_id_factory: AtomicU32, - /// Extensions that have been successfully negotiated with this client - pub negotiated_extensions: Vec, - /// Payout mode derived from user_identity (None until channel is opened) - pub payout_mode: Option, -} - /// Communication layer for a downstream connection. /// /// Provides the messaging primitives for interacting with the @@ -84,7 +62,14 @@ impl DownstreamIo { /// Represents a downstream client connected to this node. #[derive(Clone)] pub struct Downstream { - pub downstream_data: Arc>, + pub group_channel: SharedLock>, + pub extended_channels: SharedMap>, + pub standard_channels: SharedMap>, + pub channel_id_factory: Arc, + /// Extensions that have been successfully negotiated with this client + pub negotiated_extensions: SharedLock>, + /// Payout mode derived from user_identity (None until channel is opened) + pub payout_mode: SharedLock>, downstream_io: DownstreamIo, pub downstream_id: usize, pub requires_standard_jobs: Arc, @@ -181,18 +166,14 @@ impl Downstream { downstream_receiver: inbound_rx, }; - let downstream_data = Arc::new(Mutex::new(DownstreamData { - extended_channels: HashMap::new(), - standard_channels: HashMap::new(), - group_channel, - channel_id_factory, - negotiated_extensions: vec![], - payout_mode: None, - })); - Downstream { downstream_io, - downstream_data, + group_channel: SharedLock::new(group_channel), + extended_channels: SharedMap::new(), + standard_channels: SharedMap::new(), + channel_id_factory: Arc::new(channel_id_factory), + negotiated_extensions: SharedLock::new(Vec::new()), + payout_mode: SharedLock::new(None), downstream_id, requires_standard_jobs: Arc::new(AtomicBool::new(false)), requires_custom_work: Arc::new(AtomicBool::new(false)), @@ -284,10 +265,13 @@ impl Downstream { .recv() .await .map_err(|error| PoolError::disconnect(error, self.downstream_id))?; - let header = frame.get_header().ok_or_else(|| { + let Some(header) = frame.get_header() else { error!("SV2 frame missing header"); - PoolError::disconnect(framing_sv2::Error::MissingHeader, self.downstream_id) - })?; + return Err(PoolError::disconnect( + framing_sv2::Error::MissingHeader, + self.downstream_id, + )); + }; // The first ever message received on a new downstream connection // should always be a setup connection message. if header.msg_type() == MESSAGE_TYPE_SETUP_CONNECTION { @@ -347,17 +331,21 @@ impl Downstream { .recv() .await .map_err(|error| PoolError::disconnect(error, self.downstream_id))?; - let header = sv2_frame.get_header().ok_or_else(|| { + let Some(header) = sv2_frame.get_header() else { error!("SV2 frame missing header"); - PoolError::disconnect(framing_sv2::Error::MissingHeader, self.downstream_id) - })?; + return Err(PoolError::disconnect( + framing_sv2::Error::MissingHeader, + self.downstream_id, + )); + }; match protocol_message_type(header.ext_type(), header.msg_type()) { MessageType::Mining => { debug!("Received mining SV2 frame from downstream."); let negotiated_extensions = self - .downstream_data - .super_safe_lock(|data| data.negotiated_extensions.clone()); + .negotiated_extensions + .get() + .map_err(PoolError::shutdown)?; let (any_message, tlv_fields) = parse_message_frame_with_tlvs( header, sv2_frame.payload(), diff --git a/pool-apps/pool/src/lib/monitoring.rs b/pool-apps/pool/src/lib/monitoring.rs index 2dc131487..e9cd6b3a4 100644 --- a/pool-apps/pool/src/lib/monitoring.rs +++ b/pool-apps/pool/src/lib/monitoring.rs @@ -10,93 +10,94 @@ use stratum_apps::monitoring::client::{ use crate::{channel_manager::ChannelManager, downstream::Downstream}; /// Helper to convert a Downstream to Sv2ClientInfo. -/// Returns None if the lock cannot be acquired (graceful degradation for monitoring). fn downstream_to_sv2_client_info(client: &Downstream) -> Option { - client - .downstream_data - .safe_lock(|dd| { - let mut extended_channels = Vec::new(); - let mut standard_channels = Vec::new(); + let mut extended_channels = Vec::new(); + let mut standard_channels = Vec::new(); - for (_channel_id, extended_channel) in dd.extended_channels.iter() { - let channel_id = extended_channel.get_channel_id(); - let target = extended_channel.get_target(); - let requested_max_target = extended_channel.get_requested_max_target(); - let user_identity = extended_channel.get_user_identity(); - let share_accounting = extended_channel.get_share_accounting(); + client + .extended_channels + .for_each(|_channel_id, extended_channel| { + let channel_id = extended_channel.get_channel_id(); + let target = extended_channel.get_target(); + let requested_max_target = extended_channel.get_requested_max_target(); + let user_identity = extended_channel.get_user_identity(); + let share_accounting = extended_channel.get_share_accounting(); - extended_channels.push(ExtendedChannelInfo { - channel_id, - user_identity: user_identity.to_string(), - nominal_hashrate: extended_channel.get_nominal_hashrate(), - stable_hashrate: extended_channel.get_stable_hashrate(), - target_hex: hex::encode(target.to_be_bytes()), - requested_max_target_hex: hex::encode(requested_max_target.to_be_bytes()), - extranonce_prefix_hex: hex::encode(extended_channel.get_extranonce_prefix()), - full_extranonce_size: extended_channel.get_full_extranonce_size(), - rollable_extranonce_size: extended_channel.get_rollable_extranonce_size(), - expected_shares_per_minute: extended_channel.get_shares_per_minute(), - shares_accepted: share_accounting.get_shares_accepted(), - shares_rejected: share_accounting.get_rejected_shares_count(), - shares_rejected_by_reason: share_accounting - .get_rejected_shares() - .map(|(reason, count)| (reason.to_string(), count)) - .collect(), - share_work_sum: share_accounting.get_share_work_sum(), - last_share_sequence_number: share_accounting.get_last_share_sequence_number(), - best_diff: share_accounting.get_best_diff(), - last_batch_accepted: share_accounting.get_last_batch_accepted(), - last_batch_work_sum: share_accounting.get_last_batch_work_sum(), - share_batch_size: share_accounting.get_share_batch_size(), - blocks_found: share_accounting.get_blocks_found(), - }); - } + extended_channels.push(ExtendedChannelInfo { + channel_id, + user_identity: user_identity.to_string(), + nominal_hashrate: extended_channel.get_nominal_hashrate(), + stable_hashrate: extended_channel.get_stable_hashrate(), + target_hex: hex::encode(target.to_be_bytes()), + requested_max_target_hex: hex::encode(requested_max_target.to_be_bytes()), + extranonce_prefix_hex: hex::encode(extended_channel.get_extranonce_prefix()), + full_extranonce_size: extended_channel.get_full_extranonce_size(), + rollable_extranonce_size: extended_channel.get_rollable_extranonce_size(), + expected_shares_per_minute: extended_channel.get_shares_per_minute(), + shares_accepted: share_accounting.get_shares_accepted(), + shares_rejected: share_accounting.get_rejected_shares_count(), + shares_rejected_by_reason: share_accounting + .get_rejected_shares() + .map(|(reason, count)| (reason.to_string(), count)) + .collect(), + share_work_sum: share_accounting.get_share_work_sum(), + last_share_sequence_number: share_accounting.get_last_share_sequence_number(), + best_diff: share_accounting.get_best_diff(), + last_batch_accepted: share_accounting.get_last_batch_accepted(), + last_batch_work_sum: share_accounting.get_last_batch_work_sum(), + share_batch_size: share_accounting.get_share_batch_size(), + blocks_found: share_accounting.get_blocks_found(), + }); + }); - for (_channel_id, standard_channel) in dd.standard_channels.iter() { - let channel_id = standard_channel.get_channel_id(); - let target = standard_channel.get_target(); - let requested_max_target = standard_channel.get_requested_max_target(); - let user_identity = standard_channel.get_user_identity(); - let share_accounting = standard_channel.get_share_accounting(); + client + .standard_channels + .for_each(|_channel_id, standard_channel| { + let channel_id = standard_channel.get_channel_id(); + let target = standard_channel.get_target(); + let requested_max_target = standard_channel.get_requested_max_target(); + let user_identity = standard_channel.get_user_identity(); + let share_accounting = standard_channel.get_share_accounting(); - standard_channels.push(StandardChannelInfo { - channel_id, - user_identity: user_identity.to_string(), - nominal_hashrate: standard_channel.get_nominal_hashrate(), - stable_hashrate: standard_channel.get_stable_hashrate(), - target_hex: hex::encode(target.to_be_bytes()), - requested_max_target_hex: hex::encode(requested_max_target.to_be_bytes()), - extranonce_prefix_hex: hex::encode(standard_channel.get_extranonce_prefix()), - expected_shares_per_minute: standard_channel.get_shares_per_minute(), - shares_accepted: share_accounting.get_shares_accepted(), - shares_rejected: share_accounting.get_rejected_shares_count(), - shares_rejected_by_reason: share_accounting - .get_rejected_shares() - .map(|(reason, count)| (reason.to_string(), count)) - .collect(), - share_work_sum: share_accounting.get_share_work_sum(), - last_share_sequence_number: share_accounting.get_last_share_sequence_number(), - best_diff: share_accounting.get_best_diff(), - last_batch_accepted: share_accounting.get_last_batch_accepted(), - last_batch_work_sum: share_accounting.get_last_batch_work_sum(), - share_batch_size: share_accounting.get_share_batch_size(), - blocks_found: share_accounting.get_blocks_found(), - }); - } + standard_channels.push(StandardChannelInfo { + channel_id, + user_identity: user_identity.to_string(), + nominal_hashrate: standard_channel.get_nominal_hashrate(), + stable_hashrate: standard_channel.get_stable_hashrate(), + target_hex: hex::encode(target.to_be_bytes()), + requested_max_target_hex: hex::encode(requested_max_target.to_be_bytes()), + extranonce_prefix_hex: hex::encode(standard_channel.get_extranonce_prefix()), + expected_shares_per_minute: standard_channel.get_shares_per_minute(), + shares_accepted: share_accounting.get_shares_accepted(), + shares_rejected: share_accounting.get_rejected_shares_count(), + shares_rejected_by_reason: share_accounting + .get_rejected_shares() + .map(|(reason, count)| (reason.to_string(), count)) + .collect(), + share_work_sum: share_accounting.get_share_work_sum(), + last_share_sequence_number: share_accounting.get_last_share_sequence_number(), + best_diff: share_accounting.get_best_diff(), + last_batch_accepted: share_accounting.get_last_batch_accepted(), + last_batch_work_sum: share_accounting.get_last_batch_work_sum(), + share_batch_size: share_accounting.get_share_batch_size(), + blocks_found: share_accounting.get_blocks_found(), + }); + }); - Sv2ClientInfo::new(client.downstream_id, extended_channels, standard_channels) - }) - .ok() + Some(Sv2ClientInfo::new( + client.downstream_id, + extended_channels, + standard_channels, + )) } impl Sv2ClientsMonitoring for ChannelManager { fn get_sv2_clients(&self) -> Vec { // Clone Downstream references and release lock immediately to avoid contention // with template distribution and message handling - let downstream_refs: Vec = self - .channel_manager_data - .safe_lock(|data| data.downstream.values().cloned().collect()) - .unwrap_or_default(); + let mut downstream_refs: Vec = Vec::new(); + self.downstreams + .for_each(|_, downstream| downstream_refs.push(downstream.clone())); downstream_refs .iter() @@ -105,12 +106,8 @@ impl Sv2ClientsMonitoring for ChannelManager { } fn get_sv2_client_by_id(&self, client_id: usize) -> Option { - self.channel_manager_data - .safe_lock(|d| { - d.downstream - .get(&client_id) - .and_then(downstream_to_sv2_client_info) - }) - .unwrap_or(None) + self.downstreams.with(&client_id, |downstream| { + downstream_to_sv2_client_info(downstream) + })? } } diff --git a/stratum-apps/src/sync.rs b/stratum-apps/src/sync.rs index b70a633cb..5d3528eb0 100644 --- a/stratum-apps/src/sync.rs +++ b/stratum-apps/src/sync.rs @@ -108,6 +108,7 @@ impl SharedRw { } /// Concurrent map wrapper over `DashMap` providing ergonomic scoped access. +#[derive(Debug)] pub struct SharedMap(Arc>); impl Clone for SharedMap { @@ -122,6 +123,19 @@ impl SharedMap { SharedMap(Arc::new(DashMap::new())) } + /// Get an owned clone of a value. + /// + /// This releases the map entry guard immediately, so the entry may be removed + /// or replaced while the caller is still using the clone. Use this only when + /// stale clones are acceptable. Prefer [`Self::with`] or [`Self::with_mut`] + /// when later work depends on the entry still being present. + pub fn get_cloned(&self, key: &K) -> Option + where + V: Clone, + { + self.0.get(key).map(|refc| refc.value().clone()) + } + /// Read a value for a key using a closure. /// /// Caution: `f` runs while the entry guard is held. Avoid re-entering this @@ -149,6 +163,25 @@ impl SharedMap { Some(result) } + /// Mutate an entry, inserting a value produced by `default` when absent. + pub fn with_mut_or_insert_with(&self, key: K, default: D, f: F) -> R + where + F: FnOnce(&mut V) -> R, + D: FnOnce() -> V, + { + let mut entry = self.0.entry(key).or_insert_with(default); + f(entry.value_mut()) + } + + /// Mutate an entry, inserting its default value when absent. + pub fn with_mut_or_default(&self, key: K, f: F) -> R + where + V: Default, + F: FnOnce(&mut V) -> R, + { + self.with_mut_or_insert_with(key, V::default, f) + } + /// Iterate over all entries immutably. /// /// Caution: `f` runs while an iterator entry guard is held. Avoid @@ -193,9 +226,9 @@ impl SharedMap { /// /// Caution: `f` runs while an iterator entry guard is held. Avoid /// re-entering this `SharedMap` from inside the closure. - pub fn try_for_each(&self, f: F) -> Result<(), E> + pub fn try_for_each(&self, mut f: F) -> Result<(), E> where - F: Fn(K, &V) -> Result<(), E>, + F: FnMut(K, &V) -> Result<(), E>, { for entry in self.0.iter() { f(entry.key().clone(), entry.value())?; @@ -246,6 +279,11 @@ impl SharedMap { pub fn is_empty(&self) -> bool { self.0.is_empty() } + + /// Clears the collection. + pub fn clear(&self) { + self.0.clear() + } } impl Default for SharedMap { @@ -294,9 +332,12 @@ mod tests { map.with_mut(&"a", |v| *v += 10); assert_eq!(map.with(&"a", |v| *v).unwrap(), 11); + map.with_mut_or_default("c", |v| *v += 3); + assert_eq!(map.get_cloned(&"c"), Some(3)); + let mut sum = 0; map.for_each(|_, v| sum += v); - assert_eq!(sum, 13); + assert_eq!(sum, 16); map.remove(&"a"); assert!(!map.contains_key(&"a")); diff --git a/stratum-apps/src/utils/types.rs b/stratum-apps/src/utils/types.rs index d57ea560b..ddcec4cb8 100644 --- a/stratum-apps/src/utils/types.rs +++ b/stratum-apps/src/utils/types.rs @@ -18,7 +18,7 @@ pub type JdToken = u64; pub type Message = AnyMessage<'static>; pub type Sv2Frame = StandardSv2Frame; -#[derive(Debug, PartialEq, Eq, Hash)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct VardiffKey { pub downstream_id: DownstreamId, pub channel_id: ChannelId,