Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,6 @@ jobs:
temporal operator search-attribute create --name MirrorName --type Text --namespace default
./peer-flow worker > ../logs/peer-flow-worker.log 2>&1 &
./peer-flow snapshot-worker > ../logs/peer-flow-snapshot-worker.log 2>&1 &
./peer-flow api --port 8112 --gateway-port 8113 > ../logs/peer-flow-api.log 2>&1 &
gotestsum --format standard-quiet --no-color --junitfile ../test-results.xml -- -cover -coverpkg github.com/PeerDB-io/peerdb/flow/... -p 32 ./... -timeout 900s -args -test.gocoverdir="$PWD/coverage"
killall peer-flow
sleep 1
Expand Down
63 changes: 31 additions & 32 deletions flow/e2e/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"

"github.com/PeerDB-io/peerdb/flow/cmd"
connmongo "github.com/PeerDB-io/peerdb/flow/connectors/mongo"
connpostgres "github.com/PeerDB-io/peerdb/flow/connectors/postgres"
"github.com/PeerDB-io/peerdb/flow/e2eshared"
Expand All @@ -32,7 +33,7 @@ import (
)

type APITestSuite struct {
protos.FlowServiceClient
*cmd.FlowRequestHandler
t *testing.T
pg *PostgresSource
source SuiteSource
Expand Down Expand Up @@ -217,10 +218,8 @@ func testApi[TSource SuiteSource](
require.NoError(t, err)
source, err := setup(t, suffix)
require.NoError(t, err)
client, err := NewApiClient()
require.NoError(t, err)
return APITestSuite{
FlowServiceClient: client,
FlowRequestHandler: NewFlowHandler(t),
t: t,
pg: pg,
source: source,
Expand Down Expand Up @@ -436,12 +435,12 @@ func (s APITestSuite) TestSchemaEndpoints() {
require.False(s.t, columns.Columns[1].IsKey)
require.Equal(s.t, "text", columns.Columns[1].Type)
case *MySqlSource:
columns, err := s.GetColumns(s.t.Context(), &protos.TableColumnsRequest{
columns, apiErr := s.GetColumns(s.t.Context(), &protos.TableColumnsRequest{
PeerName: peer.Name,
SchemaName: Schema(s),
TableName: tableName,
})
require.NoError(s.t, err)
require.NoError(s.t, apiErr)
require.Len(s.t, columns.Columns, 2)
require.Equal(s.t, "id", columns.Columns[0].Name)
require.True(s.t, columns.Columns[0].IsKey)
Expand Down Expand Up @@ -747,8 +746,8 @@ func (s APITestSuite) TestResyncCompleted() {
flowConnConfig.IdleTimeoutSeconds = 9
flowConnConfig.MaxBatchSize = 5040
// if true, then the flow will be resynced
response, err := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig})
require.NoError(s.t, err)
response, apiErr := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig})
require.NoError(s.t, apiErr)
require.NotNil(s.t, response)

tc := NewTemporalClient(s.t)
Expand Down Expand Up @@ -839,8 +838,8 @@ func (s APITestSuite) TestResyncFailed() {
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.DoInitialSnapshot = true

response, err := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig})
require.NoError(s.t, err)
response, apiErr := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig})
require.NoError(s.t, apiErr)
require.NotNil(s.t, response)

tc := NewTemporalClient(s.t)
Expand Down Expand Up @@ -910,8 +909,8 @@ func (s APITestSuite) TestDropCompleted() {
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.DoInitialSnapshot = true
flowConnConfig.InitialSnapshotOnly = true
response, err := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig})
require.NoError(s.t, err)
response, apiErr := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig})
require.NoError(s.t, apiErr)
require.NotNil(s.t, response)

tc := NewTemporalClient(s.t)
Expand Down Expand Up @@ -994,8 +993,8 @@ func (s APITestSuite) TestDropCompletedAndUnavailable() {
flowConnConfig.InitialSnapshotOnly = true
flowConnConfig.SourceName = proxyPeer.Name

response, err := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig})
require.NoError(s.t, err)
response, apiErr := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig})
require.NoError(s.t, apiErr)
require.NotNil(s.t, response)

tc := NewTemporalClient(s.t)
Expand Down Expand Up @@ -1060,8 +1059,8 @@ func (s APITestSuite) TestEditTablesBeforeResync() {
}
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.DoInitialSnapshot = true
response, err := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig})
require.NoError(s.t, err)
response, apiErr := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig})
require.NoError(s.t, apiErr)
require.NotNil(s.t, response)

tc := NewTemporalClient(s.t)
Expand Down Expand Up @@ -1319,8 +1318,8 @@ func (s APITestSuite) TestTotalRowsSyncedByMirror() {
}
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.DoInitialSnapshot = true
response, err := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig})
require.NoError(s.t, err)
response, apiErr := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig})
require.NoError(s.t, apiErr)
require.NotNil(s.t, response)

tc := NewTemporalClient(s.t)
Expand Down Expand Up @@ -1406,8 +1405,8 @@ func (s APITestSuite) TestPostgresTableOIDsMigration() {
}
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.DoInitialSnapshot = true
response, err := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig})
require.NoError(s.t, err)
response, apiErr := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig})
require.NoError(s.t, apiErr)
require.NotNil(s.t, response)

tc := NewTemporalClient(s.t)
Expand Down Expand Up @@ -1505,10 +1504,10 @@ func (s APITestSuite) TestQRep() {
s.t.Skip("QRepFlowWorkFlow is not implemented for MongoDB")
}

peerType, err := s.GetPeerType(s.t.Context(), &protos.PeerInfoRequest{
peerType, apiErr := s.GetPeerType(s.t.Context(), &protos.PeerInfoRequest{
PeerName: s.source.GeneratePeer(s.t).Name,
})
require.NoError(s.t, err)
require.NoError(s.t, apiErr)
tableName := AddSuffix(s, "qrepapi")
schemaQualified := AttachSchema(s, tableName)
require.NoError(s.t, s.source.Exec(s.t.Context(),
Expand All @@ -1534,10 +1533,10 @@ func (s APITestSuite) TestQRep() {
qrepConfig.InitialCopyOnly = false
qrepConfig.WaitBetweenBatchesSeconds = 5
qrepConfig.NumRowsPerPartition = 1
_, err = s.CreateQRepFlow(s.t.Context(), &protos.CreateQRepFlowRequest{
_, apiErr = s.CreateQRepFlow(s.t.Context(), &protos.CreateQRepFlowRequest{
QrepConfig: qrepConfig,
})
require.NoError(s.t, err)
require.NoError(s.t, apiErr)

tc := NewTemporalClient(s.t)
env, err := GetPeerflow(s.t.Context(), s.pg.PostgresConnector.Conn(), tc, qrepConfig.FlowJobName)
Expand Down Expand Up @@ -1599,8 +1598,8 @@ func (s APITestSuite) TestTableAdditionWithoutInitialLoad() {
}
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
flowConnConfig.DoInitialSnapshot = true
response, err := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig})
require.NoError(s.t, err)
response, apiErr := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: flowConnConfig})
require.NoError(s.t, apiErr)
require.NotNil(s.t, response)
tc := NewTemporalClient(s.t)
env, err := GetPeerflow(s.t.Context(), s.pg.PostgresConnector.Conn(), tc, flowConnConfig.FlowJobName)
Expand Down Expand Up @@ -1943,8 +1942,8 @@ func (s APITestSuite) TestCreateCDCFlowAttachExternalFlowEntry() {
// Simulate a crash: create flows entry without creating workflow
conn := s.pg.PostgresConnector.Conn()
sourcePeer := s.source.GeneratePeer(s.t)
destPeer, err := s.GetPeerInfo(s.t.Context(), &protos.PeerInfoRequest{PeerName: s.ch.Peer().Name})
require.NoError(s.t, err)
destPeer, apiErr := s.GetPeerInfo(s.t.Context(), &protos.PeerInfoRequest{PeerName: s.ch.Peer().Name})
require.NoError(s.t, apiErr)

var sourcePeerID, destPeerID int32
require.NoError(s.t, conn.QueryRow(s.t.Context(),
Expand Down Expand Up @@ -2005,11 +2004,11 @@ func (s APITestSuite) TestCreateCDCFlowAttachCanceledWorkflow() {
flowConnConfig.DoInitialSnapshot = true

// First create a normal flow
response1, err := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{
response1, apiErr := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{
ConnectionConfigs: flowConnConfig,
AttachToExisting: true,
})
require.NoError(s.t, err)
require.NoError(s.t, apiErr)
require.NotNil(s.t, response1)

tc := NewTemporalClient(s.t)
Expand Down Expand Up @@ -2077,11 +2076,11 @@ func (s APITestSuite) TestCreateCDCFlowAttachIdempotentAfterContinueAsNew() {
flowConnConfig.DoInitialSnapshot = true

// First create a normal flow
response1, err := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{
response1, apiErr := s.CreateCDCFlow(s.t.Context(), &protos.CreateCDCFlowRequest{
ConnectionConfigs: flowConnConfig,
AttachToExisting: true,
})
require.NoError(s.t, err)
require.NoError(s.t, apiErr)
require.NotNil(s.t, response1)

tc := NewTemporalClient(s.t)
Expand Down
7 changes: 3 additions & 4 deletions flow/e2e/mysql_rds_binlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/PeerDB-io/peerdb/flow/cmd"
connpostgres "github.com/PeerDB-io/peerdb/flow/connectors/postgres"
"github.com/PeerDB-io/peerdb/flow/e2eshared"
"github.com/PeerDB-io/peerdb/flow/generated/protos"
Expand All @@ -19,7 +20,7 @@ import (
// this test is not safe for parallel execution because it relies on a single table in a fixed place
// move it to its own suite
type MySQLRDSBinlogAPITestSuite struct {
protos.FlowServiceClient
*cmd.FlowRequestHandler
t *testing.T
pg *PostgresSource
source *MySqlSource
Expand Down Expand Up @@ -56,10 +57,8 @@ func TestMySQLRDSBinlog(t *testing.T) {
require.NoError(t, err)
source, err := SetupMySQL(t, suffix)
require.NoError(t, err)
client, err := NewApiClient()
require.NoError(t, err)
return MySQLRDSBinlogAPITestSuite{
FlowServiceClient: client,
FlowRequestHandler: NewFlowHandler(t),
t: t,
pg: pg,
source: source,
Expand Down
26 changes: 12 additions & 14 deletions flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ import (
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/temporal"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/PeerDB-io/peerdb/flow/cmd"
"github.com/PeerDB-io/peerdb/flow/connectors"
connpostgres "github.com/PeerDB-io/peerdb/flow/connectors/postgres"
connsnowflake "github.com/PeerDB-io/peerdb/flow/connectors/snowflake"
Expand Down Expand Up @@ -502,9 +501,8 @@ func CreateQRepWorkflowConfig(
func RunQRepFlowWorkflow(t *testing.T, tc client.Client, config *protos.QRepConfig) WorkflowRun {
t.Helper()

client, err := NewApiClient()
require.NoError(t, err)
res, err := client.CreateQRepFlow(t.Context(), &protos.CreateQRepFlowRequest{QrepConfig: config})
handler := NewFlowHandler(t)
res, err := handler.CreateQRepFlow(t.Context(), &protos.CreateQRepFlowRequest{QrepConfig: config})
require.NoError(t, err)
return WorkflowRun{
WorkflowRun: tc.GetWorkflow(t.Context(), res.WorkflowId, ""),
Expand Down Expand Up @@ -631,12 +629,13 @@ func NewTemporalClient(t *testing.T) client.Client {
return tc
}

func NewApiClient() (protos.FlowServiceClient, error) {
client, err := grpc.NewClient("0.0.0.0:8112", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, err
}
return protos.NewFlowServiceClient(client), nil
func NewFlowHandler(t *testing.T) *cmd.FlowRequestHandler {
t.Helper()
tc := NewTemporalClient(t)
pool, err := internal.GetCatalogConnectionPoolFromEnv(t.Context())
require.NoError(t, err)
taskQueue := internal.PeerFlowTaskQueueName(shared.PeerFlowTaskQueue)
return cmd.NewFlowRequestHandler(t.Context(), tc, pool, taskQueue)
}

type WorkflowRun struct {
Expand All @@ -657,9 +656,8 @@ func GetPeerflow(ctx context.Context, catalog *pgx.Conn, tc client.Client, flowN
func ExecutePeerflow(t *testing.T, tc client.Client, config *protos.FlowConnectionConfigs) WorkflowRun {
t.Helper()

client, err := NewApiClient()
require.NoError(t, err)
res, err := client.CreateCDCFlow(t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: config})
handler := NewFlowHandler(t)
res, err := handler.CreateCDCFlow(t.Context(), &protos.CreateCDCFlowRequest{ConnectionConfigs: config})
require.NoError(t, err)
return WorkflowRun{
WorkflowRun: tc.GetWorkflow(t.Context(), res.WorkflowId, ""),
Expand Down
Loading