Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions shared/services/state/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (m *NetworkStateManager) GetHeadState() (*NetworkState, error) {
if err != nil {
return nil, fmt.Errorf("error getting latest Beacon slot: %w", err)
}
return m.createNetworkState(targetSlot)
return m.createNetworkState(targetSlot, nil)
}

// Get the state of the network for a single node using the latest Execution layer block, along with the total effective RPL stake for the network
Expand All @@ -73,12 +73,12 @@ func (m *NetworkStateManager) GetHeadStateForNode(nodeAddress common.Address) (*
if err != nil {
return nil, fmt.Errorf("error getting latest Beacon slot: %w", err)
}
return m.createNetworkStateForNode(targetSlot, nodeAddress)
return m.createNetworkState(targetSlot, []common.Address{nodeAddress})
}

// Get the state of the network at the provided Beacon slot
func (m *NetworkStateManager) GetStateForSlot(slotNumber uint64) (*NetworkState, error) {
return m.createNetworkState(slotNumber)
return m.createNetworkState(slotNumber, nil)
}

// Gets the latest valid block
Expand Down
243 changes: 50 additions & 193 deletions shared/services/state/network-state.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,12 @@ func (ns *NetworkState) UnmarshalJSON(data []byte) error {
return nil
}

// Creates a snapshot of the entire Rocket Pool network state, on both the Execution and Consensus layers
func (m *NetworkStateManager) createNetworkState(slotNumber uint64) (*NetworkState, error) {
// Creates a snapshot of the Rocket Pool network state, on both the Execution and Consensus layers.
// If nodeAddresses is nil, all nodes are queried. Otherwise, only the specified nodes are included.
func (m *NetworkStateManager) createNetworkState(slotNumber uint64, nodeAddresses []common.Address) (*NetworkState, error) {
allNodes := len(nodeAddresses) == 0
steps := 9
currentStep := 0

// Get the execution block for the given slot
beaconBlock, exists, err := m.bc.GetBeaconBlock(fmt.Sprintf("%d", slotNumber))
Expand Down Expand Up @@ -211,21 +215,46 @@ func (m *NetworkStateManager) createNetworkState(slotNumber uint64) (*NetworkSta
if err != nil {
return nil, fmt.Errorf("error getting network details: %w", err)
}
m.logLine("1/7 - Retrieved network details (%s so far)", time.Since(start))
currentStep++
m.logLine("%d/%d - Retrieved network details (%s so far)", currentStep, steps, time.Since(start))

// Node details
state.NodeDetails, err = rpstate.GetAllNativeNodeDetails(m.rp, contracts)
if err != nil {
return nil, fmt.Errorf("error getting all node details: %w", err)
if allNodes {
state.NodeDetails, err = rpstate.GetAllNativeNodeDetails(m.rp, contracts)
if err != nil {
return nil, fmt.Errorf("error getting all node details: %w", err)
}
} else {
state.NodeDetails = make([]rpstate.NativeNodeDetails, 0, len(nodeAddresses))
for _, addr := range nodeAddresses {
nodeDetails, err := rpstate.GetNativeNodeDetails(m.rp, contracts, addr)
if err != nil {
return nil, fmt.Errorf("error getting node details for %s: %w", addr.Hex(), err)
}
state.NodeDetails = append(state.NodeDetails, nodeDetails)
}
}
m.logLine("2/7 - Retrieved node details (%s so far)", time.Since(start))
currentStep++
m.logLine("%d/%d - Retrieved node details (%s so far)", currentStep, steps, time.Since(start))

// Minipool details
state.MinipoolDetails, err = rpstate.GetAllNativeMinipoolDetails(m.rp, contracts)
if err != nil {
return nil, fmt.Errorf("error getting all minipool details: %w", err)
if allNodes {
state.MinipoolDetails, err = rpstate.GetAllNativeMinipoolDetails(m.rp, contracts)
if err != nil {
return nil, fmt.Errorf("error getting all minipool details: %w", err)
}
} else {
state.MinipoolDetails = []rpstate.NativeMinipoolDetails{}
for _, addr := range nodeAddresses {
nodeMinipools, err := rpstate.GetNodeNativeMinipoolDetails(m.rp, contracts, addr)
if err != nil {
return nil, fmt.Errorf("error getting minipool details for node %s: %w", addr.Hex(), err)
}
state.MinipoolDetails = append(state.MinipoolDetails, nodeMinipools...)
}
}
m.logLine("3/7 - Retrieved minipool details (%s so far)", time.Since(start))
currentStep++
m.logLine("%d/%d - Retrieved minipool details (%s so far)", currentStep, steps, time.Since(start))

// Create the node lookup
for i, details := range state.NodeDetails {
Expand Down Expand Up @@ -254,7 +283,8 @@ func (m *NetworkStateManager) createNetworkState(slotNumber uint64) (*NetworkSta
if err != nil {
return nil, fmt.Errorf("error getting all megapool validator details: %w", err)
}
m.logLine("4/7 - Retrieved megapool validator global index (%s so far)", time.Since(start))
currentStep++
m.logLine("%d/%d - Retrieved megapool validator global index (%s so far)", currentStep, steps, time.Since(start))

megapoolValidatorPubkeys := make([]types.ValidatorPubkey, 0, len(state.MegapoolValidatorGlobalIndex))
megapoolAddressMap := make(map[common.Address][]types.ValidatorPubkey)
Expand Down Expand Up @@ -296,7 +326,8 @@ func (m *NetworkStateManager) createNetworkState(slotNumber uint64) (*NetworkSta
if err := megapoolWg.Wait(); err != nil {
return nil, fmt.Errorf("error getting megapool details: %w", err)
}
m.logLine("4/7 - Retrieved megapool validator details (%s so far)", time.Since(start))
currentStep++
m.logLine("%d/%d - Retrieved megapool validator details (%s so far)", currentStep, steps, time.Since(start))

// Calculate avg node fees and distributor shares
for _, details := range state.NodeDetails {
Expand All @@ -308,133 +339,8 @@ func (m *NetworkStateManager) createNetworkState(slotNumber uint64) (*NetworkSta
if err != nil {
return nil, fmt.Errorf("error getting Oracle DAO details: %w", err)
}
m.logLine("5/7 - Retrieved Oracle DAO details (%s so far)", time.Since(start))

// Get the validator stats from Beacon
statusMap, err := m.bc.GetValidatorStatuses(pubkeys, &beacon.ValidatorStatusOptions{
Slot: &slotNumber,
})
if err != nil {
return nil, err
}
state.MinipoolValidatorDetails = statusMap
m.logLine("6/7 - Retrieved validator details (total time: %s)", time.Since(start))

// Get the complete node and user shares
mpds := make([]*rpstate.NativeMinipoolDetails, len(state.MinipoolDetails))
beaconBalances := make([]*big.Int, len(state.MinipoolDetails))
for i, mpd := range state.MinipoolDetails {
mpds[i] = &state.MinipoolDetails[i]
validator := state.MinipoolValidatorDetails[mpd.Pubkey]
if !validator.Exists {
beaconBalances[i] = big.NewInt(0)
} else {
beaconBalances[i] = eth.GweiToWei(float64(validator.Balance))
}
}
err = rpstate.CalculateCompleteMinipoolShares(m.rp, contracts, mpds, beaconBalances)
if err != nil {
return nil, err
}
state.MinipoolValidatorDetails = statusMap
m.logLine("7/7 - Calculated complete node and user balance shares (total time: %s)", time.Since(start))

return state, nil
}

// Creates a snapshot of the Rocket Pool network, but only for a single node
func (m *NetworkStateManager) createNetworkStateForNode(slotNumber uint64, nodeAddress common.Address) (*NetworkState, error) {
steps := 7

// Get the execution block for the given slot
beaconBlock, exists, err := m.bc.GetBeaconBlock(fmt.Sprintf("%d", slotNumber))
if err != nil {
return nil, fmt.Errorf("error getting Beacon block for slot %d: %w", slotNumber, err)
}
if !exists {
return nil, fmt.Errorf("slot %d did not have a Beacon block", slotNumber)
}

// Get the corresponding block on the EL
elBlockNumber := beaconBlock.ExecutionBlockNumber
opts := &bind.CallOpts{
BlockNumber: big.NewInt(0).SetUint64(elBlockNumber),
}

beaconConfig, err := m.getBeaconConfig()
if err != nil {
return nil, fmt.Errorf("error getting Beacon config: %w", err)
}

// Create the state wrapper
state := &NetworkState{
NodeDetailsByAddress: map[common.Address]*rpstate.NativeNodeDetails{},
MinipoolDetailsByAddress: map[common.Address]*rpstate.NativeMinipoolDetails{},
MinipoolDetailsByNode: map[common.Address][]*rpstate.NativeMinipoolDetails{},
BeaconSlotNumber: slotNumber,
ElBlockNumber: elBlockNumber,
BeaconConfig: *beaconConfig,
}

m.logLine("Getting network state for EL block %d, Beacon slot %d", elBlockNumber, slotNumber)
start := time.Now()

// Network contracts and details
contracts, err := rpstate.NewNetworkContracts(m.rp, m.multicaller, m.balanceBatcher, opts)
if err != nil {
return nil, fmt.Errorf("error getting network contracts: %w", err)
}
state.NetworkDetails, err = rpstate.NewNetworkDetails(m.rp, contracts)
if err != nil {
return nil, fmt.Errorf("error getting network details: %w", err)
}
m.logLine("1/%d - Retrieved network details (%s so far)", steps, time.Since(start))

// Node details
nodeDetails, err := rpstate.GetNativeNodeDetails(m.rp, contracts, nodeAddress)
if err != nil {
return nil, fmt.Errorf("error getting node details: %w", err)
}
state.NodeDetails = []rpstate.NativeNodeDetails{nodeDetails}
m.logLine("2/%d - Retrieved node details (%s so far)", steps, time.Since(start))

// Minipool details
state.MinipoolDetails, err = rpstate.GetNodeNativeMinipoolDetails(m.rp, contracts, nodeAddress)
if err != nil {
return nil, fmt.Errorf("error getting all minipool details: %w", err)
}
m.logLine("3/%d - Retrieved minipool details (%s so far)", steps, time.Since(start))

// Create the node lookup
for i, details := range state.NodeDetails {
state.NodeDetailsByAddress[details.NodeAddress] = &state.NodeDetails[i]
}

// Create the minipool lookups
pubkeys := make([]types.ValidatorPubkey, 0, len(state.MinipoolDetails))
emptyPubkey := types.ValidatorPubkey{}
for i, details := range state.MinipoolDetails {
state.MinipoolDetailsByAddress[details.MinipoolAddress] = &state.MinipoolDetails[i]
if details.Pubkey != emptyPubkey {
pubkeys = append(pubkeys, details.Pubkey)
}

// The map of nodes to minipools
nodeList, exists := state.MinipoolDetailsByNode[details.NodeAddress]
if !exists {
nodeList = []*rpstate.NativeMinipoolDetails{}
}
nodeList = append(nodeList, &state.MinipoolDetails[i])
state.MinipoolDetailsByNode[details.NodeAddress] = nodeList
}

// Calculate avg node fees and distributor shares
for _, details := range state.NodeDetails {
details.CalculateAverageFeeAndDistributorShares(state.MinipoolDetailsByNode[details.NodeAddress])
}

// Get the total network effective RPL stake
currentStep := 4
currentStep++
m.logLine("%d/%d - Retrieved Oracle DAO details (%s so far)", currentStep, steps, time.Since(start))

// Get the validator stats from Beacon
statusMap, err := m.bc.GetValidatorStatuses(pubkeys, &beacon.ValidatorStatusOptions{
Expand All @@ -444,8 +350,8 @@ func (m *NetworkStateManager) createNetworkStateForNode(slotNumber uint64, nodeA
return nil, err
}
state.MinipoolValidatorDetails = statusMap
m.logLine("%d/%d - Retrieved validator details (total time: %s)", currentStep, steps, time.Since(start))
currentStep++
m.logLine("%d/%d - Retrieved validator details (%s so far)", currentStep, steps, time.Since(start))

// Get the complete node and user shares
mpds := make([]*rpstate.NativeMinipoolDetails, len(state.MinipoolDetails))
Expand All @@ -464,65 +370,16 @@ func (m *NetworkStateManager) createNetworkStateForNode(slotNumber uint64, nodeA
return nil, err
}
state.MinipoolValidatorDetails = statusMap
m.logLine("%d/%d - Calculated complete node and user balance shares (total time: %s)", currentStep, steps, time.Since(start))
currentStep++
m.logLine("%d/%d - Calculated complete node and user balance shares (%s so far)", currentStep, steps, time.Since(start))

// Get the protocol DAO proposals
// Protocol DAO proposals
state.ProtocolDaoProposalDetails, err = rpstate.GetAllProtocolDaoProposalDetails(m.rp, contracts)
if err != nil {
return nil, fmt.Errorf("error getting Protocol DAO proposal details: %w", err)
}
m.logLine("%d/%d - Retrieved Protocol DAO proposals (total time: %s)", currentStep, steps, time.Since(start))
currentStep++

state.MegapoolValidatorGlobalIndex, err = rpstate.GetAllMegapoolValidators(m.rp, contracts)
if err != nil {
return nil, fmt.Errorf("error getting all megapool validator details: %w", err)
}

megapoolValidatorPubkeys := make([]types.ValidatorPubkey, 0, len(state.MegapoolValidatorGlobalIndex))
megapoolAddressMap := make(map[common.Address][]types.ValidatorPubkey)
megapoolValidatorInfo := make(map[types.ValidatorPubkey]*megapool.ValidatorInfoFromGlobalIndex)
for i := range state.MegapoolValidatorGlobalIndex {
validator := &state.MegapoolValidatorGlobalIndex[i]
if len(validator.Pubkey) > 0 {
pubkey := types.ValidatorPubkey(validator.Pubkey)
megapoolAddressMap[validator.MegapoolAddress] = append(megapoolAddressMap[validator.MegapoolAddress], pubkey)
megapoolValidatorPubkeys = append(megapoolValidatorPubkeys, pubkey)
megapoolValidatorInfo[pubkey] = validator
}
}
state.MegapoolToPubkeysMap = megapoolAddressMap
state.MegapoolValidatorInfo = megapoolValidatorInfo

megapoolAddresses := make([]common.Address, 0, len(megapoolAddressMap))
for addr := range megapoolAddressMap {
megapoolAddresses = append(megapoolAddresses, addr)
}

// Fetch beacon validator statuses and EL megapool details in parallel
var megapoolWg errgroup.Group
megapoolWg.Go(func() error {
statusMap, err := m.bc.GetValidatorStatuses(megapoolValidatorPubkeys, &beacon.ValidatorStatusOptions{
Slot: &slotNumber,
})
if err != nil {
return err
}
state.MegapoolValidatorDetails = statusMap
return nil
})
megapoolWg.Go(func() error {
var err error
state.MegapoolDetails, err = rpstate.GetBulkMegapoolDetails(m.rp, contracts, megapoolAddresses)
return err
})
if err := megapoolWg.Wait(); err != nil {
return nil, fmt.Errorf("error getting megapool details: %w", err)
}
m.logLine("%d/%d - Retrieved megapool validator details (total time: %s)", currentStep, steps, time.Since(start))

currentStep++
m.logLine("%d/%d - Retrieved Protocol DAO proposals (total time: %s)", currentStep, steps, time.Since(start))

return state, nil
}
Expand Down
Loading
Loading