Skip to content

Commit 6052ff6

Browse files
committed
fix: bound queries and add DB retention pruning
1 parent c6aec66 commit 6052ff6

File tree

7 files changed

+328
-63
lines changed

7 files changed

+328
-63
lines changed

internal/agent/api_integrations_ingest_agent.go

Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,26 +19,32 @@ import (
1919
const (
2020
apiIntegrationIngestIntervalDefault = 5 * time.Second
2121
apiIntegrationIngestMaxReadBytes = 256 * 1024
22+
apiIntegrationPruneIntervalDefault = time.Hour
2223
)
2324

2425
// APIIntegrationsIngestAgent tails normalized JSONL API integration usage files and stores the events.
2526
type APIIntegrationsIngestAgent struct {
26-
store *store.Store
27-
dir string
28-
interval time.Duration
29-
logger *slog.Logger
27+
store *store.Store
28+
dir string
29+
interval time.Duration
30+
retention time.Duration
31+
pruneInterval time.Duration
32+
lastPrune time.Time
33+
logger *slog.Logger
3034
}
3135

3236
// NewAPIIntegrationsIngestAgent creates a new API integrations file ingester.
33-
func NewAPIIntegrationsIngestAgent(store *store.Store, dir string, logger *slog.Logger) *APIIntegrationsIngestAgent {
37+
func NewAPIIntegrationsIngestAgent(store *store.Store, dir string, retention time.Duration, logger *slog.Logger) *APIIntegrationsIngestAgent {
3438
if logger == nil {
3539
logger = slog.Default()
3640
}
3741
return &APIIntegrationsIngestAgent{
38-
store: store,
39-
dir: dir,
40-
interval: apiIntegrationIngestIntervalDefault,
41-
logger: logger,
42+
store: store,
43+
dir: dir,
44+
interval: apiIntegrationIngestIntervalDefault,
45+
retention: retention,
46+
pruneInterval: apiIntegrationPruneIntervalDefault,
47+
logger: logger,
4248
}
4349
}
4450

@@ -87,6 +93,10 @@ func (a *APIIntegrationsIngestAgent) scan() {
8793
a.logger.Error("API integrations ingester scan failed", "path", path, "error", err)
8894
}
8995
}
96+
97+
if err := a.pruneExpiredUsageEvents(); err != nil {
98+
a.logger.Error("API integrations ingester prune failed", "error", err)
99+
}
90100
}
91101

92102
func (a *APIIntegrationsIngestAgent) scanFile(path string) error {
@@ -189,3 +199,25 @@ func (a *APIIntegrationsIngestAgent) recordInvalidLine(path, line string, err er
189199
a.logger.Warn("Failed to create API integrations ingest alert", "path", path, "error", createErr)
190200
}
191201
}
202+
203+
func (a *APIIntegrationsIngestAgent) pruneExpiredUsageEvents() error {
204+
if a.store == nil || a.retention <= 0 {
205+
return nil
206+
}
207+
208+
now := time.Now().UTC()
209+
if !a.lastPrune.IsZero() && now.Sub(a.lastPrune) < a.pruneInterval {
210+
return nil
211+
}
212+
213+
cutoff := now.Add(-a.retention)
214+
deleted, err := a.store.DeleteAPIIntegrationUsageEventsOlderThan(cutoff)
215+
if err != nil {
216+
return err
217+
}
218+
a.lastPrune = now
219+
if deleted > 0 {
220+
a.logger.Info("API integrations usage retention pruned events", "deleted", deleted, "cutoff", cutoff.Format(time.RFC3339))
221+
}
222+
return nil
223+
}

internal/agent/api_integrations_ingest_agent_test.go

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"testing"
99
"time"
1010

11+
apiintegrations "github.com/onllm-dev/onwatch/v2/internal/api_integrations"
1112
"github.com/onllm-dev/onwatch/v2/internal/store"
1213
)
1314

@@ -24,7 +25,7 @@ func TestAPIIntegrationsIngestAgent_ScanFile_PartialLineAndCompletion(t *testing
2425
t.Fatalf("WriteFile: %v", err)
2526
}
2627

27-
ag := NewAPIIntegrationsIngestAgent(st, dir, slog.Default())
28+
ag := NewAPIIntegrationsIngestAgent(st, dir, 0, slog.Default())
2829
if err := ag.scanFile(path); err != nil {
2930
t.Fatalf("scanFile(1): %v", err)
3031
}
@@ -73,7 +74,7 @@ func TestAPIIntegrationsIngestAgent_ScanFile_InvalidLineCreatesAlert(t *testing.
7374
t.Fatalf("WriteFile: %v", err)
7475
}
7576

76-
ag := NewAPIIntegrationsIngestAgent(st, dir, slog.Default())
77+
ag := NewAPIIntegrationsIngestAgent(st, dir, 0, slog.Default())
7778
if err := ag.scanFile(path); err != nil {
7879
t.Fatalf("scanFile: %v", err)
7980
}
@@ -108,7 +109,7 @@ func TestAPIIntegrationsIngestAgent_ScanFile_DedupAndTruncation(t *testing.T) {
108109
t.Fatalf("WriteFile: %v", err)
109110
}
110111

111-
ag := NewAPIIntegrationsIngestAgent(st, dir, slog.Default())
112+
ag := NewAPIIntegrationsIngestAgent(st, dir, 0, slog.Default())
112113
if err := ag.scanFile(path); err != nil {
113114
t.Fatalf("scanFile(1): %v", err)
114115
}
@@ -157,7 +158,7 @@ func TestAPIIntegrationsIngestAgent_Run_ProcessesMultipleFiles(t *testing.T) {
157158
}
158159
}
159160

160-
ag := NewAPIIntegrationsIngestAgent(st, dir, slog.Default())
161+
ag := NewAPIIntegrationsIngestAgent(st, dir, 0, slog.Default())
161162
ag.SetInterval(10 * time.Millisecond)
162163
ctx, cancel := context.WithCancel(context.Background())
163164
done := make(chan struct{})
@@ -177,3 +178,47 @@ func TestAPIIntegrationsIngestAgent_Run_ProcessesMultipleFiles(t *testing.T) {
177178
t.Fatalf("summary=%+v", summary)
178179
}
179180
}
181+
182+
func TestAPIIntegrationsIngestAgent_Scan_PrunesExpiredDatabaseRows(t *testing.T) {
183+
st, err := store.New(":memory:")
184+
if err != nil {
185+
t.Fatalf("New: %v", err)
186+
}
187+
defer st.Close()
188+
189+
oldEvent := `{"ts":"2025-12-01T12:00:00Z","integration":"notes","provider":"openai","model":"gpt-4.1-mini","prompt_tokens":2,"completion_tokens":1}`
190+
parsedOld, err := apiintegrations.ParseUsageEventLine([]byte(oldEvent), "/tmp/api-integrations/notes.jsonl")
191+
if err != nil {
192+
t.Fatalf("ParseUsageEventLine(old): %v", err)
193+
}
194+
if _, err := st.InsertAPIIntegrationUsageEvent(parsedOld); err != nil {
195+
t.Fatalf("InsertAPIIntegrationUsageEvent(old): %v", err)
196+
}
197+
198+
dir := t.TempDir()
199+
path := filepath.Join(dir, "notes.jsonl")
200+
newLine := `{"ts":"2026-04-03T12:00:00Z","integration":"notes","provider":"openai","model":"gpt-4.1-mini","prompt_tokens":3,"completion_tokens":2}` + "\n"
201+
if err := os.WriteFile(path, []byte(newLine), 0o600); err != nil {
202+
t.Fatalf("WriteFile: %v", err)
203+
}
204+
205+
ag := NewAPIIntegrationsIngestAgent(st, dir, 24*time.Hour, slog.Default())
206+
ag.pruneInterval = time.Millisecond
207+
if err := ag.pruneExpiredUsageEvents(); err != nil {
208+
t.Fatalf("pruneExpiredUsageEvents: %v", err)
209+
}
210+
if err := ag.scanFile(path); err != nil {
211+
t.Fatalf("scanFile: %v", err)
212+
}
213+
214+
events, err := st.QueryAPIIntegrationUsageRange(time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC), time.Date(2026, 5, 1, 0, 0, 0, 0, time.UTC))
215+
if err != nil {
216+
t.Fatalf("QueryAPIIntegrationUsageRange: %v", err)
217+
}
218+
if len(events) != 1 {
219+
t.Fatalf("events=%+v", events)
220+
}
221+
if events[0].Timestamp.Format(time.RFC3339) != "2026-04-03T12:00:00Z" {
222+
t.Fatalf("expected retained new event, got %+v", events[0])
223+
}
224+
}

internal/config/config.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,9 @@ type Config struct {
6161
GeminiAccessToken string // GEMINI_ACCESS_TOKEN (for Docker/headless)
6262

6363
// Custom API Integrations telemetry ingestion
64-
APIIntegrationsEnabled bool // ONWATCH_API_INTEGRATIONS_ENABLED (default: true)
65-
APIIntegrationsDir string // ONWATCH_API_INTEGRATIONS_DIR (default: ~/.onwatch/api-integrations or /data/api-integrations)
64+
APIIntegrationsEnabled bool // ONWATCH_API_INTEGRATIONS_ENABLED (default: true)
65+
APIIntegrationsDir string // ONWATCH_API_INTEGRATIONS_DIR (default: ~/.onwatch/api-integrations or /data/api-integrations)
66+
APIIntegrationsRetention time.Duration // ONWATCH_API_INTEGRATIONS_RETENTION (example: 720h, 0 disables pruning)
6667

6768
// Shared configuration
6869
PollInterval time.Duration // ONWATCH_POLL_INTERVAL (seconds → Duration)
@@ -303,9 +304,17 @@ func loadFromEnvAndFlags(flags *flagValues) (*Config, error) {
303304
// Custom API Integrations telemetry ingestion
304305
cfg.APIIntegrationsDir = strings.TrimSpace(os.Getenv("ONWATCH_API_INTEGRATIONS_DIR"))
305306
cfg.APIIntegrationsEnabled = true
307+
cfg.APIIntegrationsRetention = 60 * 24 * time.Hour
306308
if env := strings.ToLower(strings.TrimSpace(os.Getenv("ONWATCH_API_INTEGRATIONS_ENABLED"))); env != "" {
307309
cfg.APIIntegrationsEnabled = env == "true" || env == "1" || env == "yes" || env == "on"
308310
}
311+
if env := strings.TrimSpace(os.Getenv("ONWATCH_API_INTEGRATIONS_RETENTION")); env != "" {
312+
if env == "0" {
313+
cfg.APIIntegrationsRetention = 0
314+
} else if v, err := time.ParseDuration(env); err == nil {
315+
cfg.APIIntegrationsRetention = v
316+
}
317+
}
309318

310319
// Poll Interval (seconds) - ONWATCH_* first, SYNTRACK_* fallback
311320
if flags.interval > 0 {
@@ -457,6 +466,9 @@ func (c *Config) Validate() error {
457466
if c.Port < 1024 || c.Port > 65535 {
458467
return fmt.Errorf("port must be between 1024 and 65535")
459468
}
469+
if c.APIIntegrationsRetention < 0 {
470+
return fmt.Errorf("API integrations retention must be non-negative")
471+
}
460472

461473
return nil
462474
}
@@ -590,6 +602,7 @@ func (c *Config) String() string {
590602
fmt.Fprintf(&sb, " MiniMaxAPIKey: %s,\n", minimaxDisplay)
591603
fmt.Fprintf(&sb, " APIIntegrationsEnabled: %v,\n", c.APIIntegrationsEnabled)
592604
fmt.Fprintf(&sb, " APIIntegrationsDir: %s,\n", c.APIIntegrationsDir)
605+
fmt.Fprintf(&sb, " APIIntegrationsRetention: %v,\n", c.APIIntegrationsRetention)
593606

594607
fmt.Fprintf(&sb, " PollInterval: %v,\n", c.PollInterval)
595608
fmt.Fprintf(&sb, " SessionIdleTimeout: %v,\n", c.SessionIdleTimeout)

internal/config/config_test.go

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,37 @@ func TestConfig_DefaultValues(t *testing.T) {
173173
if cfg.LogLevel != "info" {
174174
t.Errorf("LogLevel = %q, want %q", cfg.LogLevel, "info")
175175
}
176+
if cfg.APIIntegrationsRetention != 60*24*time.Hour {
177+
t.Errorf("APIIntegrationsRetention = %v, want %v", cfg.APIIntegrationsRetention, 60*24*time.Hour)
178+
}
179+
}
180+
181+
func TestConfig_APIIntegrationsRetention_LoadsFromEnv(t *testing.T) {
182+
os.Clearenv()
183+
os.Setenv("ONWATCH_API_INTEGRATIONS_RETENTION", "168h")
184+
defer os.Clearenv()
185+
186+
cfg, err := Load()
187+
if err != nil {
188+
t.Fatalf("Load() failed: %v", err)
189+
}
190+
if cfg.APIIntegrationsRetention != 168*time.Hour {
191+
t.Errorf("APIIntegrationsRetention = %v, want %v", cfg.APIIntegrationsRetention, 168*time.Hour)
192+
}
193+
}
194+
195+
func TestConfig_APIIntegrationsRetention_Disabled(t *testing.T) {
196+
os.Clearenv()
197+
os.Setenv("ONWATCH_API_INTEGRATIONS_RETENTION", "0")
198+
defer os.Clearenv()
199+
200+
cfg, err := Load()
201+
if err != nil {
202+
t.Fatalf("Load() failed: %v", err)
203+
}
204+
if cfg.APIIntegrationsRetention != 0 {
205+
t.Errorf("APIIntegrationsRetention = %v, want 0", cfg.APIIntegrationsRetention)
206+
}
176207
}
177208

178209
func TestConfig_OnlySyntheticProvider(t *testing.T) {
@@ -1348,9 +1379,9 @@ func TestLoadEnvFile_IgnoresNonOnwatchLocalEnv(t *testing.T) {
13481379

13491380
func TestConfig_CodexShowAvailable(t *testing.T) {
13501381
tests := []struct {
1351-
name string
1352-
envVal string
1353-
want string
1382+
name string
1383+
envVal string
1384+
want string
13541385
}{
13551386
{"empty defaults to usage", "", "usage"},
13561387
{"usage passes through", "usage", "usage"},

0 commit comments

Comments
 (0)