Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 61 additions & 28 deletions cmd/iron-proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,14 @@ func main() {
// Managed mode is determined by the presence of a control plane token.
managed := proxyToken != ""

if cfg.Management.Listen != "" {
if managed {
fmt.Fprintln(os.Stderr, "error: management.listen cannot be used with managed mode; the control plane is the source of truth")
os.Exit(1)
}
if *configPath == "" {
fmt.Fprintln(os.Stderr, "error: management.listen requires --config; /v1/reload has no file to re-read")
os.Exit(1)
}
// Standalone mode serves /v1/reload, which re-reads the config file.
// Managed mode serves /v1/status and /v1/sync instead: the control plane
// stays the source of truth for config, while the sandbox control plane
// can verify which principal's config the proxy has actually applied
// before routing traffic through it.
if cfg.Management.Listen != "" && !managed && *configPath == "" {
fmt.Fprintln(os.Stderr, "error: management.listen requires --config in standalone mode; /v1/reload has no file to re-read")
os.Exit(1)
}

// Both modes produce a pipeline holder. Managed mode populates the
Expand Down Expand Up @@ -121,10 +120,11 @@ func main() {
var holder *transform.PipelineHolder
var mcpHolder *mcp.PolicyHolder
var otelCfg iotel.ExportConfig
var poller *controlplane.Poller

if managed {
var ingestToken string
holder, mcpHolder, ingestToken, pgListener = initManaged(ctx, cfg, bodyLimits, errc, proxyToken, pgManager, localPgListener, logger)
holder, mcpHolder, ingestToken, pgListener, poller = initManaged(ctx, cfg, bodyLimits, errc, proxyToken, pgManager, localPgListener, logger)
if ingestToken != "" {
otelCfg.DefaultEndpoint = "https://ingest.iron.sh/v1/logs"
otelCfg.DefaultHeaders = map[string]string{
Expand Down Expand Up @@ -222,21 +222,32 @@ func main() {
Logger: logger,
UpstreamResponseHeaderTimeout: time.Duration(cfg.Proxy.UpstreamResponseHeaderTimeout),
UpstreamProxy: cfg.Proxy.UpstreamProxy.ProxyFunc(),
// Managed proxies fail closed until the first control-plane config
// has been applied; an un-synced pipeline would otherwise pass
// requests through with placeholder credentials intact.
Ready: managedReady(poller),
})

// Initialize metrics server.
metricsServer := metrics.New(cfg.Metrics.Listen, logger)

// Initialize management server (standalone mode only; guarded above).
// Initialize management server: /v1/reload in standalone mode,
// /v1/status and /v1/sync in managed mode.
var mgmtServer *management.Server
if cfg.Management.Listen != "" {
mgmtServer = management.New(management.Options{
mgmtOpts := management.Options{
Addr: cfg.Management.Listen,
APIKey: os.Getenv(cfg.Management.APIKeyEnv),
Reload: newReloadFunc(*configPath, holder, mcpHolder, pgManager, bodyLimits, logger),
Logger: logger,
Ctx: ctx,
})
}
if managed {
mgmtOpts.Status = func() any { return poller.Status() }
mgmtOpts.SyncNow = poller.Poke
} else {
mgmtOpts.Reload = newReloadFunc(*configPath, holder, mcpHolder, pgManager, bodyLimits, logger)
}
mgmtServer = management.New(mgmtOpts)
}

// Start services.
Expand Down Expand Up @@ -321,7 +332,7 @@ func main() {
//
// Initial MCP policy preference: control-plane-supplied mcp block first, then
// fall back to cfg.MCP from the YAML if the sync did not include one.
func initManaged(ctx context.Context, cfg *config.Config, bodyLimits transform.BodyLimits, errc chan<- error, proxyToken string, pgManager *postgres.Manager, localPgListener *postgres.Listener, logger *slog.Logger) (*transform.PipelineHolder, *mcp.PolicyHolder, string, *postgres.Listener) {
func initManaged(ctx context.Context, cfg *config.Config, bodyLimits transform.BodyLimits, errc chan<- error, proxyToken string, pgManager *postgres.Manager, localPgListener *postgres.Listener, logger *slog.Logger) (*transform.PipelineHolder, *mcp.PolicyHolder, string, *postgres.Listener, *controlplane.Poller) {
cpURL := envOrDefault("IRON_CONTROL_PLANE_URL", "https://api.iron.sh")
logger.Info("starting in managed mode", slog.String("control_plane_url", cpURL))

Expand Down Expand Up @@ -391,22 +402,41 @@ func initManaged(ctx context.Context, cfg *config.Config, bodyLimits transform.B
// Start config poller.
poller := controlplane.NewPoller(client, configHash, func(u controlplane.SyncUpdate) error {
if u.Rules != nil || u.Secrets != nil || u.Transforms != nil {
applyPipelineSync(holder, bodyLimits, logger, u.Rules, u.Secrets, u.Transforms)
if err := applyPipelineSync(holder, bodyLimits, logger, u.Rules, u.Secrets, u.Transforms); err != nil {
return err
}
}
if u.MCP != nil {
applyMCPSync(mcpHolder, logger, u.MCP)
if err := applyMCPSync(mcpHolder, logger, u.MCP); err != nil {
return err
}
}
if u.Postgres != nil {
applyPostgresSync(ctx, pgManager, localPgListener, os.Getenv, logger, u.Postgres)
if err := applyPostgresSync(ctx, pgManager, localPgListener, os.Getenv, logger, u.Postgres); err != nil {
return err
}
}
return nil
}, logger)

// Seed the poller's status from the startup sync so /v1/status (and the
// fail-closed gate) reflect it before the polling loop's first pass.
poller.SeedStatus(syncResp)

go func() {
errc <- poller.Run(ctx)
}()

return holder, mcpHolder, ingestToken, pgListener
return holder, mcpHolder, ingestToken, pgListener, poller
}

// managedReady builds the readiness probe handed to the proxy's Ready option.
//
// With a non-nil poller (managed mode) the returned func reports whether the
// poller has recorded at least one sync, i.e. Status().SyncedOnce. With a nil
// poller (standalone mode) it returns a nil func.
// NOTE(review): a nil Ready func is presumably treated as "always ready" by
// the proxy; confirm against the Ready option's contract.
func managedReady(poller *controlplane.Poller) func() bool {
	if poller == nil {
		return nil
	}
	synced := func() bool {
		snapshot := poller.Status()
		return snapshot.SyncedOnce
	}
	return synced
}

// buildInitialMCPHolder picks the initial MCP policy source: a control-plane
Expand Down Expand Up @@ -452,48 +482,50 @@ func initStandalone(cfg *config.Config, bodyLimits transform.BodyLimits, logger
// swaps it in. If parsing or pipeline construction fails, the existing pipeline
// is preserved, the failure is logged, and a wrapped error is returned so the
// caller can tell the update was not applied: an invalid push from the control
// plane must not take down the proxy.
func applyPipelineSync(holder *transform.PipelineHolder, bodyLimits transform.BodyLimits, logger *slog.Logger, rules, secrets, transforms json.RawMessage) {
func applyPipelineSync(holder *transform.PipelineHolder, bodyLimits transform.BodyLimits, logger *slog.Logger, rules, secrets, transforms json.RawMessage) error {
newTransforms, err := config.TransformsFromSync(rules, secrets, transforms)
if err != nil {
logger.Error("rejecting invalid pipeline config from sync, keeping current pipeline", slog.String("error", err.Error()))
return
return fmt.Errorf("pipeline sync: %w", err)
}
newPipeline, err := buildPipeline(newTransforms, bodyLimits, logger)
if err != nil {
logger.Error("rejecting invalid pipeline config from sync, keeping current pipeline", slog.String("error", err.Error()))
return
return fmt.Errorf("pipeline sync: %w", err)
}
newPipeline.SetAuditFunc(holder.Load().AuditFunc())
holder.Store(newPipeline)
logger.Info("pipeline reloaded", slog.String("transforms", newPipeline.Names()))
return nil
}

// applyMCPSync compiles a new MCP policy from a sync payload and atomically
// swaps it in. Parse or compile errors are logged and returned (wrapped), and
// the prior policy is preserved: an invalid push from the control plane must
// not take down a running proxy. An empty/null mcp block is interpreted by the
// caller as "no update" and is not delivered here; if one arrives anyway it is
// treated as a no-op and nil is returned.
func applyMCPSync(holder *mcp.PolicyHolder, logger *slog.Logger, raw json.RawMessage) {
func applyMCPSync(holder *mcp.PolicyHolder, logger *slog.Logger, raw json.RawMessage) error {
node, present, err := config.MCPFromSync(raw)
if err != nil {
logger.Error("rejecting invalid mcp policy from sync, keeping current policy", slog.String("error", err.Error()))
return
return fmt.Errorf("mcp sync: %w", err)
}
if !present {
// Should not happen — caller filters absent/null — but treat as no-op.
return
return nil
}
policy, err := mcp.LoadFromNode(node)
if err != nil {
logger.Error("rejecting invalid mcp policy from sync, keeping current policy", slog.String("error", err.Error()))
return
return fmt.Errorf("mcp sync: %w", err)
}
holder.Store(policy)
if policy == nil {
logger.Info("mcp policy cleared by control plane")
} else {
logger.Info("mcp policy reloaded")
}
return nil
}

// Environment variables that configure the managed postgres listener when the
Expand Down Expand Up @@ -585,13 +617,14 @@ func postgresListenerFromSync(local *postgres.Listener, getenv func(string) stri
// applyPostgresSync rebuilds the postgres listener from a sync payload and
// hot-reloads the manager. An invalid payload is logged, the running listener
// is preserved, and a non-nil error is returned; nil is returned once the
// manager has been reloaded.
func applyPostgresSync(ctx context.Context, mgr *postgres.Manager, local *postgres.Listener, getenv func(string) string, logger *slog.Logger, raw json.RawMessage) {
func applyPostgresSync(ctx context.Context, mgr *postgres.Manager, local *postgres.Listener, getenv func(string) string, logger *slog.Logger, raw json.RawMessage) error {
listener, ok := postgresListenerFromSync(local, getenv, logger, raw)
if !ok {
return
return fmt.Errorf("postgres sync: invalid postgres config")
}
mgr.Reload(ctx, listener)
logger.Info("postgres listener reloaded from sync", slog.Bool("running", listener != nil))
return nil
}

// newReloadFunc returns a management.ReloadFunc that re-reads the YAML config
Expand Down
10 changes: 5 additions & 5 deletions cmd/iron-proxy/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func TestApplyPostgresSync_ReloadsListener(t *testing.T) {

raw := json.RawMessage(`[{"id":"pgs_1","foreign_id":"pg-analytics","database":"analytics","dsn":{"type":"env","var":"PG_DSN"}}]`)

applyPostgresSync(context.Background(), mgr, nil, mapEnv(pgListenerEnv()), discardLogger(), raw)
require.NoError(t, applyPostgresSync(context.Background(), mgr, nil, mapEnv(pgListenerEnv()), discardLogger(), raw))
require.True(t, mgr.Running())
}

Expand All @@ -187,7 +187,7 @@ func TestApplyPipelineSync_ValidConfig_Swaps(t *testing.T) {
logger := slog.New(slog.NewTextHandler(logBuf, nil))

rules := json.RawMessage(`[{"host":"example.com","methods":["GET"],"paths":["/api/*"]}]`)
applyPipelineSync(holder, transform.BodyLimits{}, logger, rules, nil, nil)
require.NoError(t, applyPipelineSync(holder, transform.BodyLimits{}, logger, rules, nil, nil))

require.NotSame(t, original, holder.Load(), "pipeline should have been swapped")
require.Equal(t, "allowlist", holder.Load().Names())
Expand All @@ -201,7 +201,7 @@ func TestApplyPipelineSync_InvalidJSON_KeepsExistingPipeline(t *testing.T) {
logBuf := &bytes.Buffer{}
logger := slog.New(slog.NewTextHandler(logBuf, nil))

applyPipelineSync(holder, transform.BodyLimits{}, logger, json.RawMessage(`{not json`), nil, nil)
require.Error(t, applyPipelineSync(holder, transform.BodyLimits{}, logger, json.RawMessage(`{not json`), nil, nil))

require.Same(t, original, holder.Load(), "pipeline must not be swapped on invalid config")
require.Contains(t, logBuf.String(), "rejecting invalid pipeline config")
Expand All @@ -217,7 +217,7 @@ func TestApplyPipelineSync_InvalidRule_KeepsExistingPipeline(t *testing.T) {

// host and cidr are mutually exclusive — rule construction fails.
rules := json.RawMessage(`[{"host":"example.com","cidr":"10.0.0.0/8"}]`)
applyPipelineSync(holder, transform.BodyLimits{}, logger, rules, nil, nil)
require.Error(t, applyPipelineSync(holder, transform.BodyLimits{}, logger, rules, nil, nil))

require.Same(t, original, holder.Load(), "pipeline must not be swapped when transform construction fails")
require.Contains(t, logBuf.String(), "rejecting invalid pipeline config")
Expand All @@ -233,7 +233,7 @@ func TestApplyPipelineSync_PreservesAuditFunc(t *testing.T) {
logger := slog.New(slog.NewTextHandler(io.Discard, nil))

rules := json.RawMessage(`[{"host":"example.com"}]`)
applyPipelineSync(holder, transform.BodyLimits{}, logger, rules, nil, nil)
require.NoError(t, applyPipelineSync(holder, transform.BodyLimits{}, logger, rules, nil, nil))

holder.Load().EmitAudit(nil)
require.True(t, called, "audit func should be carried over to the new pipeline")
Expand Down
3 changes: 3 additions & 0 deletions internal/config/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ func applyEnvOverrides(cfg *Config) error {
if v := os.Getenv("IRON_METRICS_LISTEN"); v != "" {
cfg.Metrics.Listen = v
}
if v := os.Getenv("IRON_MANAGEMENT_LISTEN"); v != "" {
cfg.Management.Listen = v
}
if v := os.Getenv("IRON_LOG_LEVEL"); v != "" {
cfg.Log.Level = v
}
Expand Down
5 changes: 5 additions & 0 deletions internal/controlplane/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ type SyncResponse struct {
MCP json.RawMessage `json:"mcp"`
Postgres json.RawMessage `json:"postgres"`
IngestToken string `json:"ingest_token"`
// Status and PrincipalID describe the proxy's control-plane assignment as
// of this sync. The control plane includes them only on responses that
// carry a config payload; hash-match responses leave them empty.
Status string `json:"status"`
PrincipalID string `json:"principal_id"`
}

// Client talks to the iron.sh control plane REST API. Requests are
Expand Down
69 changes: 67 additions & 2 deletions internal/controlplane/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"log/slog"
"math/rand/v2"
"sync"
"time"
)

Expand All @@ -23,12 +24,28 @@ type SyncUpdate struct {
Postgres json.RawMessage
}

// Status is a snapshot of the poller's applied control-plane state. The
// management API serves it so an operator (or the sandbox control plane)
// can verify which principal's config this proxy is actually enforcing
// before routing traffic through it.
//
// The zero value describes a poller that has not recorded any sync yet.
type Status struct {
// ConfigHash is the ConfigHash of the most recently recorded sync response.
ConfigHash string `json:"config_hash"`
// PrincipalID mirrors the sync response's PrincipalID; a response with an
// empty PrincipalID leaves the previously recorded value in place.
PrincipalID string `json:"principal_id"`
// PrincipalStatus mirrors the sync response's Status field; a response
// with an empty Status leaves the previously recorded value in place.
PrincipalStatus string `json:"principal_status"`
// SyncedOnce is set to true whenever a sync is recorded.
SyncedOnce bool `json:"synced_once"`
// LastSyncAt is the UTC time at which the latest sync was recorded; it is
// the zero time until then.
LastSyncAt time.Time `json:"last_sync_at"`
}

// Poller periodically calls Sync and applies config updates. It also keeps a
// mutex-guarded Status snapshot of the last recorded sync and can be woken
// early through Poke.
type Poller struct {
// client is the control-plane client handed to NewPoller.
client *Client
// configHash starts as NewPoller's initial hash and is advanced by sync
// once a response has been handled without error.
configHash string
// onUpdate applies a SyncUpdate; a non-nil error is logged and returned by
// sync, which then neither advances configHash nor records the sync.
onUpdate func(SyncUpdate) error
logger *slog.Logger

// mu guards status.
mu sync.RWMutex
// status is the snapshot returned by Status and written by recordSync.
status Status
// poke is a 1-buffered signal channel: Poke sends on it without blocking
// and Run receives from it to start a sync before the timer fires.
poke chan struct{}
}

// NewPoller creates a new sync poller.
Expand All @@ -38,12 +55,56 @@ func NewPoller(client *Client, initialConfigHash string, onUpdate func(SyncUpdat
configHash: initialConfigHash,
onUpdate: onUpdate,
logger: logger,
poke: make(chan struct{}, 1),
}
}

// Poke asks the polling loop for an immediate, out-of-band sync without ever
// blocking the caller. The poke channel holds at most one pending signal, so
// a Poke issued while one is already queued (for instance because a sync is
// in flight) is dropped and folds into the next loop iteration.
func (p *Poller) Poke() {
	wake := struct{}{}
	select {
	case p.poke <- wake:
		// Signal queued for the loop to pick up.
	default:
		// A signal is already pending; nothing more to do.
	}
}

// Status returns a copy of the applied control-plane state, taken under the
// read lock so it never observes a half-written update from recordSync.
func (p *Poller) Status() Status {
	p.mu.RLock()
	snapshot := p.status
	p.mu.RUnlock()
	return snapshot
}

// SeedStatus records the outcome of a sync that happened outside the poller
// (the startup sync in managed mode), so that Status already reflects it
// before Run performs its first iteration. A nil resp is ignored.
func (p *Poller) SeedStatus(resp *SyncResponse) {
	if resp != nil {
		p.recordSync(resp)
	}
}

// recordSync folds a sync response into the status snapshot under the write
// lock: it stores the response's config hash, marks the poller as synced and
// stamps the current UTC time. resp must be non-nil.
func (p *Poller) recordSync(resp *SyncResponse) {
	p.mu.Lock()
	defer p.mu.Unlock()

	st := &p.status
	st.ConfigHash = resp.ConfigHash
	st.SyncedOnce = true
	st.LastSyncAt = time.Now().UTC()

	// Hash-match responses omit the assignment fields; only overwrite the
	// last known values when the response actually carries new ones, so the
	// snapshot stays meaningful between config changes.
	if assignment := resp.Status; assignment != "" {
		st.PrincipalStatus = assignment
	}
	if principal := resp.PrincipalID; principal != "" {
		st.PrincipalID = principal
	}
}

// Run starts the polling loop. It performs an initial sync immediately, then
// polls on PollInterval with ±10% jitter. Returns when ctx is canceled or
// a revocation error is received.
// polls on PollInterval with ±10% jitter; a Poke wakes it early. Returns when
// ctx is canceled or a revocation error is received.
func (p *Poller) Run(ctx context.Context) error {
if err := p.sync(ctx); err != nil {
if isRevocationError(err) {
Expand All @@ -61,6 +122,8 @@ func (p *Poller) Run(ctx context.Context) error {
timer.Stop()
return nil
case <-timer.C:
case <-p.poke:
timer.Stop()
}

if err := p.sync(ctx); err != nil {
Expand Down Expand Up @@ -103,11 +166,13 @@ func (p *Poller) sync(ctx context.Context) error {
Postgres: resp.Postgres,
}); err != nil {
p.logger.Error("applying config update", slog.String("error", err.Error()))
return err
}
}
}

p.configHash = resp.ConfigHash
p.recordSync(resp)
return nil
}

Expand Down
Loading
Loading