diff --git a/cmd/outpost-migrate-redis/migration.go b/cmd/outpost-migrate-redis/migration.go index f2629914d..0da825677 100644 --- a/cmd/outpost-migrate-redis/migration.go +++ b/cmd/outpost-migrate-redis/migration.go @@ -14,14 +14,15 @@ import ( ) const ( - migrationLockKey = ".outpost:migration:lock" + migrationLockKey = ".outpost:migration_lock" ) // Migrator handles Redis migrations type Migrator struct { - client *redisClientWrapper - logger MigrationLogger - migrations map[string]migratorredis.Migration // All available migrations + client *redisClientWrapper + logger MigrationLogger + migrations map[string]migratorredis.Migration // All available migrations + deploymentID string } // Close cleans up resources (logger sync, redis connection, etc) @@ -73,12 +74,28 @@ func NewMigrator(cfg *config.Config, logger MigrationLogger) (*Migrator, error) } return &Migrator{ - client: client, - logger: logger, - migrations: migrationMap, + client: client, + logger: logger, + migrations: migrationMap, + deploymentID: cfg.DeploymentID, }, nil } +func (m *Migrator) deploymentPrefix() string { + if m.deploymentID == "" { + return "" + } + return m.deploymentID + ":" +} + +func (m *Migrator) lockKey() string { + return m.deploymentPrefix() + migrationLockKey +} + +func (m *Migrator) migrationKey(name string) string { + return fmt.Sprintf("%soutpost:migration:%s", m.deploymentPrefix(), name) +} + // ListMigrations lists all available migrations func (m *Migrator) ListMigrations() error { // Build map of name -> description from actual migrations @@ -98,14 +115,14 @@ func (m *Migrator) acquireLock(ctx context.Context, migrationName string) error // Try to set lock atomically with SetNX (only sets if not exists) // Use 1 hour expiry in case process dies without cleanup - success, err := m.client.SetNX(ctx, migrationLockKey, lock, time.Hour).Result() + success, err := m.client.SetNX(ctx, m.lockKey(), lock, time.Hour).Result() if err != nil { return fmt.Errorf("failed to acquire 
lock: %w", err) } if !success { // Lock already exists, get details for error message - lockData, err := m.client.Get(ctx, migrationLockKey).Result() + lockData, err := m.client.Get(ctx, m.lockKey()).Result() if err != nil { return fmt.Errorf("migration is already running (could not get lock details: %w)", err) } @@ -119,7 +136,7 @@ func (m *Migrator) acquireLock(ctx context.Context, migrationName string) error // releaseLock releases the migration lock func (m *Migrator) releaseLock(ctx context.Context) error { - err := m.client.Del(ctx, migrationLockKey).Err() + err := m.client.Del(ctx, m.lockKey()).Err() if err != nil { return fmt.Errorf("failed to release lock: %w", err) } @@ -130,7 +147,7 @@ func (m *Migrator) releaseLock(ctx context.Context) error { // Unlock forcefully clears the migration lock (for stuck situations) func (m *Migrator) Unlock(ctx context.Context, autoApprove bool) error { // Check if lock exists - lockData, err := m.client.Get(ctx, migrationLockKey).Result() + lockData, err := m.client.Get(ctx, m.lockKey()).Result() if err != nil && err.Error() == "redis: nil" { m.logger.LogLockStatus("", false) return nil @@ -155,7 +172,7 @@ func (m *Migrator) Unlock(ctx context.Context, autoApprove bool) error { } } - err = m.client.Del(ctx, migrationLockKey).Err() + err = m.client.Del(ctx, m.lockKey()).Err() if err != nil { return fmt.Errorf("failed to clear lock: %w", err) } @@ -221,7 +238,7 @@ func (m *Migrator) Init(ctx context.Context, currentCheck bool) error { // Mark all migrations as applied for name := range m.migrations { - if err := setMigrationAsApplied(ctx, m.client, name); err != nil { + if err := m.setMigrationAsApplied(ctx, name); err != nil { return fmt.Errorf("failed to mark migration %s as applied: %w", name, err) } } @@ -238,7 +255,7 @@ func (m *Migrator) Init(ctx context.Context, currentCheck bool) error { // Get pending migrations count (not satisfied = not applied and not marked as not_applicable) pendingCount := 0 for name := range 
m.migrations { - if !isSatisfied(ctx, m.client, name) { + if !m.isSatisfied(ctx, name) { pendingCount++ } } @@ -255,21 +272,24 @@ func (m *Migrator) Init(ctx context.Context, currentCheck bool) error { // checkIfFreshInstallation checks if Redis is a fresh installation func (m *Migrator) checkIfFreshInstallation(ctx context.Context) (bool, error) { + prefix := m.deploymentPrefix() + // Check for any "outpost:*" keys (current format) - outpostKeys, err := m.client.Keys(ctx, "outpost:*").Result() + // Use SCAN instead of KEYS to avoid blocking Redis on large keyspaces + hasKeys, err := m.hasAnyKeys(ctx, prefix+"outpost:*") if err != nil { return false, fmt.Errorf("failed to check outpost keys: %w", err) } - if len(outpostKeys) > 0 { + if hasKeys { return false, nil // Has current data } - // Check for any "tenant:*" keys (old format) - tenantKeys, err := m.client.Keys(ctx, "tenant:*").Result() + // Check for any "tenant:*" keys (old format, or deployment-scoped tenant keys) + hasKeys, err = m.hasAnyKeys(ctx, prefix+"tenant:*") if err != nil { return false, fmt.Errorf("failed to check tenant keys: %w", err) } - if len(tenantKeys) > 0 { + if hasKeys { return false, nil // Has old data } @@ -277,13 +297,23 @@ func (m *Migrator) checkIfFreshInstallation(ctx context.Context) (bool, error) { return true, nil } +// hasAnyKeys uses SCAN to check if at least one key matches the pattern +// without blocking Redis like KEYS does. 
+func (m *Migrator) hasAnyKeys(ctx context.Context, pattern string) (bool, error) { + keys, _, err := m.client.Scan(ctx, 0, pattern, 1).Result() + if err != nil { + return false, err + } + return len(keys) > 0, nil +} + // Plan shows what changes would be made without applying them func (m *Migrator) Plan(ctx context.Context) error { // First show current status var satisfiedCount, pendingCount int for name := range m.migrations { - if isSatisfied(ctx, m.client, name) { + if m.isSatisfied(ctx, name) { satisfiedCount++ } else { pendingCount++ @@ -453,7 +483,7 @@ func (m *Migrator) ApplyOne(ctx context.Context, autoApprove, rerun bool, migrat if !ok { return fmt.Errorf("migration not found: %s", migrationName) } - alreadyApplied = isApplied(ctx, m.client, migrationName) + alreadyApplied = m.isApplied(ctx, migrationName) if alreadyApplied && !rerun { m.logger.LogInfo(fmt.Sprintf("migration %s already applied (use --rerun to re-run)", migrationName)) @@ -511,12 +541,12 @@ func (m *Migrator) ApplyOne(ctx context.Context, autoApprove, rerun bool, migrat duration := time.Since(startTime) // Mark migration as applied (or update applied_at if re-running) - if err := setMigrationAsApplied(ctx, m.client, mig.Name()); err != nil { + if err := m.setMigrationAsApplied(ctx, mig.Name()); err != nil { return fmt.Errorf("failed to mark migration as applied: %w", err) } // Record run history - if err := recordMigrationRun(ctx, m.client, mig.Name(), state, rerun, duration); err != nil { + if err := m.recordMigrationRun(ctx, mig.Name(), state, rerun, duration); err != nil { m.logger.LogWarning(fmt.Sprintf("failed to record run history: %v", err)) // Don't fail the migration for history recording errors } @@ -552,7 +582,7 @@ func (m *Migrator) Apply(ctx context.Context, autoApprove bool) error { applicable, reason := mig.IsApplicable(ctx) if !applicable { m.logger.LogInfo(fmt.Sprintf(" %s: Skipped (%s)", mig.Name(), reason)) - if err := setMigrationNotApplicable(ctx, m.client, 
mig.Name(), reason); err != nil { + if err := m.setMigrationNotApplicable(ctx, mig.Name(), reason); err != nil { return fmt.Errorf("failed to mark %s as not applicable: %w", mig.Name(), err) } skipped++ @@ -616,13 +646,13 @@ func (m *Migrator) Apply(ctx context.Context, autoApprove bool) error { duration := time.Since(startTime) // Mark as applied - if err := setMigrationAsApplied(ctx, m.client, mig.Name()); err != nil { + if err := m.setMigrationAsApplied(ctx, mig.Name()); err != nil { m.releaseLock(ctx) return fmt.Errorf("failed to mark %s as applied: %w", mig.Name(), err) } // Record run history - if err := recordMigrationRun(ctx, m.client, mig.Name(), state, false, duration); err != nil { + if err := m.recordMigrationRun(ctx, mig.Name(), state, false, duration); err != nil { m.logger.LogWarning(fmt.Sprintf("failed to record run history: %v", err)) } @@ -674,7 +704,7 @@ func (m *Migrator) getPendingMigrations(ctx context.Context) []migratorredis.Mig var pending []migratorredis.Migration for _, entry := range sorted { - if !isSatisfied(ctx, m.client, entry.name) { + if !m.isSatisfied(ctx, entry.name) { pending = append(pending, entry.migration) } } @@ -682,9 +712,8 @@ func (m *Migrator) getPendingMigrations(ctx context.Context) []migratorredis.Mig } // isSatisfied checks if a migration has been satisfied (applied or not applicable) -func isSatisfied(ctx context.Context, client *redisClientWrapper, name string) bool { - key := fmt.Sprintf("outpost:migration:%s", name) - val, err := client.HGet(ctx, key, "status").Result() +func (m *Migrator) isSatisfied(ctx context.Context, name string) bool { + val, err := m.client.HGet(ctx, m.migrationKey(name), "status").Result() if err != nil { return false } @@ -692,9 +721,8 @@ func isSatisfied(ctx context.Context, client *redisClientWrapper, name string) b } // isApplied checks if a migration has been applied (not just satisfied) -func isApplied(ctx context.Context, client *redisClientWrapper, name string) bool { - key := 
fmt.Sprintf("outpost:migration:%s", name) - val, err := client.HGet(ctx, key, "status").Result() +func (m *Migrator) isApplied(ctx context.Context, name string) bool { + val, err := m.client.HGet(ctx, m.migrationKey(name), "status").Result() if err != nil { return false } @@ -718,7 +746,7 @@ func (m *Migrator) getNextMigration(ctx context.Context) (migratorredis.Migratio // Find first unsatisfied for _, entry := range sorted { - if !isSatisfied(ctx, m.client, entry.name) { + if !m.isSatisfied(ctx, entry.name) { return entry.migration, nil } } @@ -743,7 +771,7 @@ func (m *Migrator) getLastAppliedMigration(ctx context.Context) (migratorredis.M // Find last applied for _, entry := range sorted { - if isApplied(ctx, m.client, entry.name) { + if m.isApplied(ctx, entry.name) { return entry.migration, nil } } @@ -752,23 +780,21 @@ func (m *Migrator) getLastAppliedMigration(ctx context.Context) (migratorredis.M } // setMigrationAsApplied marks a migration as applied -func setMigrationAsApplied(ctx context.Context, client *redisClientWrapper, name string) error { - key := fmt.Sprintf("outpost:migration:%s", name) +func (m *Migrator) setMigrationAsApplied(ctx context.Context, name string) error { now := time.Now().Unix() // Use Redis hash to store migration state - return client.HSet(ctx, key, + return m.client.HSet(ctx, m.migrationKey(name), "status", "applied", "applied_at", fmt.Sprintf("%d", now), ).Err() } // setMigrationNotApplicable marks a migration as not applicable -func setMigrationNotApplicable(ctx context.Context, client *redisClientWrapper, name, reason string) error { - key := fmt.Sprintf("outpost:migration:%s", name) +func (m *Migrator) setMigrationNotApplicable(ctx context.Context, name, reason string) error { now := time.Now().Unix() - return client.HSet(ctx, key, + return m.client.HSet(ctx, m.migrationKey(name), "status", "not_applicable", "checked_at", fmt.Sprintf("%d", now), "reason", reason, @@ -776,12 +802,11 @@ func setMigrationNotApplicable(ctx 
context.Context, client *redisClientWrapper, } // recordMigrationRun records a migration run in the history -// Key format: outpost:migration:{name}:run:{timestamp} -func recordMigrationRun(ctx context.Context, client *redisClientWrapper, name string, state *migratorredis.State, rerun bool, duration time.Duration) error { +func (m *Migrator) recordMigrationRun(ctx context.Context, name string, state *migratorredis.State, rerun bool, duration time.Duration) error { now := time.Now().Unix() - key := fmt.Sprintf("outpost:migration:%s:run:%d", name, now) + key := fmt.Sprintf("%soutpost:migration:%s:run:%d", m.deploymentPrefix(), name, now) - return client.HSet(ctx, key, + return m.client.HSet(ctx, key, "processed", state.Progress.ProcessedItems, "skipped", state.Progress.SkippedItems, "failed", state.Progress.FailedItems, diff --git a/internal/app/app.go b/internal/app/app.go index 01a58e5c9..e28407d23 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -226,7 +226,7 @@ func (a *App) initializeInfrastructure(ctx context.Context) error { } func (a *App) initializeTelemetry(ctx context.Context) error { - installationID, err := getInstallation(ctx, a.redisClient, a.config.Telemetry.ToTelemetryConfig()) + installationID, err := getInstallation(ctx, a.redisClient, a.config.Telemetry.ToTelemetryConfig(), a.config.DeploymentID) if err != nil { return err } diff --git a/internal/app/installation.go b/internal/app/installation.go index 3c43a0970..85c1350f3 100644 --- a/internal/app/installation.go +++ b/internal/app/installation.go @@ -2,6 +2,8 @@ package app import ( "context" + "fmt" + "time" "github.com/hookdeck/outpost/internal/idgen" "github.com/hookdeck/outpost/internal/redis" @@ -9,17 +11,25 @@ import ( ) const ( - outpostrcKey = "outpostrc" - installationKey = "installation" + installationIDKey = "outpost:installation_id" ) -func getInstallation(ctx context.Context, redisClient redis.Cmdable, telemetryConfig telemetry.TelemetryConfig) (string, error) { +func 
installationKey(deploymentID string) string { + if deploymentID == "" { + return installationIDKey + } + return fmt.Sprintf("%s:%s", deploymentID, installationIDKey) +} + +func getInstallation(ctx context.Context, redisClient redis.Cmdable, telemetryConfig telemetry.TelemetryConfig, deploymentID string) (string, error) { if telemetryConfig.Disabled { return "", nil } + key := installationKey(deploymentID) + // First attempt: try to get existing installation ID - installationID, err := redisClient.HGet(ctx, outpostrcKey, installationKey).Result() + installationID, err := redisClient.Get(ctx, key).Result() if err == nil { return installationID, nil } @@ -31,9 +41,9 @@ func getInstallation(ctx context.Context, redisClient redis.Cmdable, telemetryCo // Installation ID doesn't exist, create one atomically newInstallationID := idgen.Installation() - // Use HSETNX to atomically set the installation ID only if it doesn't exist + // Use SetNX to atomically set the installation ID only if it doesn't exist // This prevents race conditions when multiple Outpost instances start simultaneously - wasSet, err := redisClient.HSetNX(ctx, outpostrcKey, installationKey, newInstallationID).Result() + wasSet, err := redisClient.SetNX(ctx, key, newInstallationID, time.Duration(0)).Result() if err != nil { return "", err } @@ -44,7 +54,7 @@ func getInstallation(ctx context.Context, redisClient redis.Cmdable, telemetryCo // Another instance set the installation ID while we were generating ours // Fetch the installation ID that was actually set - installationID, err = redisClient.HGet(ctx, outpostrcKey, installationKey).Result() + installationID, err = redisClient.Get(ctx, key).Result() if err != nil { return "", err } diff --git a/internal/app/installation_test.go b/internal/app/installation_test.go index 03c9f8ac2..7012d9cd7 100644 --- a/internal/app/installation_test.go +++ b/internal/app/installation_test.go @@ -21,28 +21,28 @@ func TestGetInstallationAtomic(t *testing.T) { 
require.NoError(t, err) // Clear any existing installation ID - redisClient.Del(ctx, outpostrcKey) + redisClient.Del(ctx, installationKey("")) config := telemetry.TelemetryConfig{Disabled: false} // Test 1: First call should create installation ID - id1, err := getInstallation(ctx, redisClient, config) + id1, err := getInstallation(ctx, redisClient, config, "") require.NoError(t, err) assert.NotEmpty(t, id1) // Test 2: Second call should return the same ID (atomic consistency) - id2, err := getInstallation(ctx, redisClient, config) + id2, err := getInstallation(ctx, redisClient, config, "") require.NoError(t, err) assert.Equal(t, id1, id2, "Installation ID should be consistent across calls") // Test 3: Verify the ID is actually stored in Redis - storedID, err := redisClient.HGet(ctx, outpostrcKey, installationKey).Result() + storedID, err := redisClient.Get(ctx, installationKey("")).Result() require.NoError(t, err) assert.Equal(t, id1, storedID, "Stored ID should match returned ID") // Test 4: Test with telemetry disabled disabledConfig := telemetry.TelemetryConfig{Disabled: true} - id3, err := getInstallation(ctx, redisClient, disabledConfig) + id3, err := getInstallation(ctx, redisClient, disabledConfig, "") require.NoError(t, err) assert.Empty(t, id3, "Should return empty string when telemetry is disabled") } @@ -55,7 +55,7 @@ func TestGetInstallationConcurrency(t *testing.T) { require.NoError(t, err) // Clear any existing installation ID - redisClient.Del(ctx, outpostrcKey) + redisClient.Del(ctx, installationKey("")) config := telemetry.TelemetryConfig{Disabled: false} @@ -66,7 +66,7 @@ func TestGetInstallationConcurrency(t *testing.T) { for i := 0; i < numGoroutines; i++ { go func() { - id, err := getInstallation(ctx, redisClient, config) + id, err := getInstallation(ctx, redisClient, config, "") if err != nil { errorChan <- err return @@ -94,7 +94,41 @@ func TestGetInstallationConcurrency(t *testing.T) { } // Verify only one ID was created in Redis - 
storedID, err := redisClient.HGet(ctx, outpostrcKey, installationKey).Result() + storedID, err := redisClient.Get(ctx, installationKey("")).Result() require.NoError(t, err) assert.Equal(t, expectedID, storedID) } + +func TestGetInstallationWithDeploymentID(t *testing.T) { + ctx := context.Background() + redisConfig := testutil.CreateTestRedisConfig(t) + redisClient, err := redis.New(ctx, redisConfig) + require.NoError(t, err) + + config := telemetry.TelemetryConfig{Disabled: false} + + // Clear keys + redisClient.Del(ctx, installationKey("dp_001")) + redisClient.Del(ctx, installationKey("dp_002")) + + // Create installation IDs for two deployments + id1, err := getInstallation(ctx, redisClient, config, "dp_001") + require.NoError(t, err) + assert.NotEmpty(t, id1) + + id2, err := getInstallation(ctx, redisClient, config, "dp_002") + require.NoError(t, err) + assert.NotEmpty(t, id2) + + // They should be different (independent deployments) + assert.NotEqual(t, id1, id2, "Different deployments should have different installation IDs") + + // Verify keys are scoped + storedID1, err := redisClient.Get(ctx, "dp_001:outpost:installation_id").Result() + require.NoError(t, err) + assert.Equal(t, id1, storedID1) + + storedID2, err := redisClient.Get(ctx, "dp_002:outpost:installation_id").Result() + require.NoError(t, err) + assert.Equal(t, id2, storedID2) +} diff --git a/internal/app/redis_migration.go b/internal/app/redis_migration.go index 70d52782d..c4d0c1f53 100644 --- a/internal/app/redis_migration.go +++ b/internal/app/redis_migration.go @@ -74,7 +74,7 @@ func executeRedisMigrations(ctx context.Context, redisClient redis.Cmdable, logg client = &redisClientAdapter{Cmdable: redisClient} } - runner := migratorredis.NewRunner(client, logger) + runner := migratorredis.NewRunner(client, logger, deploymentID) // Register all migrations from the central registry // Pass deployment ID to scope migrations to this deployment's keys diff --git 
a/internal/migrator/migratorredis/runner.go b/internal/migrator/migratorredis/runner.go index 32443eb6e..2f46069df 100644 --- a/internal/migrator/migratorredis/runner.go +++ b/internal/migrator/migratorredis/runner.go @@ -13,30 +13,39 @@ import ( ) const ( - migrationLockKey = ".outpost:migration:lock" + migrationLockKey = ".outpost:migration_lock" lockTTL = time.Hour // Lock expires after 1 hour in case process dies ) // Runner handles automatic Redis migrations at startup type Runner struct { - client redis.Client - logger *logging.Logger - migrations []Migration - lock redislock.Lock + client redis.Client + logger *logging.Logger + migrations []Migration + lock redislock.Lock + deploymentID string } // NewRunner creates a new migration runner -func NewRunner(client redis.Client, logger *logging.Logger) *Runner { +func NewRunner(client redis.Client, logger *logging.Logger, deploymentID string) *Runner { return &Runner{ - client: client, - logger: logger, + client: client, + logger: logger, + deploymentID: deploymentID, lock: redislock.New(client, - redislock.WithKey(migrationLockKey), + redislock.WithKey(deploymentPrefix(deploymentID)+migrationLockKey), redislock.WithTTL(lockTTL), ), } } +func deploymentPrefix(deploymentID string) string { + if deploymentID == "" { + return "" + } + return deploymentID + ":" +} + // RegisterMigration adds a migration to the runner func (r *Runner) RegisterMigration(m Migration) { r.migrations = append(r.migrations, m) @@ -72,27 +81,40 @@ func (r *Runner) Run(ctx context.Context) error { // checkIfFreshInstallation checks if Redis has any existing Outpost data func (r *Runner) checkIfFreshInstallation(ctx context.Context) (bool, error) { + prefix := deploymentPrefix(r.deploymentID) + // Check for any "outpost:*" keys (current format) - outpostKeys, err := r.client.Keys(ctx, "outpost:*").Result() + // Use SCAN instead of KEYS to avoid blocking Redis on large keyspaces + hasKeys, err := hasAnyKeys(ctx, r.client, prefix+"outpost:*") 
if err != nil { return false, fmt.Errorf("failed to check outpost keys: %w", err) } - if len(outpostKeys) > 0 { + if hasKeys { return false, nil } - // Check for any "tenant:*" keys (old format) - tenantKeys, err := r.client.Keys(ctx, "tenant:*").Result() + // Check for any "tenant:*" keys (old format, or deployment-scoped tenant keys) + hasKeys, err = hasAnyKeys(ctx, r.client, prefix+"tenant:*") if err != nil { return false, fmt.Errorf("failed to check tenant keys: %w", err) } - if len(tenantKeys) > 0 { + if hasKeys { return false, nil } return true, nil } +// hasAnyKeys uses SCAN to check if at least one key matches the pattern +// without blocking Redis like KEYS does. +func hasAnyKeys(ctx context.Context, client redis.Client, pattern string) (bool, error) { + keys, _, err := client.Scan(ctx, 0, pattern, 1).Result() + if err != nil { + return false, err + } + return len(keys) > 0, nil +} + // handleFreshInstallation marks all migrations as applied for new installations func (r *Runner) handleFreshInstallation(ctx context.Context) error { r.logger.Info("fresh redis installation detected, marking migrations as applied", @@ -302,10 +324,13 @@ func (r *Runner) waitForInitialization(ctx context.Context) error { return fmt.Errorf("timeout waiting for redis initialization") } +func (r *Runner) migrationKey(name string) string { + return fmt.Sprintf("%soutpost:migration:%s", deploymentPrefix(r.deploymentID), name) +} + // isMigrationSatisfied checks if a migration has been satisfied (applied or not applicable) func (r *Runner) isMigrationSatisfied(ctx context.Context, name string) bool { - key := fmt.Sprintf("outpost:migration:%s", name) - val, err := r.client.HGet(ctx, key, "status").Result() + val, err := r.client.HGet(ctx, r.migrationKey(name), "status").Result() if err != nil { return false } @@ -314,8 +339,7 @@ func (r *Runner) isMigrationSatisfied(ctx context.Context, name string) bool { // setMigrationApplied marks a migration as applied func (r *Runner) 
setMigrationApplied(ctx context.Context, name string) error { - key := fmt.Sprintf("outpost:migration:%s", name) - return r.client.HSet(ctx, key, + return r.client.HSet(ctx, r.migrationKey(name), "status", "applied", "applied_at", time.Now().Format(time.RFC3339), ).Err() @@ -323,8 +347,7 @@ func (r *Runner) setMigrationApplied(ctx context.Context, name string) error { // setMigrationNotApplicable marks a migration as not applicable func (r *Runner) setMigrationNotApplicable(ctx context.Context, name, reason string) error { - key := fmt.Sprintf("outpost:migration:%s", name) - return r.client.HSet(ctx, key, + return r.client.HSet(ctx, r.migrationKey(name), "status", "not_applicable", "checked_at", time.Now().Format(time.RFC3339), "reason", reason, diff --git a/internal/migrator/migratorredis/runner_test.go b/internal/migrator/migratorredis/runner_test.go index 21abd0d7e..330ca524a 100644 --- a/internal/migrator/migratorredis/runner_test.go +++ b/internal/migrator/migratorredis/runner_test.go @@ -121,7 +121,7 @@ func setupTestRunner(t *testing.T) (*Runner, *miniredis.Miniredis, func()) { logger, err := logging.NewLogger(logging.WithLogLevel("error")) require.NoError(t, err) - runner := NewRunner(testClient, logger) + runner := NewRunner(testClient, logger, "") cleanup := func() { client.Close() @@ -413,6 +413,38 @@ func TestRunner_Run_IsApplicable(t *testing.T) { assert.Equal(t, "Not needed - using DEPLOYMENT_ID", reason) }) + t.Run("uses deployment-scoped keys when deploymentID is set", func(t *testing.T) { + mr := miniredis.RunT(t) + client := r.NewClient(&r.Options{Addr: mr.Addr()}) + testClient := &redisTestClient{Client: client} + defer client.Close() + + logger, err := logging.NewLogger(logging.WithLogLevel("error")) + require.NoError(t, err) + + runner := NewRunner(testClient, logger, "dp_001") + ctx := context.Background() + + // Simulate existing data with deployment prefix + mr.Set("dp_001:outpost:tenant:test123:tenant", "data") + + mig := 
newMockMigration("001_test", 1, true) + mig.applicable = false + mig.notApplicableReason = "Not needed" + runner.RegisterMigration(mig) + + err = runner.Run(ctx) + require.NoError(t, err) + + // Key should be prefixed with deployment ID + status := mr.HGet("dp_001:outpost:migration:001_test", "status") + assert.Equal(t, "not_applicable", status) + + // Non-prefixed key should NOT exist + unprefixedStatus := mr.HGet("outpost:migration:001_test", "status") + assert.Empty(t, unprefixedStatus) + }) + t.Run("skips non-applicable and runs applicable migrations", func(t *testing.T) { runner, mr, cleanup := setupTestRunner(t) defer cleanup() diff --git a/scripts/issue-680/README.md b/scripts/issue-680/README.md new file mode 100644 index 000000000..3641a6597 --- /dev/null +++ b/scripts/issue-680/README.md @@ -0,0 +1,59 @@ +# Issue #680: Scope Redis control plane keys by deployment ID + +## Problem + +Control plane keys (`outpostrc`, `.outpost:migration:lock`, `outpost:migration:*`) are not prefixed with `DEPLOYMENT_ID`, so multiple deployments sharing the same Dragonfly instance collide on these keys. + +## Files + +- `migrate.sh` — Production migration script for Dragonfly Cloud. Copies shared (unscoped) control plane keys to deployment-scoped versions. Dry run by default, `--apply` to execute, `--cleanup` to delete old shared keys after. +- `TEST_CASES.md` — Manual QA test cases for verifying the fix. + +## Production Runbook (Dragonfly Cloud) + +### Pre-flight + +```bash +# Verify .envrc is loaded (direnv) +echo $OUTPOST_PROD_DRAGONFLY_HOST + +# Verify connectivity +redis-cli -h "$OUTPOST_PROD_DRAGONFLY_HOST" -p "$OUTPOST_PROD_DRAGONFLY_PORT" \ + --user "$OUTPOST_PROD_DRAGONFLY_USER" --pass "$OUTPOST_PROD_DRAGONFLY_PASSWORD" \ + --tls --no-auth-warning PING +``` + +### Inspect current state + +```bash +# See what shared keys exist +redis-cli ... KEYS "outpostrc" +redis-cli ... KEYS "outpost:migration:*" +redis-cli ... 
KEYS ".outpost:migration:*" +``` + +### Run migration + +```bash +# 1. Dry run first +./migrate.sh + +# 2. Review output, then apply +./migrate.sh --apply + +# 3. Verify scoped keys exist for each deployment +redis-cli ... KEYS "tm_*:outpost:*" + +# 4. Deploy new Outpost version to all Railway services + +# 5. Verify deployments start cleanly (check Railway logs) + +# 6. Clean up old shared keys +./migrate.sh --apply --cleanup +``` + +### Rollback + +Before cleanup: new code reads scoped keys, old keys still exist. Rolling back Outpost version will use old unscoped keys untouched. + +After cleanup: old keys are gone. Would need to redeploy old version and let it re-initialize. diff --git a/scripts/issue-680/TEST_CASES.md b/scripts/issue-680/TEST_CASES.md new file mode 100644 index 000000000..1129cd4b5 --- /dev/null +++ b/scripts/issue-680/TEST_CASES.md @@ -0,0 +1,274 @@ +# Test Cases: #680 Control Plane Key Scoping + +## Setup + +Uses the standard dev environment (see [contributing/getting-started.md](../../contributing/getting-started.md)). + +```bash +# Start deps (Redis + RabbitMQ + Postgres) +make up/deps +``` + +For tests that need old vs new binaries: + +```bash +# Build from main (before fix) +git checkout main +go build -o bin/outpost-old ./cmd/outpost + +# Build from feature branch +git checkout <feature-branch> +go build -o bin/outpost-new ./cmd/outpost +go build -o bin/outpost-migrate-redis-new ./cmd/outpost-migrate-redis +``` + +Redis verification commands below are raw Redis commands — run them from `redis-cli` or any Redis client connected to `localhost:26379`.
+ +--- + +## TC1: Reproduce bug — two deployments share control plane keys + +``` +FLUSHALL +``` + +Start two deployments using Docker Compose env overrides (or run manually): + +```bash +# Terminal 1 — deployment A +DEPLOYMENT_ID=dp_A \ +REDIS_HOST=host.docker.internal REDIS_PORT=26379 REDIS_PASSWORD=password \ +SERVICE=api PORT=3333 \ + go run ./cmd/outpost --config .outpost.yaml + +# Terminal 2 — deployment B +DEPLOYMENT_ID=dp_B \ +REDIS_HOST=host.docker.internal REDIS_PORT=26379 REDIS_PASSWORD=password \ +SERVICE=api PORT=3334 \ + go run ./cmd/outpost --config .outpost.yaml +``` + +> Note: If running outside Docker, use `localhost` for REDIS_HOST. The `.outpost.yaml` redis config is overridden by env vars. + +Verify shared keys (the bug): + +``` +KEYS *outpost* +# outpostrc ← shared, NOT dp_A: or dp_B: prefixed +# outpost:migration:001_hash_tags ← shared +# outpost:migration:002_timestamps ← shared +# outpost:migration:003_entity ← shared + +HGETALL outpostrc +# installation → inst_xxx ← one value shared by both deployments +``` + +Stop both. 
+ +--- + +## TC2: New version — scoped keys on fresh Redis + +``` +FLUSHALL +``` + +Start two deployments with the new binary: + +```bash +# Terminal 1 +DEPLOYMENT_ID=dp_A \ +REDIS_HOST=localhost REDIS_PORT=26379 REDIS_PASSWORD=password \ +SERVICE=api PORT=3333 \ + ./bin/outpost-new + +# Terminal 2 +DEPLOYMENT_ID=dp_B \ +REDIS_HOST=localhost REDIS_PORT=26379 REDIS_PASSWORD=password \ +SERVICE=api PORT=3334 \ + ./bin/outpost-new +``` + +Verify: + +``` +KEYS *outpost* +# dp_A:outpost:installation_id +# dp_A:outpost:migration:001_hash_tags +# dp_A:outpost:migration:002_timestamps +# dp_A:outpost:migration:003_entity +# dp_B:outpost:installation_id +# dp_B:outpost:migration:001_hash_tags +# dp_B:outpost:migration:002_timestamps +# dp_B:outpost:migration:003_entity + +# Installation IDs are different +GET dp_A:outpost:installation_id # → inst_aaa +GET dp_B:outpost:installation_id # → inst_bbb (different) + +# Migration status is per-deployment +HGETALL dp_A:outpost:migration:001_hash_tags +# status → not_applicable (or applied) + +# No unscoped keys +EXISTS outpostrc # → 0 +KEYS outpost:migration:* # → (empty) +``` + +Stop both. 
+ +--- + +## TC3: No DEPLOYMENT_ID (single-tenant) still works + +``` +FLUSHALL +``` + +Start without DEPLOYMENT_ID (default dev flow): + +```bash +make up/outpost +# or: +REDIS_HOST=localhost REDIS_PORT=26379 REDIS_PASSWORD=password \ +SERVICE=api PORT=3333 \ + ./bin/outpost-new +``` + +Verify: + +``` +KEYS *outpost* +# outpost:installation_id ← no prefix, new key name +# outpost:migration:001_hash_tags ← no prefix +# outpost:migration:002_timestamps +# outpost:migration:003_entity + +GET outpost:installation_id +# → inst_xxx (not empty) + +HGET outpost:migration:001_hash_tags status +# → applied or not_applicable +``` + +--- + +## TC4: Migration CLI respects scoping + +``` +FLUSHALL +``` + +```bash +# Init with deployment ID +DEPLOYMENT_ID=dp_A REDIS_HOST=localhost REDIS_PORT=26379 REDIS_PASSWORD=password \ + ./bin/outpost-migrate-redis-new init +``` + +``` +KEYS * +# dp_A:outpost:migration:001_hash_tags +# dp_A:outpost:migration:002_timestamps +# dp_A:outpost:migration:003_entity +``` + +```bash +# Init without — separate namespace +REDIS_HOST=localhost REDIS_PORT=26379 REDIS_PASSWORD=password \ + ./bin/outpost-migrate-redis-new init +``` + +``` +KEYS * +# dp_A:outpost:migration:* ← dp_A's keys +# outpost:migration:* ← unscoped keys (separate) +``` + +--- + +## TC5: Upgrade path — old data → migrate script → new version + +``` +FLUSHALL +``` + +```bash +# 1. Run old version, let it write shared keys +DEPLOYMENT_ID=dp_A \ +REDIS_HOST=localhost REDIS_PORT=26379 REDIS_PASSWORD=password \ +SERVICE=api PORT=3333 \ + ./bin/outpost-old + +# Create a tenant so there's data +curl -X PUT http://localhost:3333/api/v1/tenants/test-tenant \ + -H "Authorization: Bearer apikey" +# Stop the server +``` + +``` +# 2. Verify old shared keys exist +KEYS * +# outpostrc, outpost:migration:*, dp_A:tenant:{test-tenant}:tenant, etc. +``` + +```bash +# 3. 
Run the migration script (dry run, then apply) +REDIS_HOST=localhost REDIS_PORT=26379 REDIS_PASSWORD=password \ + ./scripts/issue-680/migrate.sh + +REDIS_HOST=localhost REDIS_PORT=26379 REDIS_PASSWORD=password \ + ./scripts/issue-680/migrate.sh --apply +``` + +``` +# 4. Verify scoped keys were created +KEYS dp_A:outpost:* +# dp_A:outpost:installation_id +# dp_A:outpost:migration:001_hash_tags +# ... +``` + +```bash +# 5. Start new version +DEPLOYMENT_ID=dp_A \ +REDIS_HOST=localhost REDIS_PORT=26379 REDIS_PASSWORD=password \ +SERVICE=api PORT=3333 \ + ./bin/outpost-new +# Should start cleanly — no migration errors, no re-initialization + +# 6. Verify tenant data still accessible +curl http://localhost:3333/api/v1/tenants \ + -H "Authorization: Bearer apikey" +# → should list test-tenant +``` + +--- + +## TC6: Concurrent startup — same deployment, two services + +Tests atomic installation ID generation under the new key scheme. + +``` +FLUSHALL +``` + +```bash +# Start API and delivery for same deployment simultaneously +DEPLOYMENT_ID=dp_A \ +REDIS_HOST=localhost REDIS_PORT=26379 REDIS_PASSWORD=password \ +SERVICE=api PORT=3333 \ + ./bin/outpost-new & + +DEPLOYMENT_ID=dp_A \ +REDIS_HOST=localhost REDIS_PORT=26379 REDIS_PASSWORD=password \ +SERVICE=delivery \ + ./bin/outpost-new & + +wait +``` + +``` +# Both should share the same installation ID (SetNX ensures atomicity) +GET dp_A:outpost:installation_id +# → exactly one value +``` diff --git a/scripts/issue-680/migrate.sh b/scripts/issue-680/migrate.sh new file mode 100755 index 000000000..32b0d05b9 --- /dev/null +++ b/scripts/issue-680/migrate.sh @@ -0,0 +1,234 @@ +#!/usr/bin/env bash +# +# Migration script for issue #680: Scope Redis control plane keys by deployment ID. +# +# This script copies the shared (unscoped) control plane keys to deployment-scoped +# versions for each deployment that exists in Redis. +# +# What it does: +# 1. Discovers all deployment IDs by scanning for {id}:tenant:* key patterns +# 2. 
For each deployment ID:
+#    a. Copies outpostrc hash → {id}:outpost:installation_id (string key)
+#    b. Copies outpost:migration:{name} hashes → {id}:outpost:migration:{name}
+#    c. Copies outpost:migration:{name}:run:* hashes → {id}:outpost:migration:{name}:run:*
+# 3. Optionally deletes the old shared keys (with --cleanup flag)
+#
+# Usage:
+#   # Dry run (default) - shows what would be done
+#   ./migrate.sh
+#
+#   # Actually run the migration
+#   ./migrate.sh --apply
+#
+#   # Apply + clean up old shared keys after
+#   ./migrate.sh --apply --cleanup
+#
+# Connection env vars:
+#   REDIS_HOST (required)
+#   REDIS_PORT (default: 6379)
+#   REDIS_USER (optional)
+#   REDIS_PASSWORD (optional)
+#   REDIS_TLS (set to "1" to enable TLS)
+#
+
+set -euo pipefail
+
+# --- Config ---
+DRY_RUN=true
+CLEANUP=false
+
+for arg in "$@"; do
+  case "$arg" in
+    --apply) DRY_RUN=false ;;
+    --cleanup) CLEANUP=true ;;
+    --help|-h)
+      echo "Usage: $0 [--apply] [--cleanup]"
+      echo "  --apply    Actually run the migration (default: dry run)"
+      echo "  --cleanup  Delete old shared keys after migration"
+      exit 0
+      ;;
+  esac
+done
+
+# --- Connection ---
+REDIS_HOST="${REDIS_HOST:?Set REDIS_HOST}"
+REDIS_PORT="${REDIS_PORT:-6379}"
+REDIS_USER="${REDIS_USER:-}"
+REDIS_PASS="${REDIS_PASSWORD:-}"
+REDIS_TLS="${REDIS_TLS:-}"
+
+# rcli: wrapper around redis-cli that applies the connection env vars.
+rcli() {
+  local args=(-h "$REDIS_HOST" -p "$REDIS_PORT" --no-auth-warning)
+  [[ -n "$REDIS_USER" ]] && args+=(--user "$REDIS_USER")
+  [[ -n "$REDIS_PASS" ]] && args+=(--pass "$REDIS_PASS")
+  [[ "$REDIS_TLS" == "1" ]] && args+=(--tls)
+  redis-cli "${args[@]}" "$@"
+}
+
+# --- Helpers ---
+log() { echo "[INFO] $*"; }
+warn() { echo "[WARN] $*" >&2; }
+dry() { if $DRY_RUN; then echo "[DRY] $*"; else echo "[EXEC] $*"; fi; }
+
+# --- Step 1: Discover deployment IDs ---
+log "Discovering deployment IDs..."
+
+# Scan for keys matching *:tenant:* and extract unique prefixes.
+# SCAN output: first line is the next cursor, remaining lines are keys.
+DEPLOYMENT_IDS=()
+cursor=0
+while true; do
+  result=$(rcli SCAN "$cursor" MATCH "*:tenant:*:tenant" COUNT 1000)
+  cursor=$(echo "$result" | head -1)
+  keys=$(echo "$result" | tail -n +2)
+
+  for key in $keys; do
+    # Extract deployment ID (everything before first ":tenant:")
+    id="${key%%:tenant:*}"
+    if [[ -n "$id" && "$id" != "$key" ]]; then
+      DEPLOYMENT_IDS+=("$id")
+    fi
+  done
+
+  if [[ "$cursor" == "0" ]]; then
+    break
+  fi
+done
+
+# Check emptiness BEFORE expanding the array: under `set -u`, expanding an
+# empty array with "${arr[@]}" aborts on bash < 4.4 (e.g. macOS bash 3.2).
+if [[ ${#DEPLOYMENT_IDS[@]} -eq 0 ]]; then
+  warn "No deployment IDs found. Nothing to migrate."
+  exit 0
+fi
+
+# Deduplicate (array is guaranteed non-empty here, so the expansion is safe)
+DEPLOYMENT_IDS=($(printf '%s\n' "${DEPLOYMENT_IDS[@]}" | sort -u))
+
+log "Found ${#DEPLOYMENT_IDS[@]} deployment(s): ${DEPLOYMENT_IDS[*]}"
+
+# --- Step 2: Check current state of shared keys --
+log ""
+log "Checking shared control plane keys..."
+
+# Check outpostrc
+INSTALLATION_ID=$(rcli HGET outpostrc installation 2>/dev/null || echo "")
+if [[ -n "$INSTALLATION_ID" ]]; then
+  log "  outpostrc -> installation = $INSTALLATION_ID"
+else
+  log "  outpostrc -> (not found, each deployment will generate its own on startup)"
+fi
+
+# Find all outpost:migration:* keys (this pattern also matches the :run:* history keys, which must be copied too — see step 2c in the header)
+MIGRATION_KEYS=()
+cursor=0
+while true; do
+  result=$(rcli SCAN "$cursor" MATCH "outpost:migration:*" COUNT 100)
+  cursor=$(echo "$result" | head -1)
+  keys=$(echo "$result" | tail -n +2)
+
+  for key in $keys; do
+    MIGRATION_KEYS+=("$key")
+  done
+
+  if [[ "$cursor" == "0" ]]; then
+    break
+  fi
+done
+
+log "  Found ${#MIGRATION_KEYS[@]} migration key(s)"
+# Guarded expansion: MIGRATION_KEYS may be empty (see `set -u` note above).
+for key in ${MIGRATION_KEYS[@]+"${MIGRATION_KEYS[@]}"}; do
+  # HGET on a missing field prints an empty line and exits 0, so an `|| echo`
+  # fallback would never fire; substitute the placeholder explicitly instead.
+  status=$(rcli HGET "$key" status 2>/dev/null); [[ -n "$status" ]] || status="(no status)"
+  log "    $key -> $status"
+done
+
+# Check lock key (old/shared lock name used by pre-migration binaries)
+LOCK_EXISTS=$(rcli EXISTS ".outpost:migration:lock" 2>/dev/null || echo "0")
+if [[ "$LOCK_EXISTS" == "1" ]]; then
+  warn "  .outpost:migration:lock exists! A migration may be running."
+fi
+
+# --- Step 3: Migrate for each deployment ---
+log ""
+log "=== Starting migration ==="
+
+for DEPLOY_ID in "${DEPLOYMENT_IDS[@]}"; do
+  log ""
+  log "--- Deployment: $DEPLOY_ID ---"
+
+  # 3a. Installation ID (skip if the scoped key already exists — idempotent)
+  if [[ -n "$INSTALLATION_ID" ]]; then
+    target_key="${DEPLOY_ID}:outpost:installation_id"
+    existing=$(rcli GET "$target_key" 2>/dev/null || echo "")
+    if [[ -n "$existing" ]]; then
+      log "  $target_key already exists ($existing), skipping"
+    else
+      dry "  SET $target_key $INSTALLATION_ID"
+      if ! $DRY_RUN; then
+        rcli SET "$target_key" "$INSTALLATION_ID" > /dev/null
+      fi
+    fi
+  fi
+
+  # 3b. Migration status keys (includes run-history keys picked up in step 2)
+  for old_key in ${MIGRATION_KEYS[@]+"${MIGRATION_KEYS[@]}"}; do
+    # Derive the new key: outpost:migration:X -> {id}:outpost:migration:X
+    new_key="${DEPLOY_ID}:${old_key}"
+
+    existing=$(rcli EXISTS "$new_key" 2>/dev/null || echo "0")
+    if [[ "$existing" == "1" ]]; then
+      log "  $new_key already exists, skipping"
+      continue
+    fi
+
+    # Copy the hash field by field
+    dry "  COPY $old_key -> $new_key"
+    if ! $DRY_RUN; then
+      # HGETALL prints field and value on alternating lines; read them pairwise
+      hset_args=()
+      while IFS= read -r field && IFS= read -r value; do
+        hset_args+=("$field" "$value")
+      done < <(rcli HGETALL "$old_key" 2>/dev/null)
+
+      if [[ ${#hset_args[@]} -gt 0 ]]; then
+        rcli HSET "$new_key" "${hset_args[@]}" > /dev/null
+      else
+        warn "  $old_key has no fields, skipping"
+      fi
+
+      # Verify the copy
+      if [[ $(rcli EXISTS "$new_key" 2>/dev/null) != "1" ]]; then
+        warn "  FAILED to copy $old_key -> $new_key"
+      fi
+    fi
+  done
+done
+
+# --- Step 4: Cleanup (optional, only with --apply --cleanup) ---
+if $CLEANUP && ! $DRY_RUN; then
+  log ""
+  log "=== Cleaning up old shared keys ==="
+
+  if [[ -n "$INSTALLATION_ID" ]]; then
+    log "  DEL outpostrc"
+    rcli DEL outpostrc > /dev/null
+  fi
+
+  for key in ${MIGRATION_KEYS[@]+"${MIGRATION_KEYS[@]}"}; do
+    log "  DEL $key"
+    rcli DEL "$key" > /dev/null
+  done
+
+  if [[ "$LOCK_EXISTS" == "1" ]]; then
+    log "  DEL .outpost:migration:lock"
+    rcli DEL ".outpost:migration:lock" > /dev/null
+  fi
+elif $CLEANUP && $DRY_RUN; then
+  log ""
+  log "=== Cleanup (dry run) ==="
+  log "  Would delete: outpostrc, .outpost:migration:lock, and ${#MIGRATION_KEYS[@]} migration key(s)"
+fi
+
+log ""
+if $DRY_RUN; then
+  log "Dry run complete. Re-run with --apply to execute."
+else
+  log "Migration complete."
+fi