diff --git a/block/components_test.go b/block/components_test.go index 3d1a1f4a1e..8deca85e8e 100644 --- a/block/components_test.go +++ b/block/components_test.go @@ -176,7 +176,7 @@ func TestNewAggregatorComponents_Creation(t *testing.T) { zerolog.Nop(), NopMetrics(), DefaultBlockOptions(), - nil, + nil, // raftNode ) require.NoError(t, err) @@ -260,7 +260,7 @@ func TestExecutor_RealExecutionClientFailure_StopsNode(t *testing.T) { zerolog.Nop(), NopMetrics(), DefaultBlockOptions(), - nil, + nil, // raftNode ) require.NoError(t, err) diff --git a/node/failover.go b/node/failover.go index 787f627ce6..493b28e184 100644 --- a/node/failover.go +++ b/node/failover.go @@ -14,13 +14,11 @@ import ( "github.com/evstack/ev-node/pkg/config" genesispkg "github.com/evstack/ev-node/pkg/genesis" "github.com/evstack/ev-node/pkg/p2p" - "github.com/evstack/ev-node/pkg/p2p/key" "github.com/evstack/ev-node/pkg/raft" rpcserver "github.com/evstack/ev-node/pkg/rpc/server" "github.com/evstack/ev-node/pkg/signer" "github.com/evstack/ev-node/pkg/store" evsync "github.com/evstack/ev-node/pkg/sync" - ds "github.com/ipfs/go-datastore" "github.com/rs/zerolog" "golang.org/x/sync/errgroup" ) @@ -38,10 +36,7 @@ type failoverState struct { func newSyncMode( nodeConfig config.Config, - nodeKey *key.NodeKey, genesis genesispkg.Genesis, - rootDB ds.Batching, - daStore store.Store, exec coreexecutor.Executor, da block.DAClient, logger zerolog.Logger, @@ -49,6 +44,7 @@ func newSyncMode( blockMetrics *block.Metrics, nodeOpts NodeOptions, raftNode *raft.Node, + p2pClient *p2p.Client, ) (*failoverState, error) { blockComponentsFn := func(headerSyncService *evsync.HeaderSyncService, dataSyncService *evsync.DataSyncService) (*block.Components, error) { return block.NewSyncComponents( @@ -67,16 +63,13 @@ func newSyncMode( raftNode, ) } - return setupFailoverState(nodeConfig, nodeKey, rootDB, daStore, genesis, logger, rktStore, blockComponentsFn, raftNode) + return setupFailoverState(nodeConfig, genesis, logger, rktStore, blockComponentsFn, raftNode, p2pClient) } func newAggregatorMode( nodeConfig config.Config, - nodeKey *key.NodeKey, signer signer.Signer, genesis genesispkg.Genesis, - rootDB ds.Batching, - daStore store.Store, exec coreexecutor.Executor, sequencer coresequencer.Sequencer, da block.DAClient, @@ -85,8 +78,8 @@ func newAggregatorMode( blockMetrics *block.Metrics, nodeOpts NodeOptions, raftNode *raft.Node, + p2pClient *p2p.Client, ) (*failoverState, error) { - blockComponentsFn := func(headerSyncService *evsync.HeaderSyncService, dataSyncService *evsync.DataSyncService) (*block.Components, error) { return block.NewAggregatorComponents( nodeConfig, @@ -105,31 +98,24 @@ func newAggregatorMode( ) } - return setupFailoverState(nodeConfig, nodeKey, rootDB, daStore, genesis, logger, rktStore, blockComponentsFn, raftNode) + return setupFailoverState(nodeConfig, genesis, logger, rktStore, blockComponentsFn, raftNode, p2pClient) } func setupFailoverState( nodeConfig config.Config, - nodeKey *key.NodeKey, - rootDB ds.Batching, - daStore store.Store, genesis genesispkg.Genesis, logger zerolog.Logger, rktStore store.Store, buildComponentsFn func(headerSyncService *evsync.HeaderSyncService, dataSyncService *evsync.DataSyncService) (*block.Components, error), raftNode *raft.Node, + p2pClient *p2p.Client, ) (*failoverState, error) { - p2pClient, err := p2p.NewClient(nodeConfig.P2P, nodeKey.PrivKey, rootDB, genesis.ChainID, logger, nil) - if err != nil { - return nil, err - } - - headerSyncService, err := evsync.NewHeaderSyncService(daStore, nodeConfig, genesis, p2pClient, logger.With().Str("component", "HeaderSyncService").Logger()) + headerSyncService, err := evsync.NewHeaderSyncService(rktStore, nodeConfig, genesis, p2pClient, logger.With().Str("component", "HeaderSyncService").Logger()) if err != nil { return nil, fmt.Errorf("error while initializing HeaderSyncService: %w", err) } - dataSyncService, err := evsync.NewDataSyncService(daStore, nodeConfig, genesis, p2pClient, logger.With().Str("component", "DataSyncService").Logger()) + dataSyncService, err := evsync.NewDataSyncService(rktStore, nodeConfig, genesis, p2pClient, logger.With().Str("component", "DataSyncService").Logger()) if err != nil { return nil, fmt.Errorf("error while initializing DataSyncService: %w", err) } @@ -203,19 +189,24 @@ func (f *failoverState) Run(pCtx context.Context) (multiErr error) { return nil }) - if err := f.p2pClient.Start(ctx); err != nil { - return fmt.Errorf("start p2p: %w", err) - } - defer f.p2pClient.Close() // nolint: errcheck - - if err := f.headerSyncService.Start(ctx); err != nil { - return fmt.Errorf("error while starting header sync service: %w", err) + // start header and data sync services concurrently to avoid cumulative startup delay. + syncWg, syncCtx := errgroup.WithContext(ctx) + syncWg.Go(func() error { + if err := f.headerSyncService.Start(syncCtx); err != nil { + return fmt.Errorf("header sync service: %w", err) + } + return nil + }) + syncWg.Go(func() error { + if err := f.dataSyncService.Start(syncCtx); err != nil { + return fmt.Errorf("data sync service: %w", err) + } + return nil + }) + if err := syncWg.Wait(); err != nil { + return err } defer stopService(f.headerSyncService.Stop, "header sync") - - if err := f.dataSyncService.Start(ctx); err != nil { - return fmt.Errorf("error while starting data sync service: %w", err) - } defer stopService(f.dataSyncService.Stop, "data sync") wg.Go(func() error { diff --git a/node/full.go b/node/full.go index 4fa2ff7c52..41106de365 100644 --- a/node/full.go +++ b/node/full.go @@ -22,7 +22,7 @@ import ( coresequencer "github.com/evstack/ev-node/core/sequencer" "github.com/evstack/ev-node/pkg/config" genesispkg "github.com/evstack/ev-node/pkg/genesis" - "github.com/evstack/ev-node/pkg/p2p/key" + "github.com/evstack/ev-node/pkg/p2p" raftpkg "github.com/evstack/ev-node/pkg/raft" "github.com/evstack/ev-node/pkg/service" "github.com/evstack/ev-node/pkg/signer" @@ -53,7 +53,8 @@ type FullNode struct { nodeConfig config.Config - daClient block.DAClient + daClient block.DAClient + p2pClient *p2p.Client Store store.Store raftNode *raftpkg.Node @@ -66,7 +67,7 @@ type FullNode struct { // newFullNode creates a new Rollkit full node. func newFullNode( nodeConfig config.Config, - nodeKey *key.NodeKey, + p2pClient *p2p.Client, signer signer.Signer, genesis genesispkg.Genesis, database ds.Batching, @@ -106,13 +107,12 @@ func newFullNode( leaderFactory := func() (raftpkg.Runnable, error) { logger.Info().Msg("Starting aggregator-MODE") nodeConfig.Node.Aggregator = true - nodeConfig.P2P.Peers = "" // peers are not supported in aggregator mode - return newAggregatorMode(nodeConfig, nodeKey, signer, genesis, database, evstore, exec, sequencer, daClient, logger, evstore, blockMetrics, nodeOpts, raftNode) + return newAggregatorMode(nodeConfig, signer, genesis, exec, sequencer, daClient, logger, evstore, blockMetrics, nodeOpts, raftNode, p2pClient) } followerFactory := func() (raftpkg.Runnable, error) { logger.Info().Msg("Starting sync-MODE") nodeConfig.Node.Aggregator = false - return newSyncMode(nodeConfig, nodeKey, genesis, database, evstore, exec, daClient, logger, evstore, blockMetrics, nodeOpts, raftNode) + return newSyncMode(nodeConfig, genesis, exec, daClient, logger, evstore, blockMetrics, nodeOpts, raftNode, p2pClient) } // Initialize raft node if enabled (for both aggregator and sync nodes) @@ -136,6 +136,7 @@ func newFullNode( genesis: genesis, nodeConfig: nodeConfig, daClient: daClient, + p2pClient: p2pClient, Store: evstore, leaderElection: leaderElection, raftNode: raftNode, @@ -279,6 +280,17 @@ func (n *FullNode) Run(parentCtx context.Context) error { (n.nodeConfig.Instrumentation.IsPrometheusEnabled() || n.nodeConfig.Instrumentation.IsPprofEnabled()) { n.prometheusSrv, n.pprofSrv = n.startInstrumentationServer() } + + // Start the P2P client once. It persists across mode switches + if err := n.p2pClient.Start(ctx); err != nil { + return fmt.Errorf("start p2p: %w", err) + } + defer func() { + if err := n.p2pClient.Close(); err != nil { + n.Logger.Error().Err(err).Msg("error closing p2p client") + } + }() + // Start leader election if n.raftNode != nil { if err := n.raftNode.Start(ctx); err != nil { diff --git a/node/helpers_test.go b/node/helpers_test.go index 94989d2f7b..2127ef9aaf 100644 --- a/node/helpers_test.go +++ b/node/helpers_test.go @@ -16,12 +16,13 @@ import ( coresequencer "github.com/evstack/ev-node/core/sequencer" "github.com/evstack/ev-node/test/testda" "github.com/ipfs/go-datastore" + "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" "github.com/rs/zerolog" "github.com/stretchr/testify/require" evconfig "github.com/evstack/ev-node/pkg/config" - "github.com/evstack/ev-node/pkg/p2p/key" + "github.com/evstack/ev-node/pkg/p2p" remote_signer "github.com/evstack/ev-node/pkg/signer/noop" "github.com/evstack/ev-node/pkg/store" "github.com/evstack/ev-node/types" @@ -67,22 +68,18 @@ func newDummyDAClient(maxBlobSize uint64) *testda.DummyDA { return getSharedDummyDA(maxBlobSize) } -func createTestComponents(t *testing.T, config evconfig.Config) (coreexecutor.Executor, coresequencer.Sequencer, block.DAClient, *key.NodeKey, datastore.Batching, func()) { +func createTestComponents(t *testing.T, config evconfig.Config) (coreexecutor.Executor, coresequencer.Sequencer, block.DAClient, crypto.PrivKey, datastore.Batching, func()) { executor := coreexecutor.NewDummyExecutor() sequencer := coresequencer.NewDummySequencer() daClient := newDummyDAClient(0) // Create genesis and keys for P2P client _, genesisValidatorKey, _ := types.GetGenesisWithPrivkey("test-chain") - p2pKey := &key.NodeKey{ - PrivKey: genesisValidatorKey, - PubKey: genesisValidatorKey.GetPublic(), - } ds, err := store.NewTestInMemoryKVStore() require.NoError(t, err) stop := daClient.StartHeightTicker(config.DA.BlockTime.Duration) - return executor, sequencer, daClient, p2pKey, ds, stop + return executor, sequencer, daClient, genesisValidatorKey, ds, stop } func getTestConfig(t *testing.T, n int) evconfig.Config { @@ -120,7 +117,7 @@ func newTestNode( executor coreexecutor.Executor, sequencer coresequencer.Sequencer, daClient block.DAClient, - nodeKey *key.NodeKey, + privKey crypto.PrivKey, ds datastore.Batching, stopDAHeightTicker func(), ) (*FullNode, func()) { @@ -133,13 +130,17 @@ func newTestNode( if testing.Verbose() { logger = zerolog.New(zerolog.NewTestWriter(t)) } + + p2pClient, err := newTestP2PClient(config, privKey, ds, genesis.ChainID, logger) + require.NoError(t, err) + node, err := NewNode( config, executor, sequencer, daClient, remoteSigner, - nodeKey, + p2pClient, genesis, ds, DefaultMetricsProvider(evconfig.DefaultInstrumentationConfig()), @@ -159,8 +160,8 @@ func newTestNode( func createNodeWithCleanup(t *testing.T, config evconfig.Config) (*FullNode, func()) { resetSharedDummyDA() - executor, sequencer, daClient, nodeKey, ds, stopDAHeightTicker := createTestComponents(t, config) - return newTestNode(t, config, executor, sequencer, daClient, nodeKey, ds, stopDAHeightTicker) + executor, sequencer, daClient, privKey, ds, stopDAHeightTicker := createTestComponents(t, config) + return newTestNode(t, config, executor, sequencer, daClient, privKey, ds, stopDAHeightTicker) } func createNodeWithCustomComponents( @@ -169,11 +170,11 @@ func createNodeWithCustomComponents( executor coreexecutor.Executor, sequencer coresequencer.Sequencer, daClient block.DAClient, - nodeKey *key.NodeKey, + privKey crypto.PrivKey, ds datastore.Batching, stopDAHeightTicker func(), ) (*FullNode, func()) { - return newTestNode(t, config, executor, sequencer, daClient, nodeKey, ds, stopDAHeightTicker) + return newTestNode(t, config, executor, sequencer, daClient, privKey, ds, stopDAHeightTicker) } // Creates the given number of nodes the given nodes using the given wait group to synchronize them @@ -192,24 +193,28 @@ func createNodesWithCleanup(t *testing.T, num int, config evconfig.Config) ([]*F aggListenAddress := config.P2P.ListenAddress aggPeers := config.P2P.Peers - executor, sequencer, daClient, aggP2PKey, ds, stopDAHeightTicker := createTestComponents(t, config) + executor, sequencer, daClient, aggPrivKey, ds, stopDAHeightTicker := createTestComponents(t, config) if d, ok := daClient.(*testda.DummyDA); ok { d.Reset() } - aggPeerID, err := peer.IDFromPrivateKey(aggP2PKey.PrivKey) + aggPeerID, err := peer.IDFromPrivateKey(aggPrivKey) require.NoError(err) logger := zerolog.Nop() if testing.Verbose() { logger = zerolog.New(zerolog.NewTestWriter(t)) } + + aggP2PClient, err := newTestP2PClient(config, aggPrivKey, ds, genesis.ChainID, logger) + require.NoError(err) + aggNode, err := NewNode( config, executor, sequencer, daClient, remoteSigner, - aggP2PKey, + aggP2PClient, genesis, ds, DefaultMetricsProvider(evconfig.DefaultInstrumentationConfig()), @@ -236,16 +241,19 @@ func createNodesWithCleanup(t *testing.T, num int, config evconfig.Config) ([]*F } config.P2P.ListenAddress = fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 40001+i) config.RPC.Address = fmt.Sprintf("127.0.0.1:%d", 8001+i) - executor, sequencer, daClient, nodeP2PKey, ds, stopDAHeightTicker := createTestComponents(t, config) + executor, sequencer, daClient, nodePrivKey, ds, stopDAHeightTicker := createTestComponents(t, config) stopDAHeightTicker() + nodeP2PClient, err := newTestP2PClient(config, nodePrivKey, ds, genesis.ChainID, logger) + require.NoError(err) + node, err := NewNode( config, executor, sequencer, daClient, nil, - nodeP2PKey, + nodeP2PClient, genesis, ds, DefaultMetricsProvider(evconfig.DefaultInstrumentationConfig()), @@ -257,7 +265,7 @@ func createNodesWithCleanup(t *testing.T, num int, config evconfig.Config) ([]*F // No-op: ticker already stopped } nodes[i], cleanups[i] = node.(*FullNode), cleanup - nodePeerID, err := peer.IDFromPrivateKey(nodeP2PKey.PrivKey) + nodePeerID, err := peer.IDFromPrivateKey(nodePrivKey) require.NoError(err) peersList = append(peersList, fmt.Sprintf("%s/p2p/%s", config.P2P.ListenAddress, nodePeerID.Loggable()["peerID"].(string))) } @@ -265,6 +273,11 @@ func createNodesWithCleanup(t *testing.T, num int, config evconfig.Config) ([]*F return nodes, cleanups } +// newTestP2PClient creates a p2p.Client for testing. +func newTestP2PClient(config evconfig.Config, privKey crypto.PrivKey, ds datastore.Batching, chainID string, logger zerolog.Logger) (*p2p.Client, error) { + return p2p.NewClient(config.P2P, privKey, ds, chainID, logger, nil) +} + // Helper to create N contexts and cancel functions func createNodeContexts(n int) ([]context.Context, []context.CancelFunc) { ctxs := make([]context.Context, n) diff --git a/node/light.go b/node/light.go index 8790507a07..7aeb9038b6 100644 --- a/node/light.go +++ b/node/light.go @@ -7,7 +7,6 @@ import ( "net/http" "time" - "github.com/evstack/ev-node/pkg/p2p/key" ds "github.com/ipfs/go-datastore" "github.com/rs/zerolog" @@ -39,15 +38,10 @@ type LightNode struct { func newLightNode( conf config.Config, genesis genesis.Genesis, - nodeKey *key.NodeKey, + p2pClient *p2p.Client, database ds.Batching, logger zerolog.Logger, ) (ln *LightNode, err error) { - p2pClient, err := p2p.NewClient(conf.P2P, nodeKey.PrivKey, database, genesis.ChainID, logger, nil) - if err != nil { - return nil, err - } - componentLogger := logger.With().Str("component", "HeaderSyncService").Logger() baseStore := store.New(database) @@ -166,9 +160,8 @@ func (ln *LightNode) Run(parentCtx context.Context) error { } // Stop P2P Client - err = ln.P2P.Close() - if err != nil { - multiErr = errors.Join(multiErr, fmt.Errorf("closing P2P client: %w", err)) + if err := ln.P2P.Close(); err != nil { + multiErr = errors.Join(multiErr, fmt.Errorf("closing p2p client: %w", err)) } if err = ln.Store.Close(); err != nil { diff --git a/node/light_test.go b/node/light_test.go index f459ad54b0..964cf66041 100644 --- a/node/light_test.go +++ b/node/light_test.go @@ -13,6 +13,7 @@ import ( "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" + "github.com/evstack/ev-node/pkg/p2p" p2p_key "github.com/evstack/ev-node/pkg/p2p/key" ) @@ -39,7 +40,10 @@ func TestLightNodeLifecycle(t *testing.T) { logger := zerolog.Nop() db := ds_sync.MutexWrap(ds.NewMapDatastore()) - ln, err := newLightNode(conf, gen, p2pKey, db, logger) + p2pClient, err := p2p.NewClient(conf.P2P, p2pKey.PrivKey, db, gen.ChainID, logger, nil) + require.NoError(err) + + ln, err := newLightNode(conf, gen, p2pClient, db, logger) require.NoError(err) require.NotNil(ln) diff --git a/node/node.go b/node/node.go index 7ce087eeeb..d8aeea333f 100644 --- a/node/node.go +++ b/node/node.go @@ -9,7 +9,7 @@ import ( coresequencer "github.com/evstack/ev-node/core/sequencer" "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" - "github.com/evstack/ev-node/pkg/p2p/key" + "github.com/evstack/ev-node/pkg/p2p" "github.com/evstack/ev-node/pkg/service" "github.com/evstack/ev-node/pkg/signer" ) @@ -25,8 +25,8 @@ type NodeOptions struct { BlockOptions block.BlockOptions } -// NewNode returns a new Full or Light Node based on the config -// This is the entry point for composing a node, when compiling a node, you need to provide an executor +// NewNode returns a new Full or Light Node based on the config. +// This is the entry point for composing a node, when compiling a node, you need to provide an executor. // Example executors can be found in apps/ func NewNode( conf config.Config, @@ -34,7 +34,7 @@ func NewNode( sequencer coresequencer.Sequencer, daClient block.DAClient, signer signer.Signer, - nodeKey *key.NodeKey, + p2pClient *p2p.Client, genesis genesis.Genesis, database ds.Batching, metricsProvider MetricsProvider, @@ -42,7 +42,7 @@ func NewNode( nodeOptions NodeOptions, ) (Node, error) { if conf.Node.Light { - return newLightNode(conf, genesis, nodeKey, database, logger) + return newLightNode(conf, genesis, p2pClient, database, logger) } if err := nodeOptions.BlockOptions.Validate(); err != nil { @@ -51,7 +51,7 @@ func NewNode( return newFullNode( conf, - nodeKey, + p2pClient, signer, genesis, database, diff --git a/node/single_sequencer_integration_test.go b/node/single_sequencer_integration_test.go index 0a798afa6c..130b62e235 100644 --- a/node/single_sequencer_integration_test.go +++ b/node/single_sequencer_integration_test.go @@ -234,10 +234,10 @@ func TestStateRecovery(t *testing.T) { // Set up one sequencer config := getTestConfig(t, 1) - executor, sequencer, dac, nodeKey, _, stopDAHeightTicker := createTestComponents(t, config) + executor, sequencer, dac, privKey, _, stopDAHeightTicker := createTestComponents(t, config) ds, err := store.NewDefaultKVStore(config.RootDir, "db", "test") require.NoError(err) - node, cleanup := createNodeWithCustomComponents(t, config, executor, sequencer, dac, nodeKey, ds, stopDAHeightTicker) + node, cleanup := createNodeWithCustomComponents(t, config, executor, sequencer, dac, privKey, ds, stopDAHeightTicker) defer cleanup() var runningWg sync.WaitGroup @@ -262,10 +262,10 @@ func TestStateRecovery(t *testing.T) { shutdownAndWait(t, []context.CancelFunc{cancel}, &runningWg, 60*time.Second) // Create a new node instance using the same components - executor, sequencer, dac, nodeKey, _, stopDAHeightTicker = createTestComponents(t, config) + executor, sequencer, dac, privKey, _, stopDAHeightTicker = createTestComponents(t, config) ds, err = store.NewDefaultKVStore(config.RootDir, "db", "test") require.NoError(err) - node, cleanup = createNodeWithCustomComponents(t, config, executor, sequencer, dac, nodeKey, ds, stopDAHeightTicker) + node, cleanup = createNodeWithCustomComponents(t, config, executor, sequencer, dac, privKey, ds, stopDAHeightTicker) defer cleanup() // Verify state persistence @@ -319,7 +319,7 @@ func TestBatchQueueThrottlingWithDAFailure(t *testing.T) { config.DA.BlockTime = evconfig.DurationWrapper{Duration: 100 * time.Millisecond} // Longer DA time to ensure blocks are produced first // Create test components - executor, sequencer, dummyDA, ds, nodeKey, stopDAHeightTicker := createTestComponents(t, config) + executor, sequencer, dummyDA, privKey, ds, stopDAHeightTicker := createTestComponents(t, config) defer stopDAHeightTicker() // Cast executor to DummyExecutor so we can inject transactions @@ -331,7 +331,7 @@ func TestBatchQueueThrottlingWithDAFailure(t *testing.T) { require.True(ok, "Expected testda.DummyDA implementation") // Create node with components - node, cleanup := createNodeWithCustomComponents(t, config, executor, sequencer, dummyDAImpl, ds, nodeKey, func() {}) + node, cleanup := createNodeWithCustomComponents(t, config, executor, sequencer, dummyDAImpl, privKey, ds, func() {}) defer cleanup() ctx, cancel := context.WithCancel(t.Context()) diff --git a/pkg/cmd/run_node.go b/pkg/cmd/run_node.go index aa6a01b7ae..33b1eba006 100644 --- a/pkg/cmd/run_node.go +++ b/pkg/cmd/run_node.go @@ -23,6 +23,7 @@ import ( rollconf "github.com/evstack/ev-node/pkg/config" blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc" genesispkg "github.com/evstack/ev-node/pkg/genesis" + "github.com/evstack/ev-node/pkg/p2p" "github.com/evstack/ev-node/pkg/p2p/key" "github.com/evstack/ev-node/pkg/signer" "github.com/evstack/ev-node/pkg/signer/file" @@ -163,6 +164,11 @@ func StartNode( executor = telemetry.WithTracingExecutor(executor) } + p2pClient, err := p2p.NewClient(nodeConfig.P2P, nodeKey.PrivKey, datastore, genesis.ChainID, logger, nil) + if err != nil { + return fmt.Errorf("create p2p client: %w", err) + } + // Create and start the node rollnode, err := node.NewNode( nodeConfig, @@ -170,7 +176,7 @@ func StartNode( sequencer, daClient, signer, - nodeKey, + p2pClient, genesis, datastore, metrics, diff --git a/pkg/cmd/run_node_test.go b/pkg/cmd/run_node_test.go index 2e58c1c637..39ff4261b6 100644 --- a/pkg/cmd/run_node_test.go +++ b/pkg/cmd/run_node_test.go @@ -20,6 +20,7 @@ import ( "github.com/rs/zerolog" "github.com/spf13/cobra" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func createTestComponents(_ context.Context, t *testing.T) (coreexecutor.Executor, coresequencer.Sequencer, signer.Signer, *key.NodeKey, datastore.Batching, func()) { @@ -33,7 +34,11 @@ func createTestComponents(_ context.Context, t *testing.T) (coreexecutor.Executo // Create a dummy P2P client and datastore for testing ds := datastore.NewMapDatastore() - return executor, sequencer, keyProvider, nil, ds, func() {} + // Generate a dummy node key for the P2P client + nodeKey, err := key.GenerateNodeKey() + require.NoError(t, err) + + return executor, sequencer, keyProvider, nodeKey, ds, func() {} } func TestParseFlags(t *testing.T) { diff --git a/pkg/p2p/client.go b/pkg/p2p/client.go index a1c8de94ba..4288f749f9 100644 --- a/pkg/p2p/client.go +++ b/pkg/p2p/client.go @@ -49,11 +49,12 @@ type Client struct { chainID string privKey crypto.PrivKey - host host.Host - dht *dht.IpfsDHT - disc *discovery.RoutingDiscovery - gater *conngater.BasicConnectionGater - ps *pubsub.PubSub + host host.Host + dht *dht.IpfsDHT + disc *discovery.RoutingDiscovery + gater *conngater.BasicConnectionGater + ps *pubsub.PubSub + started bool metrics *Metrics } @@ -121,6 +122,9 @@ func NewClientWithHost( // 3. Setup DHT, establish connection to seed nodes and initialize peer discovery. // 4. Use active peer discovery to look for peers from same ORU network. func (c *Client) Start(ctx context.Context) error { + if c.started { + return nil + } c.logger.Debug().Msg("starting P2P client") if c.host != nil { @@ -165,19 +169,21 @@ func (c *Client) startWithHost(ctx context.Context, h host.Host) error { return err } + c.started = true return nil } // Close gently stops Client. func (c *Client) Close() error { - var dhtErr, hostErr error + var err error if c.dht != nil { - dhtErr = c.dht.Close() + err = errors.Join(err, c.dht.Close()) } if c.host != nil { - hostErr = c.host.Close() + err = errors.Join(err, c.host.Close()) } - return errors.Join(dhtErr, hostErr) + c.started = false + return err } // Addrs returns listen addresses of Client. diff --git a/pkg/p2p/utils_test.go b/pkg/p2p/utils_test.go index 5bd5664dd2..e14a621233 100644 --- a/pkg/p2p/utils_test.go +++ b/pkg/p2p/utils_test.go @@ -3,7 +3,6 @@ package p2p import ( "context" "crypto/rand" - "errors" "fmt" "net" "path/filepath" @@ -25,11 +24,10 @@ import ( type testNet []*Client -func (tn testNet) Close() (err error) { +func (tn testNet) Close() { for i := range tn { - err = errors.Join(err, tn[i].Close()) + _ = tn[i].Close() } - return } func (tn testNet) WaitForDHT() { diff --git a/pkg/sync/sync_service.go b/pkg/sync/sync_service.go index 8567e79764..b65b855a4b 100644 --- a/pkg/sync/sync_service.go +++ b/pkg/sync/sync_service.go @@ -20,7 +20,6 @@ import ( "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" - "github.com/evstack/ev-node/pkg/p2p" "github.com/evstack/ev-node/pkg/store" "github.com/evstack/ev-node/types" ) @@ -38,6 +37,15 @@ type HeaderSyncService = SyncService[*types.P2PSignedHeader] // DataSyncService is the P2P Sync Service for blocks. type DataSyncService = SyncService[*types.P2PData] +// P2PClient defines the interface for P2P client operations needed by the sync service. +type P2PClient interface { + PubSub() *pubsub.PubSub + Info() (string, string, string, error) + Host() host.Host + ConnectionGater() *conngater.BasicConnectionGater + PeerIDs() []peer.ID +} + // SyncService is the P2P Sync Service for blocks and headers. // // Uses the go-header library for handling all P2P logic. @@ -48,7 +56,7 @@ type SyncService[H store.EntityWithDAHint[H]] struct { genesis genesis.Genesis - p2p *p2p.Client + p2p P2PClient ex *goheaderp2p.Exchange[H] sub *goheaderp2p.Subscriber[H] @@ -66,7 +74,7 @@ func NewDataSyncService( evStore store.Store, conf config.Config, genesis genesis.Genesis, - p2p *p2p.Client, + p2p P2PClient, logger zerolog.Logger, ) (*DataSyncService, error) { storeAdapter := store.NewDataStoreAdapter(evStore, genesis) @@ -78,7 +86,7 @@ func NewHeaderSyncService( evStore store.Store, conf config.Config, genesis genesis.Genesis, - p2p *p2p.Client, + p2p P2PClient, logger zerolog.Logger, ) (*HeaderSyncService, error) { storeAdapter := store.NewHeaderStoreAdapter(evStore, genesis) @@ -90,7 +98,7 @@ func newSyncService[H store.EntityWithDAHint[H]]( syncType syncType, conf config.Config, genesis genesis.Genesis, - p2p *p2p.Client, + p2p P2PClient, logger zerolog.Logger, ) (*SyncService[H], error) { if p2p == nil { diff --git a/test/e2e/evm_full_node_e2e_test.go b/test/e2e/evm_full_node_e2e_test.go index 9e302959d2..e1981dbc1b 100644 --- a/test/e2e/evm_full_node_e2e_test.go +++ b/test/e2e/evm_full_node_e2e_test.go @@ -146,7 +146,7 @@ func verifyTransactionSync(t *testing.T, sequencerClient, fullNodeClient *ethcli } } return false - }, 60*time.Second, 500*time.Millisecond, "Full node should sync the block containing the transaction") + }, 3*time.Minute, 500*time.Millisecond, "Full node should sync the block containing the transaction") // Final verification - both nodes should have the transaction in the same block sequencerReceipt, err := sequencerClient.TransactionReceipt(ctx, txHash) diff --git a/test/e2e/evm_test_common.go b/test/e2e/evm_test_common.go index 5dda0421bd..d5a7215168 100644 --- a/test/e2e/evm_test_common.go +++ b/test/e2e/evm_test_common.go @@ -378,6 +378,7 @@ func setupSequencerNodeLazy(t *testing.T, sut *SystemUnderTest, sequencerHome, j // Use helper methods to get complete URLs args := []string{ "start", + "--evnode.log.level", "debug", "--evnode.log.format", "json", "--evm.jwt-secret-file", jwtSecretFile, "--evm.genesis-hash", genesisHash, @@ -440,6 +441,7 @@ func setupFullNode(t *testing.T, sut *SystemUnderTest, fullNodeHome, sequencerHo // Use helper methods to get complete URLs args := []string{ "start", + "--evnode.log.level", "debug", "--evnode.log.format", "json", "--home", fullNodeHome, "--evm.jwt-secret-file", fullNodeJwtSecretFile, diff --git a/test/e2e/failover_e2e_test.go b/test/e2e/failover_e2e_test.go index 580f3141ce..cc0276e43a 100644 --- a/test/e2e/failover_e2e_test.go +++ b/test/e2e/failover_e2e_test.go @@ -384,6 +384,12 @@ func TestHASequencerRollingRestartE2E(t *testing.T) { nodeJWT := getNodeJWT(nodeName) p2pPeers := getP2PPeers(nodeName) + // Kill old process just in case + if nodeDetails.IsRunning() { + _ = nodeDetails.Kill() + time.Sleep(200 * time.Millisecond) + } + restartedProc := setupRaftSequencerNode(t, sut, workDir, nodeName, nodeDetails.raftAddr, nodeJWT, genesisHash, testEndpoints.GetDAAddress(), "", raftCluster, p2pPeers, strings.TrimPrefix(nodeDetails.rpcAddr, "http://"), nodeDetails.p2pAddr,