diff --git a/.gitignore b/.gitignore
index fc097f2f..cf76729e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -70,6 +70,14 @@ logs/
coverage/
.nyc_output/
+# Test snapshots & generated test/lint artifacts
+**/__snapshots__/
+*.snap
+junit.xml
+jest-results.json
+*_output.txt
+*_output_*.txt
+
# Load test reports (generated) — keep the dir so k6 can write into it
load-tests/reports/*
!load-tests/reports/.gitkeep
diff --git a/app/screens/AnalyticsDashboard.tsx b/app/screens/AnalyticsDashboard.tsx
index d8929172..68833ba0 100644
--- a/app/screens/AnalyticsDashboard.tsx
+++ b/app/screens/AnalyticsDashboard.tsx
@@ -7,15 +7,20 @@ import {
ScrollView,
Share,
Alert,
+ TouchableOpacity,
} from 'react-native';
import { useSubscriptionStore } from '../../src/store/subscriptionStore';
import { useAnalyticsStore } from '../stores/analyticsStore';
import { useSettingsStore } from '../../src/store/settingsStore';
import { Card } from '../../src/components/common/Card';
import { Button } from '../../src/components/common/Button';
+import { CohortChart } from '../../src/components/analytics/CohortChart';
+import { RetentionHeatmap } from '../../src/components/analytics/RetentionHeatmap';
+import { SankeyDiagram } from '../../src/components/analytics/SankeyDiagram';
import { useThemeColors } from '../../src/hooks/useThemeColors';
import { spacing, typography } from '../../src/utils/constants';
import { formatCurrency } from '../../src/utils/formatting';
+import type { CohortGranularity } from '../../src/types/cohortAnalytics';
const AnalyticsDashboard: React.FC = () => {
const colors = useThemeColors();
@@ -23,12 +28,31 @@ const AnalyticsDashboard: React.FC = () => {
const { subscriptions } = useSubscriptionStore();
const { preferredCurrency } = useSettingsStore();
- const { report, compute, exportCSV } = useAnalyticsStore();
+ const {
+ report,
+ granularity,
+ cohortBuckets,
+ retentionCurve,
+ churnBreakdown,
+ planMigrationFlows,
+ ltvBySource,
+ revenueTrendWithAnomalies,
+ setGranularity,
+ compute,
+ exportCSV,
+ exportCohortCsv,
+ exportCohortPdf,
+ } = useAnalyticsStore();
useEffect(() => {
compute(subscriptions);
}, [subscriptions, compute]);
+ const handleSetGranularity = (next: CohortGranularity) => {
+ setGranularity(next);
+ compute(subscriptions);
+ };
+
const handleExportCSV = async () => {
try {
const csv = exportCSV(subscriptions);
@@ -38,6 +62,22 @@ const AnalyticsDashboard: React.FC = () => {
}
};
+ const handleExportCohortCsv = async () => {
+ try {
+ await Share.share({ message: exportCohortCsv(), title: 'Cohort Report (CSV)' });
+ } catch {
+ Alert.alert('Export Failed', 'Could not export cohort report');
+ }
+ };
+
+ const handleExportCohortPdf = async () => {
+ try {
+ await Share.share({ message: exportCohortPdf(), title: 'Cohort Report (PDF)' });
+ } catch {
+ Alert.alert('Export Failed', 'Could not export cohort report');
+ }
+ };
+
const currency = preferredCurrency ?? 'USD';
if (!report) {
@@ -107,36 +147,104 @@ const AnalyticsDashboard: React.FC = () => {
Revenue Trend (last 6 months)
- {report.revenueTrend.length === 0 ? (
+ {revenueTrendWithAnomalies.length === 0 ? (
No trend data yet
) : (
- report.revenueTrend.map((point, index) => (
-
- {point.label}
- {formatCurrency(point.mrr, currency)}
+ revenueTrendWithAnomalies.map((point, index, arr) => (
+
+
+ {point.label}
+ {point.isAnomaly ? ' ⚠️' : ''}
+
+
+ {formatCurrency(point.value, currency)}
+
))
)}
+ {revenueTrendWithAnomalies.some((point) => point.isAnomaly) && (
+ ⚠️ flagged points are statistical outliers vs. the rest of the trend
+ )}
- Cohorts
- {report.cohorts.length === 0 ? (
- No cohort data yet
+
+ Cohort Retention
+
+ {(['week', 'month'] as CohortGranularity[]).map((option) => (
+ handleSetGranularity(option)}
+ accessibilityRole="button"
+ accessibilityState={{ selected: granularity === option }}>
+
+ {option === 'week' ? 'Week' : 'Month'}
+
+
+ ))}
+
+
+
+ {cohortBuckets.slice(-4).map((bucket, index, arr) => (
+
+ {bucket.cohortKey}
+
+ {bucket.size} signups · {(bucket.retentionRate * 100).toFixed(0)}% retained ·{' '}
+ {formatCurrency(bucket.currentMrr, currency)}
+
+
+ ))}
+
+
+
+ Retention Curve (Day 1 / 7 / 30 / 60 / 90)
+
+
+
+
+ Revenue vs. Logo Churn (last 30 days)
+ {!churnBreakdown || churnBreakdown.isEmpty ? (
+ No subscribers active at the start of this period yet.
) : (
- report.cohorts.slice(-4).map((cohort, index, arr) => (
-
- {cohort.cohort}
+ <>
+
+ Logo churn (subscribers)
+
+ {(churnBreakdown.logoChurnRate * 100).toFixed(1)}% ({churnBreakdown.churnedSubscribers}/
+ {churnBreakdown.startingSubscribers})
+
+
+
+ Revenue churn (MRR)
- {(cohort.retentionRate * 100).toFixed(0)}% retained ·{' '}
- {formatCurrency(cohort.revenue, currency)}
+ {(churnBreakdown.revenueChurnRate * 100).toFixed(1)}% (
+ {formatCurrency(churnBreakdown.churnedMrr, currency)})
+
+
+ >
+ )}
+
+
+
+ Plan Migration
+
+
+
+
+ LTV by Acquisition Source
+ {ltvBySource.length === 0 ? (
+ No acquisition source data yet
+ ) : (
+ ltvBySource.map((row, index, arr) => (
+
+ {row.acquisitionChannel}
+
+ {formatCurrency(row.ltv, currency)} LTV · {row.subscriberCount} subs
))
@@ -159,6 +267,8 @@ const AnalyticsDashboard: React.FC = () => {
+
+
@@ -194,8 +304,27 @@ function createStyles(colors: ReturnType) {
statLabel: { ...typography.body, color: colors.textSecondary },
statValue: { ...typography.body, color: colors.text.primary, fontWeight: '600' },
emptyText: { ...typography.body, color: colors.textSecondary, textAlign: 'center' },
- exportContainer: { padding: spacing.lg, paddingTop: 0, marginBottom: spacing.xl },
+ exportContainer: { padding: spacing.lg, paddingTop: 0, marginBottom: spacing.xl, gap: spacing.sm },
loadingText: { ...typography.body, color: colors.textSecondary, padding: spacing.lg },
+ rowBetween: {
+ flexDirection: 'row',
+ justifyContent: 'space-between',
+ alignItems: 'center',
+ marginBottom: spacing.md,
+ },
+ granularityToggle: { flexDirection: 'row', gap: spacing.xs },
+ granularityButton: {
+ paddingVertical: spacing.xs,
+ paddingHorizontal: spacing.sm,
+ borderRadius: 6,
+ borderWidth: 1,
+ borderColor: colors.border.default,
+ },
+ granularityButtonActive: { backgroundColor: colors.primary, borderColor: colors.primary },
+ granularityButtonText: { ...typography.caption, color: colors.textSecondary },
+ granularityButtonTextActive: { color: colors.text.inverse, fontWeight: '600' },
+ anomalyValue: { color: colors.status.warning },
+ anomalyNote: { ...typography.caption, color: colors.status.warning, marginTop: spacing.xs },
});
}
diff --git a/app/stores/analyticsStore.ts b/app/stores/analyticsStore.ts
index 0e0e5483..adc21987 100644
--- a/app/stores/analyticsStore.ts
+++ b/app/stores/analyticsStore.ts
@@ -3,24 +3,106 @@ import {
calculateSubscriptionAnalytics,
SubscriptionAnalyticsReport,
} from '../../src/services/analyticsService';
-import { Subscription } from '../../src/types/subscription';
+import { BillingCycle, Subscription } from '../../src/types/subscription';
import { generateCSV } from '../../src/utils/importExport';
+import { CohortService } from '../../backend/services/analytics/cohortService';
+import { cohortTableToCsv } from '../../backend/services/analytics/cohortReportExport';
+import { cohortTableToPdfText } from '../../src/services/cohortPdfExport';
+import type {
+ ChurnBreakdown,
+ CohortBucket,
+ CohortGranularity,
+ LtvSourceBreakdown,
+ PlanMigrationFlow,
+ RetentionCurvePoint,
+ SubscriberRecord,
+ AnomalyFlaggedPoint,
+} from '../../src/types/cohortAnalytics';
+
+const DAY_MS = 24 * 60 * 60 * 1_000;
+
+/**
+ * Adapts the app's personal Subscription model into merchant-style
+ * SubscriberRecords so CohortService (built for the merchant analytics
+ * platform) can compute cohort/retention/churn/LTV metrics on it. Each
+ * tracked subscription stands in for a "subscriber" of this account.
+ */
+const toSubscriberRecords = (subscriptions: Subscription[]): SubscriberRecord[] =>
+ subscriptions.map((subscription) => ({
+ subscriberId: subscription.id,
+ merchantId: 'self',
+ planId: subscription.category,
+ planName: subscription.name,
+ region: subscription.timezone,
+ acquisitionChannel: subscription.isCryptoEnabled ? 'crypto' : 'card',
+ signupAt: new Date(subscription.createdAt).getTime(),
+ churnedAt: subscription.isActive ? undefined : new Date(subscription.updatedAt).getTime(),
+ lastActiveAt: new Date(subscription.updatedAt).getTime(),
+ mrr:
+ subscription.billingCycle === BillingCycle.YEARLY
+ ? subscription.price / 12
+ : subscription.billingCycle === BillingCycle.WEEKLY
+ ? subscription.price * 4.345
+ : subscription.price,
+ }));
interface AnalyticsStoreState {
report: SubscriptionAnalyticsReport | null;
+ granularity: CohortGranularity;
+ cohortBuckets: CohortBucket[];
+ retentionCurve: RetentionCurvePoint[];
+ churnBreakdown: ChurnBreakdown | null;
+ planMigrationFlows: PlanMigrationFlow[];
+ ltvBySource: LtvSourceBreakdown[];
+ revenueTrendWithAnomalies: AnomalyFlaggedPoint[];
+ setGranularity: (granularity: CohortGranularity) => void;
compute: (subscriptions: Subscription[]) => void;
exportCSV: (subscriptions: Subscription[]) => string;
+ exportCohortCsv: () => string;
+ exportCohortPdf: () => string;
}
-export const useAnalyticsStore = create()((set) => ({
+export const useAnalyticsStore = create()((set, get) => ({
report: null,
+ granularity: 'month',
+ cohortBuckets: [],
+ retentionCurve: [],
+ churnBreakdown: null,
+ planMigrationFlows: [],
+ ltvBySource: [],
+ revenueTrendWithAnomalies: [],
+
+ setGranularity: (granularity) => {
+ set({ granularity });
+ // Recompute is cheap (in-memory, no I/O) — callers re-run `compute` with
+ // the latest subscriptions list whenever granularity changes.
+ },
compute: (subscriptions) => {
const report = calculateSubscriptionAnalytics(subscriptions);
- set({ report });
+ const records = toSubscriberRecords(subscriptions);
+ const granularity = get().granularity;
+ const now = Date.now();
+ const periodStart = now - 30 * DAY_MS;
+
+ set({
+ report,
+ cohortBuckets: CohortService.buildCohortTable(records, granularity),
+ retentionCurve: CohortService.retentionCurve(records),
+ churnBreakdown: CohortService.revenueChurnVsLogoChurn(records, periodStart, now),
+ planMigrationFlows: CohortService.planMigrationFlows(records, periodStart, now),
+ ltvBySource: CohortService.ltvByAcquisitionSource(records),
+ revenueTrendWithAnomalies: CohortService.filterAnomalousSpikes(
+ report.revenueTrend.map((point) => ({ label: point.label, value: point.mrr }))
+ ),
+ });
},
exportCSV: (subscriptions) => {
return generateCSV(subscriptions);
},
+
+ exportCohortCsv: () => cohortTableToCsv(get().cohortBuckets),
+
+ exportCohortPdf: () => cohortTableToPdfText(get().cohortBuckets, 'Cohort Retention Report'),
}));
diff --git a/backend/analytics/jobs/cohortAggregationJob.ts b/backend/analytics/jobs/cohortAggregationJob.ts
new file mode 100644
index 00000000..af7aaefc
--- /dev/null
+++ b/backend/analytics/jobs/cohortAggregationJob.ts
@@ -0,0 +1,95 @@
+/**
+ * Cohort Aggregation Job
+ *
+ * Nightly job that pre-computes cohort tables (week + month granularity) for
+ * every merchant in the subscriber record repository, so the analytics
+ * dashboard and REST endpoints can serve cached results instead of
+ * recomputing on every request. Mirrors the start/stop + metrics shape used
+ * by backend/analytics/jobs/mvRefreshJob.ts.
+ */
+
+import { CohortService } from '../../services/analytics/cohortService';
+import { SubscriberRecordRepository, subscriberRecordRepository } from '../../services/analytics/subscriberRecordRepository';
+import type { CohortBucket } from '../../../src/types/cohortAnalytics';
+
+const DEFAULT_INTERVAL_MS = 24 * 60 * 60 * 1_000; // nightly
+
+export interface CohortAggregationMetrics {
+ runs: number;
+ merchantsProcessed: number;
+ lastRunAt: number | null;
+ lastRunDurationMs: number | null;
+ lastError: string | null;
+}
+
+export class CohortAggregationJob {
+ private readonly repository: SubscriberRecordRepository;
+ private readonly intervalMs: number;
+ private timer: ReturnType | null = null;
+ private isRunning = false;
+ private cache = new Map();
+ private metrics: CohortAggregationMetrics = {
+ runs: 0,
+ merchantsProcessed: 0,
+ lastRunAt: null,
+ lastRunDurationMs: null,
+ lastError: null,
+ };
+
+ constructor(repository: SubscriberRecordRepository = subscriberRecordRepository, intervalMs = DEFAULT_INTERVAL_MS) {
+ this.repository = repository;
+ this.intervalMs = intervalMs;
+ }
+
+ start(): void {
+ if (this.timer) return;
+ void this.run();
+ this.timer = setInterval(() => void this.run(), this.intervalMs);
+ if (this.timer.unref) this.timer.unref();
+ }
+
+ stop(): void {
+ if (this.timer) {
+ clearInterval(this.timer);
+ this.timer = null;
+ }
+ }
+
+ async run(): Promise {
+ if (this.isRunning) return;
+ this.isRunning = true;
+ const startedAt = Date.now();
+ try {
+ const merchants = this.repository.listMerchants();
+ for (const merchantId of merchants) {
+ const records = this.repository.getByMerchant(merchantId);
+ this.cache.set(merchantId, {
+ week: CohortService.buildCohortTable(records, 'week'),
+ month: CohortService.buildCohortTable(records, 'month'),
+ computedAt: Date.now(),
+ });
+ }
+ this.metrics.runs += 1;
+ this.metrics.merchantsProcessed = merchants.length;
+ this.metrics.lastRunAt = Date.now();
+ this.metrics.lastRunDurationMs = Date.now() - startedAt;
+ this.metrics.lastError = null;
+ } catch (error) {
+ this.metrics.lastError = error instanceof Error ? error.message : 'Cohort aggregation run failed';
+ } finally {
+ this.isRunning = false;
+ }
+ }
+
+ getCachedCohorts(merchantId: string, granularity: 'week' | 'month'): CohortBucket[] | null {
+ const cached = this.cache.get(merchantId);
+ if (!cached) return null;
+ return cached[granularity];
+ }
+
+ getMetrics(): CohortAggregationMetrics {
+ return { ...this.metrics };
+ }
+}
+
+export const cohortAggregationJob = new CohortAggregationJob();
diff --git a/backend/services/analytics/__tests__/cohortChurnRiskService.test.ts b/backend/services/analytics/__tests__/cohortChurnRiskService.test.ts
new file mode 100644
index 00000000..289dbaf1
--- /dev/null
+++ b/backend/services/analytics/__tests__/cohortChurnRiskService.test.ts
@@ -0,0 +1,32 @@
+import { getChurnRiskForCohort } from '../cohortChurnRiskService';
+import type { SubscriberRecord } from '../../../../src/types/cohortAnalytics';
+
+const makeRecord = (overrides: Partial = {}): SubscriberRecord => ({
+ subscriberId: 'sub_1',
+ merchantId: 'merchant_1',
+ planId: 'plan_basic',
+ planName: 'Basic',
+ signupAt: 1_700_000_000_000,
+ mrr: 20,
+ ...overrides,
+});
+
+describe('getChurnRiskForCohort', () => {
+ it('returns a neutral zero-confidence summary when there are no active subscribers', async () => {
+ const summary = await getChurnRiskForCohort('2026-01', []);
+ expect(summary).toEqual({
+ cohortKey: '2026-01',
+ sampledSubscribers: 0,
+ highRiskCount: 0,
+ mediumRiskCount: 0,
+ lowRiskCount: 0,
+ averageChurnProbability: 0,
+ });
+ });
+
+ it('degrades gracefully instead of throwing when the ml-service is unreachable', async () => {
+ const summary = await getChurnRiskForCohort('2026-01', [makeRecord()]);
+ expect(summary.cohortKey).toBe('2026-01');
+ expect(Number.isFinite(summary.averageChurnProbability)).toBe(true);
+ });
+});
diff --git a/backend/services/analytics/__tests__/cohortService.test.ts b/backend/services/analytics/__tests__/cohortService.test.ts
new file mode 100644
index 00000000..0ac5be74
--- /dev/null
+++ b/backend/services/analytics/__tests__/cohortService.test.ts
@@ -0,0 +1,159 @@
+import { CohortService } from '../cohortService';
+import type { SubscriberRecord } from '../../../../src/types/cohortAnalytics';
+
+const DAY_MS = 24 * 60 * 60 * 1_000;
+// 2026-01-15T00:00:00Z and 2026-02-10T00:00:00Z — fixed timestamps so cohort
+// keys (ISO week / month) are deterministic across test runs.
+const JAN_SIGNUP = Date.UTC(2026, 0, 15);
+const FEB_SIGNUP = Date.UTC(2026, 1, 10);
+
+const makeRecord = (overrides: Partial = {}): SubscriberRecord => ({
+ subscriberId: 'sub_1',
+ merchantId: 'merchant_1',
+ planId: 'plan_basic',
+ planName: 'Basic',
+ signupAt: JAN_SIGNUP,
+ mrr: 20,
+ ...overrides,
+});
+
+describe('CohortService', () => {
+ describe('buildCohortTable', () => {
+ it('returns an empty array for zero-data periods instead of throwing', () => {
+ expect(CohortService.buildCohortTable([], 'month')).toEqual([]);
+ });
+
+ it('groups subscribers by signup month and reports size/retention', () => {
+ const asOf = Date.UTC(2026, 2, 1);
+ const records = [
+ makeRecord({ subscriberId: 'a', signupAt: JAN_SIGNUP }),
+ makeRecord({ subscriberId: 'b', signupAt: JAN_SIGNUP, churnedAt: Date.UTC(2026, 1, 1) }),
+ makeRecord({ subscriberId: 'c', signupAt: FEB_SIGNUP }),
+ ];
+
+ const buckets = CohortService.buildCohortTable(records, 'month', asOf);
+ const jan = buckets.find((b) => b.cohortKey === '2026-01')!;
+ const feb = buckets.find((b) => b.cohortKey === '2026-02')!;
+
+ expect(jan.size).toBe(2);
+ expect(jan.activeCount).toBe(1);
+ expect(jan.retentionRate).toBe(0.5);
+ expect(jan.isEmpty).toBe(false);
+ expect(feb.size).toBe(1);
+ expect(feb.activeCount).toBe(1);
+ });
+
+ it('buckets by ISO week when granularity is week', () => {
+ const records = [makeRecord({ signupAt: JAN_SIGNUP })];
+ const buckets = CohortService.buildCohortTable(records, 'week', JAN_SIGNUP + DAY_MS);
+ expect(buckets).toHaveLength(1);
+ expect(buckets[0].cohortKey).toMatch(/^2026-W\d{2}$/);
+ });
+ });
+
+ describe('revenueChurnVsLogoChurn', () => {
+ it('reports isEmpty when no one was active at the start of the period', () => {
+ const breakdown = CohortService.revenueChurnVsLogoChurn([], JAN_SIGNUP, FEB_SIGNUP);
+ expect(breakdown.isEmpty).toBe(true);
+ expect(breakdown.logoChurnRate).toBe(0);
+ expect(breakdown.revenueChurnRate).toBe(0);
+ });
+
+ it('diverges revenue churn from logo churn when a high-value account churns', () => {
+ const periodStart = JAN_SIGNUP;
+ const periodEnd = FEB_SIGNUP;
+ const records = [
+ makeRecord({ subscriberId: 'whale', signupAt: JAN_SIGNUP - DAY_MS, mrr: 1000, churnedAt: JAN_SIGNUP + 5 * DAY_MS }),
+ makeRecord({ subscriberId: 'small1', signupAt: JAN_SIGNUP - DAY_MS, mrr: 10 }),
+ makeRecord({ subscriberId: 'small2', signupAt: JAN_SIGNUP - DAY_MS, mrr: 10 }),
+ makeRecord({ subscriberId: 'small3', signupAt: JAN_SIGNUP - DAY_MS, mrr: 10 }),
+ ];
+
+ const breakdown = CohortService.revenueChurnVsLogoChurn(records, periodStart, periodEnd);
+ expect(breakdown.startingSubscribers).toBe(4);
+ expect(breakdown.churnedSubscribers).toBe(1);
+ expect(breakdown.logoChurnRate).toBe(0.25);
+ // Revenue churn should be far higher than logo churn — the whale is most of the MRR.
+ expect(breakdown.revenueChurnRate).toBeGreaterThan(breakdown.logoChurnRate);
+ expect(breakdown.revenueChurnRate).toBeCloseTo(1000 / 1030, 4);
+ });
+ });
+
+ describe('planMigrationFlows', () => {
+ it('classifies upgrade/downgrade direction when plan prices are supplied', () => {
+ const records = [
+ makeRecord({
+ subscriberId: 'a',
+ planHistory: [{ fromPlanId: 'basic', toPlanId: 'pro', changedAt: JAN_SIGNUP + DAY_MS }],
+ }),
+ makeRecord({
+ subscriberId: 'b',
+ planHistory: [{ fromPlanId: 'pro', toPlanId: 'basic', changedAt: JAN_SIGNUP + 2 * DAY_MS }],
+ }),
+ ];
+
+ const flows = CohortService.planMigrationFlows(records, JAN_SIGNUP, JAN_SIGNUP + 10 * DAY_MS, {
+ basic: 10,
+ pro: 30,
+ });
+
+ const upgrade = flows.find((f) => f.fromPlanId === 'basic' && f.toPlanId === 'pro');
+ const downgrade = flows.find((f) => f.fromPlanId === 'pro' && f.toPlanId === 'basic');
+ expect(upgrade?.direction).toBe('upgrade');
+ expect(downgrade?.direction).toBe('downgrade');
+ });
+
+ it('ignores plan changes outside the requested period', () => {
+ const records = [
+ makeRecord({
+ planHistory: [{ fromPlanId: 'basic', toPlanId: 'pro', changedAt: JAN_SIGNUP - 100 * DAY_MS }],
+ }),
+ ];
+ const flows = CohortService.planMigrationFlows(records, JAN_SIGNUP, JAN_SIGNUP + 10 * DAY_MS);
+ expect(flows).toHaveLength(0);
+ });
+ });
+
+ describe('ltvByAcquisitionSource', () => {
+ it('groups unattributed subscribers under "unknown"', () => {
+ const breakdown = CohortService.ltvByAcquisitionSource([makeRecord({ acquisitionChannel: undefined })]);
+ expect(breakdown[0].acquisitionChannel).toBe('unknown');
+ });
+
+ it('computes a higher LTV for lower-churn channels', () => {
+ const records = [
+ makeRecord({ subscriberId: 'p1', acquisitionChannel: 'paid_search', mrr: 50, churnedAt: JAN_SIGNUP + 30 * DAY_MS }),
+ makeRecord({ subscriberId: 'p2', acquisitionChannel: 'paid_search', mrr: 50, churnedAt: JAN_SIGNUP + 30 * DAY_MS }),
+ makeRecord({ subscriberId: 'r1', acquisitionChannel: 'referral', mrr: 50 }),
+ makeRecord({ subscriberId: 'r2', acquisitionChannel: 'referral', mrr: 50 }),
+ ];
+ const breakdown = CohortService.ltvByAcquisitionSource(records);
+ const paid = breakdown.find((b) => b.acquisitionChannel === 'paid_search')!;
+ const referral = breakdown.find((b) => b.acquisitionChannel === 'referral')!;
+ expect(referral.ltv).toBeGreaterThan(paid.ltv);
+ });
+ });
+
+ describe('filterAnomalousSpikes', () => {
+ it('does not flag anything when there are too few points to compute a meaningful IQR', () => {
+ const flagged = CohortService.filterAnomalousSpikes([
+ { label: 'a', value: 10 },
+ { label: 'b', value: 1000 },
+ ]);
+ expect(flagged.every((point) => !point.isAnomaly)).toBe(true);
+ });
+
+ it('flags a clear outlier in an otherwise stable series', () => {
+ const flagged = CohortService.filterAnomalousSpikes([
+ { label: 'mon', value: 10 },
+ { label: 'tue', value: 11 },
+ { label: 'wed', value: 9 },
+ { label: 'thu', value: 10 },
+ { label: 'fri', value: 500 },
+ ]);
+ expect(flagged.find((p) => p.label === 'fri')?.isAnomaly).toBe(true);
+ expect(flagged.filter((p) => p.label !== 'fri').every((p) => !p.isAnomaly)).toBe(true);
+ });
+ });
+
+});
diff --git a/backend/services/analytics/__tests__/retentionCalculator.test.ts b/backend/services/analytics/__tests__/retentionCalculator.test.ts
new file mode 100644
index 00000000..8660029e
--- /dev/null
+++ b/backend/services/analytics/__tests__/retentionCalculator.test.ts
@@ -0,0 +1,73 @@
+import { RetentionCalculator, RETENTION_CURVE_DAYS } from '../retentionCalculator';
+import type { SubscriberRecord } from '../../../../src/types/cohortAnalytics';
+
+const DAY_MS = 24 * 60 * 60 * 1_000;
+const SIGNUP = 1_700_000_000_000;
+
+const makeRecord = (overrides: Partial = {}): SubscriberRecord => ({
+ subscriberId: 'sub_1',
+ merchantId: 'merchant_1',
+ planId: 'plan_basic',
+ planName: 'Basic',
+ signupAt: SIGNUP,
+ mrr: 10,
+ ...overrides,
+});
+
+describe('RetentionCalculator', () => {
+ describe('isRetainedAtDay', () => {
+ it('returns false before the cohort has reached the checkpoint', () => {
+ const record = makeRecord();
+ expect(RetentionCalculator.isRetainedAtDay(record, 30, SIGNUP + 10 * DAY_MS)).toBe(false);
+ });
+
+ it('treats an active subscriber as retained at every checkpoint reached so far', () => {
+ const record = makeRecord();
+ expect(RetentionCalculator.isRetainedAtDay(record, 30, SIGNUP + 90 * DAY_MS)).toBe(true);
+ });
+
+ it('returns false once the subscriber has churned before the checkpoint', () => {
+ const record = makeRecord({ churnedAt: SIGNUP + 5 * DAY_MS });
+ expect(RetentionCalculator.isRetainedAtDay(record, 30, SIGNUP + 90 * DAY_MS)).toBe(false);
+ });
+
+ it('treats a subscriber who churned after the checkpoint as retained at that checkpoint', () => {
+ const record = makeRecord({ churnedAt: SIGNUP + 45 * DAY_MS });
+ expect(RetentionCalculator.isRetainedAtDay(record, 30, SIGNUP + 90 * DAY_MS)).toBe(true);
+ expect(RetentionCalculator.isRetainedAtDay(record, 60, SIGNUP + 90 * DAY_MS)).toBe(false);
+ });
+ });
+
+ describe('retentionCurve', () => {
+ it('reports the standard Day 1/7/30/60/90 checkpoints', () => {
+ const curve = RetentionCalculator.retentionCurve([makeRecord()], SIGNUP + 100 * DAY_MS);
+ expect(curve.map((point) => point.day)).toEqual(RETENTION_CURVE_DAYS);
+ });
+
+ it('excludes cohorts too young to have reached a checkpoint instead of reporting 0%', () => {
+ const youngCohort = [makeRecord({ signupAt: SIGNUP })];
+ // asOf is only 10 days after signup — Day 30/60/90 haven't happened yet.
+ const curve = RetentionCalculator.retentionCurve(youngCohort, SIGNUP + 10 * DAY_MS);
+ const day30 = curve.find((point) => point.day === 30);
+ expect(day30?.cohortSize).toBe(0);
+ expect(day30?.retentionRate).toBe(0);
+ });
+
+ it('computes retention rate across a mixed cohort', () => {
+ const asOf = SIGNUP + 100 * DAY_MS;
+ const records = [
+ makeRecord({ subscriberId: 'a' }), // still active
+ makeRecord({ subscriberId: 'b', churnedAt: SIGNUP + 3 * DAY_MS }), // churned before Day 7
+ makeRecord({ subscriberId: 'c', churnedAt: SIGNUP + 45 * DAY_MS }), // churned between Day 30 and 60
+ ];
+ const curve = RetentionCalculator.retentionCurve(records, asOf);
+ const day1 = curve.find((point) => point.day === 1)!;
+ const day30 = curve.find((point) => point.day === 30)!;
+ const day60 = curve.find((point) => point.day === 60)!;
+
+ expect(day1.retainedCount).toBe(3); // all three were still subscribed past Day 1
+ expect(day30.retainedCount).toBe(2); // a, c — b churned on day 3, before Day 30
+ expect(day60.retainedCount).toBe(1); // only a — c churned on day 45, before Day 60
+ });
+ });
+});
diff --git a/backend/services/analytics/analyticsDashboardApi.ts b/backend/services/analytics/analyticsDashboardApi.ts
new file mode 100644
index 00000000..232181e8
--- /dev/null
+++ b/backend/services/analytics/analyticsDashboardApi.ts
@@ -0,0 +1,120 @@
+/**
+ * Analytics Dashboard REST API
+ *
+ * Request/response handlers for the cohort retention analytics suite
+ * (issue #545), following the ApiResponse convention used elsewhere in
+ * the backend (see sandbox/api/sandboxApi.ts and
+ * backend/services/notification/webhookManagementApi.ts).
+ */
+
+import { CohortService } from './cohortService';
+import { getChurnRiskForCohort } from './cohortChurnRiskService';
+import { cohortTableToCsv, cohortTableToPdf, ltvBreakdownToCsv } from './cohortReportExport';
+import { SubscriberRecordRepository, subscriberRecordRepository } from './subscriberRecordRepository';
+import { cohortAggregationJob, CohortAggregationJob } from '../../analytics/jobs/cohortAggregationJob';
+import type {
+ AnalyticsExportFormat,
+ ChurnBreakdown,
+ CohortBucket,
+ CohortGranularity,
+ ChurnRiskSummary,
+ LtvSourceBreakdown,
+ PlanMigrationFlow,
+ RetentionCurvePoint,
+} from '../../../src/types/cohortAnalytics';
+
+export interface ApiResponse {
+ success: boolean;
+ data?: T;
+ message?: string;
+ error?: string;
+}
+
+const ok = (data: T, message?: string): ApiResponse => ({ success: true, data, message });
+const fail = (error: unknown, fallback: string): ApiResponse => ({
+ success: false,
+ error: error instanceof Error ? error.message : fallback,
+});
+
+export class AnalyticsDashboardApi {
+ constructor(
+ private readonly repository: SubscriberRecordRepository = subscriberRecordRepository,
+ private readonly aggregationJob: CohortAggregationJob = cohortAggregationJob
+ ) {}
+
+ /** Serves the pre-aggregated nightly cohort table when available, falling back to a live computation. */
+ getCohortTable(merchantId: string, granularity: CohortGranularity): ApiResponse {
+ const cached = this.aggregationJob.getCachedCohorts(merchantId, granularity);
+ if (cached) return ok(cached, 'Served from nightly cohort_aggregation cache');
+ const records = this.repository.getByMerchant(merchantId);
+ return ok(CohortService.buildCohortTable(records, granularity), 'Computed live (no cached aggregation yet)');
+ }
+
+ getRetentionCurve(merchantId: string): ApiResponse {
+ const records = this.repository.getByMerchant(merchantId);
+ return ok(CohortService.retentionCurve(records));
+ }
+
+ getChurnBreakdown(merchantId: string, periodStart: number, periodEnd: number): ApiResponse {
+ const records = this.repository.getByMerchant(merchantId);
+ return ok(CohortService.revenueChurnVsLogoChurn(records, periodStart, periodEnd));
+ }
+
+ getPlanMigrationFlows(
+ merchantId: string,
+ periodStart: number,
+ periodEnd: number,
+ planPriceById?: Record
+ ): ApiResponse {
+ const records = this.repository.getByMerchant(merchantId);
+ return ok(CohortService.planMigrationFlows(records, periodStart, periodEnd, planPriceById));
+ }
+
+ getLtvByAcquisitionSource(merchantId: string): ApiResponse {
+ const records = this.repository.getByMerchant(merchantId);
+ return ok(CohortService.ltvByAcquisitionSource(records));
+ }
+
+ async getChurnRisk(merchantId: string, cohortKey: string): Promise> {
+ try {
+ const records = this.repository.getByMerchant(merchantId);
+ return ok(await getChurnRiskForCohort(cohortKey, records));
+ } catch (error) {
+ return fail(error, 'Failed to compute churn risk');
+ }
+ }
+
+ exportCohortReport(
+ merchantId: string,
+ granularity: CohortGranularity,
+ format: AnalyticsExportFormat
+ ): ApiResponse<{ filename: string; contentType: string; body: string | Buffer }> {
+ const records = this.repository.getByMerchant(merchantId);
+ const buckets = CohortService.buildCohortTable(records, granularity);
+
+ if (format === 'csv') {
+ return ok({
+ filename: `cohort-report-${merchantId}-${granularity}.csv`,
+ contentType: 'text/csv',
+ body: cohortTableToCsv(buckets),
+ });
+ }
+
+ return ok({
+ filename: `cohort-report-${merchantId}-${granularity}.pdf`,
+ contentType: 'application/pdf',
+ body: cohortTableToPdf(buckets),
+ });
+ }
+
+ exportLtvReport(merchantId: string): ApiResponse<{ filename: string; contentType: string; body: string }> {
+ const records = this.repository.getByMerchant(merchantId);
+ return ok({
+ filename: `ltv-by-source-${merchantId}.csv`,
+ contentType: 'text/csv',
+ body: ltvBreakdownToCsv(CohortService.ltvByAcquisitionSource(records)),
+ });
+ }
+}
+
+export const analyticsDashboardApi = new AnalyticsDashboardApi();
diff --git a/backend/services/analytics/cohortChurnRiskService.ts b/backend/services/analytics/cohortChurnRiskService.ts
new file mode 100644
index 00000000..2e01d087
--- /dev/null
+++ b/backend/services/analytics/cohortChurnRiskService.ts
@@ -0,0 +1,75 @@
+/**
+ * Predictive churn model integration point for the cohort analytics suite.
+ *
+ * Kept separate from cohortService.ts on purpose: this file pulls in
+ * PredictionService (which talks to the Python ml-service over HTTP, and
+ * imports Node's `path` module). CohortService itself must stay free of
+ * Node-only imports so it can run in both the backend and the mobile app
+ * bundle — see app/stores/analyticsStore.ts.
+ */
+
+import type { ChurnRiskSummary, SubscriberRecord } from '../../../src/types/cohortAnalytics';
+import { PredictionService } from './predictionService';
+
+const DAY_MS = 24 * 60 * 60 * 1_000;
+
+const isActiveAt = (record: SubscriberRecord, at: number): boolean =>
+ record.signupAt <= at && (record.churnedAt === undefined || record.churnedAt > at);
+
+/**
+ * Delegates to PredictionService (backend/services/analytics/predictionService.ts
+ * -> ml-service/churnModel.py). Network/model failures degrade to a
+ * zero-confidence summary rather than throwing, so a dashboard render never
+ * breaks because the ML service is down.
+ */
+export async function getChurnRiskForCohort(
+ cohortKey: string,
+ records: SubscriberRecord[],
+ asOf: number = Date.now(),
+ sampleSize = 25
+): Promise {
+ const sample = records.filter((record) => isActiveAt(record, asOf)).slice(0, sampleSize);
+ if (sample.length === 0) {
+ return {
+ cohortKey,
+ sampledSubscribers: 0,
+ highRiskCount: 0,
+ mediumRiskCount: 0,
+ lowRiskCount: 0,
+ averageChurnProbability: 0,
+ };
+ }
+
+ try {
+ const predictions = await PredictionService.predictChurnBatch(
+ sample.map((record) => ({
+ subscriberAddress: record.subscriberId,
+ userData: {
+ recentPaymentFailures: 0,
+ baselineLoginsPerMonth: 20,
+ recentLogins: record.lastActiveAt && asOf - record.lastActiveAt < 7 * DAY_MS ? 18 : 4,
+ openSupportTickets: 0,
+ priceSensitivityIndex: 0.5,
+ },
+ }))
+ );
+
+ const highRiskCount = predictions.filter((p) => p.riskLevel === 'High').length;
+ const mediumRiskCount = predictions.filter((p) => p.riskLevel === 'Medium').length;
+ const lowRiskCount = predictions.filter((p) => p.riskLevel === 'Low').length;
+ const averageChurnProbability =
+ predictions.reduce((sum, p) => sum + p.churnProbability, 0) / predictions.length;
+
+ return {
+ cohortKey,
+ sampledSubscribers: predictions.length,
+ highRiskCount,
+ mediumRiskCount,
+ lowRiskCount,
+ averageChurnProbability,
+ };
+ } catch {
+ // ml-service unreachable/unavailable — surface a neutral, non-throwing result.
+ return { cohortKey, sampledSubscribers: 0, highRiskCount: 0, mediumRiskCount: 0, lowRiskCount: 0, averageChurnProbability: 0 };
+ }
+}
diff --git a/backend/services/analytics/cohortReportExport.ts b/backend/services/analytics/cohortReportExport.ts
new file mode 100644
index 00000000..6d8f5852
--- /dev/null
+++ b/backend/services/analytics/cohortReportExport.ts
@@ -0,0 +1,119 @@
+/**
+ * Cohort report export — CSV and PDF, with zero added dependencies.
+ *
+ * The PDF writer emits a minimal-but-valid single-page PDF (catalog, pages,
+ * page, Helvetica font, one content stream, xref + trailer) by hand. There is
+ * no native PDF rendering dependency in this project, so this avoids pulling
+ * one in just for a tabular export.
+ */
+
+import type { CohortBucket, ChurnBreakdown, LtvSourceBreakdown } from '../../../src/types/cohortAnalytics';
+
+const escapeCsvCell = (value: string | number | boolean): string => {
+ const str = String(value);
+ return /[",\n]/.test(str) ? `"${str.replace(/"/g, '""')}"` : str;
+};
+
+export function cohortTableToCsv(buckets: CohortBucket[]): string {
+ const header = ['cohort', 'granularity', 'size', 'activeCount', 'retentionRate', 'startingMrr', 'currentMrr', 'isEmpty'];
+ const rows = buckets.map((bucket) => [
+ bucket.cohortKey,
+ bucket.granularity,
+ bucket.size,
+ bucket.activeCount,
+ bucket.retentionRate.toFixed(4),
+ bucket.startingMrr.toFixed(2),
+ bucket.currentMrr.toFixed(2),
+ bucket.isEmpty,
+ ]);
+ return [header, ...rows].map((row) => row.map(escapeCsvCell).join(',')).join('\n');
+}
+
+export function ltvBreakdownToCsv(breakdown: LtvSourceBreakdown[]): string {
+ const header = ['acquisitionChannel', 'subscriberCount', 'avgLifetimeMonths', 'avgMonthlyRevenue', 'ltv'];
+ const rows = breakdown.map((row) => [
+ row.acquisitionChannel,
+ row.subscriberCount,
+ row.avgLifetimeMonths.toFixed(2),
+ row.avgMonthlyRevenue.toFixed(2),
+ row.ltv.toFixed(2),
+ ]);
+ return [header, ...rows].map((row) => row.map(escapeCsvCell).join(',')).join('\n');
+}
+
+function escapePdfText(text: string): string {
+ return text.replace(/\\/g, '\\\\').replace(/\(/g, '\\(').replace(/\)/g, '\\)');
+}
+
+/** Builds a minimal valid single-page PDF from plain text lines (no dependencies). */
+export function buildSimplePdf(lines: string[]): Buffer {
+ const fontSize = 11;
+ const leading = 14;
+ const marginTop = 760;
+ const escaped = lines.map(escapePdfText);
+
+ const streamBody = [
+ 'BT',
+ `/F1 ${fontSize} Tf`,
+ `${leading} TL`,
+ `50 ${marginTop} Td`,
+ ...escaped.flatMap((line, index) => (index === 0 ? [`(${line}) Tj`] : ['T*', `(${line}) Tj`])),
+ 'ET',
+ ].join('\n');
+
+ const objects = [
+ '<< /Type /Catalog /Pages 2 0 R >>',
+ '<< /Type /Pages /Kids [3 0 R] /Count 1 >>',
+ '<< /Type /Page /Parent 2 0 R /MediaBox [0 0 612 792] /Resources << /Font << /F1 4 0 R >> >> /Contents 5 0 R >>',
+ '<< /Type /Font /Subtype /Type1 /BaseFont /Helvetica >>',
+ `<< /Length ${Buffer.byteLength(streamBody, 'latin1')} >>\nstream\n${streamBody}\nendstream`,
+ ];
+
+ let pdf = '%PDF-1.4\n';
+ const offsets: number[] = [];
+ objects.forEach((obj, index) => {
+ offsets.push(Buffer.byteLength(pdf, 'latin1'));
+ pdf += `${index + 1} 0 obj\n${obj}\nendobj\n`;
+ });
+
+ const xrefOffset = Buffer.byteLength(pdf, 'latin1');
+ pdf += `xref\n0 ${objects.length + 1}\n0000000000 65535 f \n`;
+ for (const offset of offsets) {
+ pdf += `${offset.toString().padStart(10, '0')} 00000 n \n`;
+ }
+ pdf += `trailer\n<< /Size ${objects.length + 1} /Root 1 0 R >>\nstartxref\n${xrefOffset}\n%%EOF`;
+
+ return Buffer.from(pdf, 'latin1');
+}
+
+export function cohortTableToPdf(buckets: CohortBucket[], title = 'Cohort Retention Report'): Buffer {
+ const lines = [
+ title,
+ `Generated ${new Date().toISOString()}`,
+ '',
+ 'Cohort Size Active Retention Starting MRR Current MRR',
+ ...buckets.map(
+ (bucket) =>
+ `${bucket.cohortKey.padEnd(12)} ${String(bucket.size).padStart(4)} ${String(bucket.activeCount).padStart(6)} ${(bucket.retentionRate * 100).toFixed(1).padStart(8)}% ${bucket.startingMrr.toFixed(2).padStart(12)} ${bucket.currentMrr.toFixed(2).padStart(11)}`
+ ),
+ ];
+ if (buckets.length === 0) lines.push('(no cohort data for this period)');
+ return buildSimplePdf(lines);
+}
+
+export function churnBreakdownToPdf(breakdown: ChurnBreakdown, title = 'Revenue vs. Logo Churn'): Buffer {
+ const lines = [
+ title,
+ `Generated ${new Date().toISOString()}`,
+ '',
+ `Period: ${new Date(breakdown.periodStart).toISOString().slice(0, 10)} to ${new Date(breakdown.periodEnd).toISOString().slice(0, 10)}`,
+ `Starting subscribers: ${breakdown.startingSubscribers}`,
+ `Churned subscribers: ${breakdown.churnedSubscribers}`,
+ `Logo churn rate: ${(breakdown.logoChurnRate * 100).toFixed(2)}%`,
+ `Starting MRR: ${breakdown.startingMrr.toFixed(2)}`,
+ `Churned MRR: ${breakdown.churnedMrr.toFixed(2)}`,
+ `Revenue churn rate: ${(breakdown.revenueChurnRate * 100).toFixed(2)}%`,
+ ];
+ if (breakdown.isEmpty) lines.push('', '(no subscribers active at the start of this period)');
+ return buildSimplePdf(lines);
+}
diff --git a/backend/services/analytics/cohortService.ts b/backend/services/analytics/cohortService.ts
new file mode 100644
index 00000000..9f220978
--- /dev/null
+++ b/backend/services/analytics/cohortService.ts
@@ -0,0 +1,240 @@
+/**
+ * CohortService
+ *
+ * Builds the cohort tables, churn breakdowns, plan migration flows, and LTV
+ * breakdowns required by issue #545 (advanced subscription analytics).
+ * Operates on SubscriberRecord[] — see src/types/cohortAnalytics.ts.
+ *
+ * Deliberately free of Node-only imports (no 'path', 'crypto', Buffer) so it
+ * can run both in the backend and inside the mobile app bundle. The
+ * predictive-churn ML integration point lives separately in
+ * cohortChurnRiskService.ts, which does pull in Node-only deps and is
+ * backend-only — see that file for why.
+ */
+
+import type {
+ AnomalyFlaggedPoint,
+ ChurnBreakdown,
+ CohortBucket,
+ CohortGranularity,
+ LtvSourceBreakdown,
+ PlanMigrationFlow,
+ SubscriberRecord,
+} from '../../../src/types/cohortAnalytics';
+import { RetentionCalculator } from './retentionCalculator';
+
+const DAY_MS = 24 * 60 * 60 * 1_000;
+const WEEK_MS = 7 * DAY_MS;
+const FALLBACK_LIFETIME_MONTHS = 24; // used when a segment has no observed churn yet
+
+function startOfUtcDay(timestamp: number): number {
+ const d = new Date(timestamp);
+ return Date.UTC(d.getUTCFullYear(), d.getUTCMonth(), d.getUTCDate());
+}
+
+/** ISO-8601 week number (1-53), Monday-start, matching most analytics tooling. */
+function isoWeekKey(timestamp: number): string {
+ const date = new Date(startOfUtcDay(timestamp));
+ const dayNum = (date.getUTCDay() + 6) % 7; // 0 = Monday
+ date.setUTCDate(date.getUTCDate() - dayNum + 3); // nearest Thursday
+ const firstThursday = new Date(Date.UTC(date.getUTCFullYear(), 0, 4));
+ const week =
+ 1 + Math.round(((date.getTime() - firstThursday.getTime()) / DAY_MS - 3 + ((firstThursday.getUTCDay() + 6) % 7)) / 7);
+ return `${date.getUTCFullYear()}-W${String(week).padStart(2, '0')}`;
+}
+
+function monthKey(timestamp: number): string {
+ const d = new Date(timestamp);
+ return `${d.getUTCFullYear()}-${String(d.getUTCMonth() + 1).padStart(2, '0')}`;
+}
+
+function cohortKeyFor(timestamp: number, granularity: CohortGranularity): string {
+ return granularity === 'week' ? isoWeekKey(timestamp) : monthKey(timestamp);
+}
+
+function periodBoundsFor(timestamp: number, granularity: CohortGranularity): { start: number; end: number } {
+ if (granularity === 'month') {
+ const d = new Date(timestamp);
+ const start = Date.UTC(d.getUTCFullYear(), d.getUTCMonth(), 1);
+ const end = Date.UTC(d.getUTCFullYear(), d.getUTCMonth() + 1, 1);
+ return { start, end };
+ }
+ const start = startOfUtcDay(timestamp) - ((new Date(timestamp).getUTCDay() + 6) % 7) * DAY_MS;
+ return { start, end: start + WEEK_MS };
+}
+
+const isActiveAt = (record: SubscriberRecord, at: number): boolean =>
+ record.signupAt <= at && (record.churnedAt === undefined || record.churnedAt > at);
+
+export class CohortService {
+ /**
+ * Groups subscribers into signup cohorts and reports size + retention %.
+ * Buckets with no signups (new merchants, gaps in activity) come back with
+ * isEmpty: true rather than NaN/divide-by-zero metrics.
+ */
+ static buildCohortTable(
+ records: SubscriberRecord[],
+ granularity: CohortGranularity,
+ asOf: number = Date.now()
+ ): CohortBucket[] {
+ const buckets = new Map();
+ for (const record of records) {
+ const key = cohortKeyFor(record.signupAt, granularity);
+ const bucket = buckets.get(key) ?? [];
+ bucket.push(record);
+ buckets.set(key, bucket);
+ }
+
+ return Array.from(buckets.entries())
+ .sort(([a], [b]) => a.localeCompare(b))
+ .map(([cohortKey, cohortRecords]) => {
+ const { start, end } = periodBoundsFor(cohortRecords[0].signupAt, granularity);
+ const activeCount = cohortRecords.filter((record) => isActiveAt(record, asOf)).length;
+ const startingMrr = cohortRecords.reduce((sum, record) => sum + record.mrr, 0);
+ const currentMrr = cohortRecords
+ .filter((record) => isActiveAt(record, asOf))
+ .reduce((sum, record) => sum + record.mrr, 0);
+
+ return {
+ cohortKey,
+ granularity,
+ periodStart: start,
+ periodEnd: end,
+ size: cohortRecords.length,
+ activeCount,
+ retentionRate: cohortRecords.length ? activeCount / cohortRecords.length : 0,
+ startingMrr,
+ currentMrr,
+ isEmpty: cohortRecords.length === 0,
+ };
+ });
+ }
+
+ static retentionCurve(records: SubscriberRecord[], asOf: number = Date.now()) {
+ return RetentionCalculator.retentionCurve(records, asOf);
+ }
+
+ /** Logo churn (subscriber count) vs. revenue churn (MRR) for a period — these diverge when big accounts churn. */
+ static revenueChurnVsLogoChurn(
+ records: SubscriberRecord[],
+ periodStart: number,
+ periodEnd: number
+ ): ChurnBreakdown {
+ const startingCohort = records.filter((record) => isActiveAt(record, periodStart));
+ const churned = startingCohort.filter(
+ (record) => record.churnedAt !== undefined && record.churnedAt >= periodStart && record.churnedAt < periodEnd
+ );
+ const startingMrr = startingCohort.reduce((sum, record) => sum + record.mrr, 0);
+ const churnedMrr = churned.reduce((sum, record) => sum + record.mrr, 0);
+
+ return {
+ periodStart,
+ periodEnd,
+ startingSubscribers: startingCohort.length,
+ churnedSubscribers: churned.length,
+ logoChurnRate: startingCohort.length ? churned.length / startingCohort.length : 0,
+ startingMrr,
+ churnedMrr,
+ revenueChurnRate: startingMrr > 0 ? churnedMrr / startingMrr : 0,
+ isEmpty: startingCohort.length === 0,
+ };
+ }
+
+ /**
+ * Upgrade/downgrade/lateral plan-change flows in a period, for a Sankey diagram.
+ * `planPriceById` lets the caller classify direction by price; without it every
+ * flow is reported as `lateral` (we still surface the from→to counts).
+ */
+ static planMigrationFlows(
+ records: SubscriberRecord[],
+ periodStart: number,
+ periodEnd: number,
+ planPriceById?: Record
+ ): PlanMigrationFlow[] {
+ const counts = new Map();
+ for (const record of records) {
+ for (const change of record.planHistory ?? []) {
+ if (change.changedAt < periodStart || change.changedAt >= periodEnd) continue;
+ const key = `${change.fromPlanId}->${change.toPlanId}`;
+ counts.set(key, (counts.get(key) ?? 0) + 1);
+ }
+ }
+
+ return Array.from(counts.entries()).map(([key, count]) => {
+ const [fromPlanId, toPlanId] = key.split('->');
+ let direction: PlanMigrationFlow['direction'] = 'lateral';
+ if (planPriceById && planPriceById[fromPlanId] !== undefined && planPriceById[toPlanId] !== undefined) {
+ if (planPriceById[toPlanId] > planPriceById[fromPlanId]) direction = 'upgrade';
+ else if (planPriceById[toPlanId] < planPriceById[fromPlanId]) direction = 'downgrade';
+ }
+ return { fromPlanId, toPlanId, count, direction };
+ });
+ }
+
+ /** LTV broken down by acquisition channel, used for the LTV-by-source drill-down. */
+ static ltvByAcquisitionSource(records: SubscriberRecord[], asOf: number = Date.now()): LtvSourceBreakdown[] {
+ const groups = new Map();
+ for (const record of records) {
+ const channel = record.acquisitionChannel ?? 'unknown';
+ const group = groups.get(channel) ?? [];
+ group.push(record);
+ groups.set(channel, group);
+ }
+
+ return Array.from(groups.entries()).map(([acquisitionChannel, group]) => {
+ const churnedRecords = group.filter((record) => record.churnedAt !== undefined);
+ const avgMonthlyRevenue = group.length
+ ? group.reduce((sum, record) => sum + record.mrr, 0) / group.length
+ : 0;
+
+ const churnRate = group.length ? churnedRecords.length / group.length : 0;
+ const lifetimeFromChurnRate = churnRate > 0 ? 1 / churnRate : FALLBACK_LIFETIME_MONTHS;
+
+ const observedLifetimes = churnedRecords.map(
+ (record) => Math.max(1, ((record.churnedAt as number) - record.signupAt) / (30 * DAY_MS))
+ );
+ const avgObservedLifetime = observedLifetimes.length
+ ? observedLifetimes.reduce((sum, months) => sum + months, 0) / observedLifetimes.length
+ : undefined;
+
+ const avgLifetimeMonths = avgObservedLifetime ?? lifetimeFromChurnRate;
+
+ return {
+ acquisitionChannel,
+ subscriberCount: group.length,
+ avgLifetimeMonths,
+ avgMonthlyRevenue,
+ ltv: avgMonthlyRevenue * avgLifetimeMonths,
+ };
+ });
+ }
+
+ /**
+ * Flags statistical outliers (IQR method) in a labeled series so the UI can
+ * visually de-emphasize spikes instead of letting them distort chart scales.
+ * Needs at least 4 points to compute a meaningful IQR; otherwise nothing is flagged.
+ */
+ static filterAnomalousSpikes(series: { label: string; value: number }[]): AnomalyFlaggedPoint[] {
+ if (series.length < 4) {
+ return series.map((point) => ({ ...point, isAnomaly: false }));
+ }
+
+ const sorted = [...series.map((point) => point.value)].sort((a, b) => a - b);
+ const quantile = (q: number): number => {
+ const pos = (sorted.length - 1) * q;
+ const base = Math.floor(pos);
+ const rest = pos - base;
+ return sorted[base + 1] !== undefined ? sorted[base] + rest * (sorted[base + 1] - sorted[base]) : sorted[base];
+ };
+ const q1 = quantile(0.25);
+ const q3 = quantile(0.75);
+ const iqr = q3 - q1;
+ const lowerBound = q1 - 1.5 * iqr;
+ const upperBound = q3 + 1.5 * iqr;
+
+ return series.map((point) => ({
+ ...point,
+ isAnomaly: point.value < lowerBound || point.value > upperBound,
+ }));
+ }
+}
diff --git a/backend/services/analytics/index.ts b/backend/services/analytics/index.ts
index ccb27758..395a7407 100644
--- a/backend/services/analytics/index.ts
+++ b/backend/services/analytics/index.ts
@@ -12,3 +12,9 @@ export { RetentionService } from './retentionService';
export { OracleMonitorService, oracleMonitorService } from './oracleMonitorService';
export type { IPredictionService, IRecommendationService, IComplianceReportService, ICampaignService } from './interfaces';
export { AnalyticsError, AnalyticsErrorCode } from './errors';
+export { CohortService } from './cohortService';
+export { getChurnRiskForCohort } from './cohortChurnRiskService';
+export { RetentionCalculator, RETENTION_CURVE_DAYS } from './retentionCalculator';
+export { cohortTableToCsv, ltvBreakdownToCsv, cohortTableToPdf, churnBreakdownToPdf, buildSimplePdf } from './cohortReportExport';
+export { SubscriberRecordRepository, subscriberRecordRepository } from './subscriberRecordRepository';
+export { AnalyticsDashboardApi, analyticsDashboardApi } from './analyticsDashboardApi';
diff --git a/backend/services/analytics/retentionCalculator.ts b/backend/services/analytics/retentionCalculator.ts
new file mode 100644
index 00000000..f78c6311
--- /dev/null
+++ b/backend/services/analytics/retentionCalculator.ts
@@ -0,0 +1,48 @@
+/**
+ * RetentionCalculator
+ *
+ * Pure retention math used by CohortService: is-a-subscriber-still-active-at-day-N,
+ * and the Day 1/7/30/60/90 retention curve required by issue #545.
+ */
+
+import type { RetentionCurvePoint, SubscriberRecord } from '../../../src/types/cohortAnalytics';
+
+const DAY_MS = 24 * 60 * 60 * 1_000;
+export const RETENTION_CURVE_DAYS: RetentionCurvePoint['day'][] = [1, 7, 30, 60, 90];
+
+export class RetentionCalculator {
+ /**
+ * A subscriber is "retained at day N" if, N days after signup, they had not
+ * yet churned (or, when activity data is available, were still active on or
+ * after that day).
+ */
+ static isRetainedAtDay(record: SubscriberRecord, day: number, asOf: number = Date.now()): boolean {
+ const dayMark = record.signupAt + day * DAY_MS;
+ if (asOf < dayMark) return false; // cohort hasn't reached this day yet
+ if (record.churnedAt !== undefined && record.churnedAt < dayMark) return false;
+ if (record.lastActiveAt !== undefined) {
+ return record.lastActiveAt >= dayMark || record.churnedAt === undefined;
+ }
+ return record.churnedAt === undefined || record.churnedAt >= dayMark;
+ }
+
+ /** Retention curve across the standard Day 1/7/30/60/90 checkpoints for a set of records. */
+ static retentionCurve(
+ records: SubscriberRecord[],
+ asOf: number = Date.now(),
+ days: RetentionCurvePoint['day'][] = RETENTION_CURVE_DAYS
+ ): RetentionCurvePoint[] {
+ return days.map((day) => {
+ // Only cohorts old enough to have reached this checkpoint are eligible —
+ // otherwise a brand-new cohort would incorrectly show 0% Day 90 retention.
+ const eligible = records.filter((record) => asOf >= record.signupAt + day * DAY_MS);
+ const retained = eligible.filter((record) => this.isRetainedAtDay(record, day, asOf));
+ return {
+ day,
+ retainedCount: retained.length,
+ cohortSize: eligible.length,
+ retentionRate: eligible.length ? retained.length / eligible.length : 0,
+ };
+ });
+ }
+}
diff --git a/backend/services/analytics/subscriberRecordRepository.ts b/backend/services/analytics/subscriberRecordRepository.ts
new file mode 100644
index 00000000..fa393947
--- /dev/null
+++ b/backend/services/analytics/subscriberRecordRepository.ts
@@ -0,0 +1,38 @@
+/**
+ * In-memory repository for SubscriberRecord[], keyed by merchant.
+ * Mirrors backend/services/repositories/inMemory.ts's seed/clear test helpers —
+ * there is no live database wired up for this analytics module yet.
+ */
+
+import type { SubscriberRecord } from '../../../src/types/cohortAnalytics';
+
+export class SubscriberRecordRepository {
+ private byMerchant = new Map();
+
+ seed(merchantId: string, records: SubscriberRecord[]): void {
+ this.byMerchant.set(merchantId, [...records]);
+ }
+
+ upsert(record: SubscriberRecord): void {
+ const existing = this.byMerchant.get(record.merchantId) ?? [];
+ const index = existing.findIndex((r) => r.subscriberId === record.subscriberId);
+ if (index >= 0) existing[index] = record;
+ else existing.push(record);
+ this.byMerchant.set(record.merchantId, existing);
+ }
+
+ getByMerchant(merchantId: string): SubscriberRecord[] {
+ return [...(this.byMerchant.get(merchantId) ?? [])];
+ }
+
+ listMerchants(): string[] {
+ return Array.from(this.byMerchant.keys());
+ }
+
+ clear(merchantId?: string): void {
+ if (merchantId) this.byMerchant.delete(merchantId);
+ else this.byMerchant.clear();
+ }
+}
+
+export const subscriberRecordRepository = new SubscriberRecordRepository();
diff --git a/backend/services/notification/__tests__/webhook.test.ts b/backend/services/notification/__tests__/webhook.test.ts
index ee4222e6..15f51a47 100644
--- a/backend/services/notification/__tests__/webhook.test.ts
+++ b/backend/services/notification/__tests__/webhook.test.ts
@@ -3,13 +3,14 @@ import {
buildWebhookPayload,
signWebhookPayload,
verifyWebhookSignature,
-} from '../notification/webhook';
+ verifyWebhookSignatureAny,
+} from '../webhook';
import type {
WebhookEventInput,
WebhookPlanSnapshot,
WebhookSubscriptionSnapshot,
-} from '../../../src/types/webhook';
-import { BillingCycle } from '../../../src/types/subscription';
+} from '../../../../src/types/webhook';
+import { BillingCycle } from '../../../../src/types/subscription';
const makeSubscription = (
overrides: Partial = {}
@@ -93,8 +94,8 @@ describe('WebhookDeliveryService', () => {
expect(sleepImpl).toHaveBeenCalledWith(10);
});
- it('fails fast for payloads over 1MB', async () => {
- const fetchImpl = jest.fn();
+ it('truncates payloads over 1MB and ships a hash instead of failing', async () => {
+ const fetchImpl = jest.fn().mockResolvedValue({ ok: true, status: 200 });
const service = new WebhookDeliveryService({ fetchImpl: fetchImpl as typeof fetch });
const webhook = service.registerWebhook({
@@ -115,8 +116,143 @@ describe('WebhookDeliveryService', () => {
makeInput({ webhookId: webhook.id, subscription: giantSubscription })
);
+ expect(result?.delivery.status).toBe('delivered');
+ expect(result?.delivery.payloadTruncated).toBe(true);
+ expect(result?.delivery.payloadHash).toBeDefined();
+ expect(fetchImpl).toHaveBeenCalledTimes(1);
+ const [, init] = fetchImpl.mock.calls[0] as [string, RequestInit];
+ expect(Buffer.byteLength(init.body as string, 'utf8')).toBeLessThanOrEqual(1_048_576);
+ expect((init.headers as Record)['X-SubTrackr-Payload-Truncated']).toBe('true');
+ });
+
+ it('uses the default fixed retry schedule (1m, 5m, 15m, ...) when no custom policy is set', async () => {
+ const fetchImpl = jest
+ .fn()
+ .mockRejectedValueOnce(new Error('down'))
+ .mockResolvedValueOnce({ ok: true, status: 200 });
+ const sleepImpl = jest.fn().mockResolvedValue(undefined);
+ const service = new WebhookDeliveryService({ fetchImpl: fetchImpl as typeof fetch, sleepImpl });
+
+ const webhook = service.registerWebhook({
+ merchantId: 'merchant_1',
+ url: 'https://example.com/webhook',
+ events: ['subscription.charged'],
+ secretKey: 'secret',
+ });
+
+ await service.deliverEvent(makeInput({ webhookId: webhook.id }));
+
+ expect(sleepImpl).toHaveBeenCalledWith(60_000);
+ });
+
+ it('dedups deliveries with the same idempotency key within the 24h window', async () => {
+ const fetchImpl = jest.fn().mockResolvedValue({ ok: true, status: 200 });
+ const service = new WebhookDeliveryService({ fetchImpl: fetchImpl as typeof fetch });
+
+ const webhook = service.registerWebhook({
+ merchantId: 'merchant_1',
+ url: 'https://example.com/webhook',
+ events: ['subscription.charged'],
+ secretKey: 'secret',
+ });
+
+ const input = makeInput({ webhookId: webhook.id, idempotencyKey: 'fixed-key' });
+ const first = await service.deliverEvent(input);
+ expect(first?.delivery.status).toBe('delivered');
+
+ const second = await service.deliverEvent(input);
+ expect(second?.delivery.status).toBe('skipped');
+ expect(fetchImpl).toHaveBeenCalledTimes(1);
+ });
+
+ it('moves exhausted deliveries to the dead-letter queue and supports manual replay', async () => {
+ const fetchImpl = jest
+ .fn()
+ .mockRejectedValueOnce(new Error('down'))
+ .mockResolvedValueOnce({ ok: true, status: 200 });
+ const sleepImpl = jest.fn().mockResolvedValue(undefined);
+ const service = new WebhookDeliveryService({ fetchImpl: fetchImpl as typeof fetch, sleepImpl });
+
+ const webhook = service.registerWebhook({
+ merchantId: 'merchant_1',
+ url: 'https://example.com/webhook',
+ events: ['subscription.charged'],
+ secretKey: 'secret',
+ retryPolicy: { maxRetries: 0, initialDelayMs: 10, maxDelayMs: 10, backoffFactor: 2 },
+ });
+
+ const result = await service.deliverEvent(makeInput({ webhookId: webhook.id }));
expect(result?.delivery.status).toBe('failed');
- expect(fetchImpl).not.toHaveBeenCalled();
+ expect(result?.delivery.isDeadLettered).toBe(true);
+ expect(service.listDeadLetters(webhook.id)).toHaveLength(1);
+
+ const replayed = await service.replayDeadLetter(result!.delivery.id);
+ expect(replayed.delivery.status).toBe('delivered');
+ expect(service.listDeadLetters(webhook.id)).toHaveLength(0);
+ });
+
+ it('rotates signing secrets with an overlapping valid period', () => {
+ const service = new WebhookDeliveryService({ fetchImpl: jest.fn() as unknown as typeof fetch });
+
+ const webhook = service.registerWebhook({
+ merchantId: 'merchant_1',
+ url: 'https://example.com/webhook',
+ events: ['subscription.charged'],
+ secretKey: 'old-secret',
+ });
+
+ const payload = buildWebhookPayload(makeInput({ webhookId: webhook.id }));
+ const oldSignature = signWebhookPayload(payload, 'old-secret');
+
+ const rotated = service.rotateSecret(webhook.id, 'new-secret', 60_000);
+ const newSignature = signWebhookPayload(payload, 'new-secret');
+
+ expect(rotated.secretKey).toBe('new-secret');
+ expect(rotated.secrets).toHaveLength(2);
+ expect(rotated.secrets[0].validUntil).toBeDefined();
+ // Both the old (within overlap) and new secret should verify.
+ const { verifyWebhookSignatureAny } = jest.requireActual('../webhook');
+ expect(verifyWebhookSignatureAny(rotated, oldSignature, payload)).toBe(true);
+ expect(verifyWebhookSignatureAny(rotated, newSignature, payload)).toBe(true);
+ });
+
+ it('auto-disables a webhook when its endpoint returns 410 Gone', async () => {
+ const fetchImpl = jest.fn().mockResolvedValue({ ok: false, status: 410 });
+ const service = new WebhookDeliveryService({ fetchImpl: fetchImpl as typeof fetch });
+
+ const webhook = service.registerWebhook({
+ merchantId: 'merchant_1',
+ url: 'https://example.com/webhook',
+ events: ['subscription.charged'],
+ secretKey: 'secret',
+ });
+
+ const result = await service.deliverEvent(makeInput({ webhookId: webhook.id }));
+
+ expect(result?.delivery.status).toBe('failed');
+ expect(fetchImpl).toHaveBeenCalledTimes(1);
+ expect(service.getWebhook(webhook.id)?.isPaused).toBe(true);
+ expect(service.getWebhook(webhook.id)?.disabledReason).toBe('Endpoint returned 410 Gone');
+ });
+
+ it('enforces a configurable burst limit independent of the steady-state cap', async () => {
+ const fetchImpl = jest.fn().mockResolvedValue({ ok: true, status: 200 });
+ const service = new WebhookDeliveryService({ fetchImpl: fetchImpl as typeof fetch });
+
+ const webhook = service.registerWebhook({
+ merchantId: 'merchant_1',
+ url: 'https://example.com/webhook',
+ events: ['subscription.charged'],
+ secretKey: 'secret',
+ rateLimit: { burstLimit: 1, burstWindowMs: 1_000, steadyPerMinute: 100 },
+ });
+
+ const first = await service.deliverEvent(makeInput({ webhookId: webhook.id }));
+ const second = await service.deliverEvent(makeInput({ webhookId: webhook.id }));
+
+ expect(first?.delivery.status).toBe('delivered');
+ expect(second?.delivery.status).toBe('retrying');
+ expect(second?.delivery.errorMessage).toMatch(/rate limited/i);
});
it('supports manual retry after a failed delivery', async () => {
diff --git a/backend/services/notification/index.ts b/backend/services/notification/index.ts
index f75cc967..4b7bd5b2 100644
--- a/backend/services/notification/index.ts
+++ b/backend/services/notification/index.ts
@@ -2,8 +2,17 @@ export { NotificationPreferenceService } from './preferenceService';
export type { NotificationPreferences } from './preferenceService';
export { AlertingService } from './alerting';
export type { AlertDispatcher } from './alerting';
-export { WebhookDeliveryService, webhookDeliveryService } from './webhook';
+export {
+ WebhookDeliveryService,
+ webhookDeliveryService,
+ WEBHOOK_IDEMPOTENCY_HEADER,
+ verifyWebhookSignatureAny,
+} from './webhook';
export type { RegisterWebhookInput, WebhookDeliveryResult } from './webhook';
+export { WebhookManagementApi, webhookManagementApi } from './webhookManagementApi';
+export type { ApiResponse } from './webhookManagementApi';
+export { DeliveryWorker, deliveryWorker } from './jobs/deliveryWorker';
+export { DlqCleanupJob, dlqCleanupJob } from './jobs/dlqCleanupJob';
export { WebSocketServer, webSocketServer } from './websocket';
export type { SubscriptionEventType, SubscriptionEvent, EventFilter, ClientInfo } from './websocket';
export type { INotificationPreferenceService, IAlertingService, IWebhookDeliveryService, IWebsocketService } from './interfaces';
diff --git a/backend/services/notification/jobs/deliveryWorker.ts b/backend/services/notification/jobs/deliveryWorker.ts
new file mode 100644
index 00000000..c5347902
--- /dev/null
+++ b/backend/services/notification/jobs/deliveryWorker.ts
@@ -0,0 +1,74 @@
+/**
+ * Webhook Delivery Worker
+ *
+ * Polls the webhook delivery retry queue and re-attempts any delivery whose
+ * `nextRetryAt` has elapsed (1min, 5min, 15min, 1h, 6h schedule by default).
+ * Runs on a fixed interval; safe to call `tick()` concurrently — it dedupes
+ * against in-flight retries via the underlying delivery's `nextRetryAt`.
+ */
+
+import { WebhookDeliveryService, webhookDeliveryService } from '../webhook';
+
+const DEFAULT_POLL_INTERVAL_MS = 15_000;
+
+export interface DeliveryWorkerMetrics {
+ ticks: number;
+ retriesProcessed: number;
+ lastTickAt: number | null;
+ lastError: string | null;
+}
+
+export class DeliveryWorker {
+ private readonly service: WebhookDeliveryService;
+ private readonly pollIntervalMs: number;
+ private timer: ReturnType | null = null;
+ private isTicking = false;
+ private metrics: DeliveryWorkerMetrics = {
+ ticks: 0,
+ retriesProcessed: 0,
+ lastTickAt: null,
+ lastError: null,
+ };
+
+ constructor(service: WebhookDeliveryService = webhookDeliveryService, pollIntervalMs = DEFAULT_POLL_INTERVAL_MS) {
+ this.service = service;
+ this.pollIntervalMs = pollIntervalMs;
+ }
+
+ start(): void {
+ if (this.timer) return;
+ void this.tick();
+ this.timer = setInterval(() => void this.tick(), this.pollIntervalMs);
+ if (this.timer.unref) this.timer.unref();
+ }
+
+ stop(): void {
+ if (this.timer) {
+ clearInterval(this.timer);
+ this.timer = null;
+ }
+ }
+
+ /** Processes one batch of due retries. Skips overlapping ticks. */
+ async tick(): Promise {
+ if (this.isTicking) return;
+ this.isTicking = true;
+ try {
+ const results = await this.service.processDueRetries();
+ this.metrics.ticks += 1;
+ this.metrics.retriesProcessed += results.length;
+ this.metrics.lastTickAt = Date.now();
+ this.metrics.lastError = null;
+ } catch (error) {
+ this.metrics.lastError = error instanceof Error ? error.message : 'Delivery worker tick failed';
+ } finally {
+ this.isTicking = false;
+ }
+ }
+
+ getMetrics(): DeliveryWorkerMetrics {
+ return { ...this.metrics };
+ }
+}
+
+export const deliveryWorker = new DeliveryWorker();
diff --git a/backend/services/notification/jobs/dlqCleanupJob.ts b/backend/services/notification/jobs/dlqCleanupJob.ts
new file mode 100644
index 00000000..277880e5
--- /dev/null
+++ b/backend/services/notification/jobs/dlqCleanupJob.ts
@@ -0,0 +1,74 @@
+/**
+ * Webhook DLQ Cleanup Job
+ *
+ * Nightly housekeeping for the webhook delivery subsystem:
+ * - purges dead-lettered deliveries past their retention window (default 30 days)
+ * - purges expired idempotency keys past the 24h dedup window
+ *
+ * Mirrors the start/stop + metrics shape used by other backend cron jobs
+ * (see backend/analytics/jobs/mvRefreshJob.ts).
+ */
+
+import { WebhookDeliveryService, webhookDeliveryService } from '../webhook';
+
+const DEFAULT_INTERVAL_MS = 24 * 60 * 60 * 1_000; // nightly
+
+export interface DlqCleanupMetrics {
+ runs: number;
+ deadLettersPurged: number;
+ idempotencyKeysPurged: number;
+ lastRunAt: number | null;
+ lastError: string | null;
+}
+
+export class DlqCleanupJob {
+ private readonly service: WebhookDeliveryService;
+ private readonly intervalMs: number;
+ private timer: ReturnType | null = null;
+ private metrics: DlqCleanupMetrics = {
+ runs: 0,
+ deadLettersPurged: 0,
+ idempotencyKeysPurged: 0,
+ lastRunAt: null,
+ lastError: null,
+ };
+
+ constructor(service: WebhookDeliveryService = webhookDeliveryService, intervalMs = DEFAULT_INTERVAL_MS) {
+ this.service = service;
+ this.intervalMs = intervalMs;
+ }
+
+ start(): void {
+ if (this.timer) return;
+ void this.run();
+ this.timer = setInterval(() => void this.run(), this.intervalMs);
+ if (this.timer.unref) this.timer.unref();
+ }
+
+ stop(): void {
+ if (this.timer) {
+ clearInterval(this.timer);
+ this.timer = null;
+ }
+ }
+
+ async run(): Promise {
+ try {
+ const deadLettersPurged = this.service.cleanupDeadLetters();
+ const idempotencyKeysPurged = this.service.cleanupExpiredIdempotencyKeys();
+ this.metrics.runs += 1;
+ this.metrics.deadLettersPurged += deadLettersPurged;
+ this.metrics.idempotencyKeysPurged += idempotencyKeysPurged;
+ this.metrics.lastRunAt = Date.now();
+ this.metrics.lastError = null;
+ } catch (error) {
+ this.metrics.lastError = error instanceof Error ? error.message : 'DLQ cleanup run failed';
+ }
+ }
+
+ getMetrics(): DlqCleanupMetrics {
+ return { ...this.metrics };
+ }
+}
+
+export const dlqCleanupJob = new DlqCleanupJob();
diff --git a/backend/services/notification/webhook.ts b/backend/services/notification/webhook.ts
index c09525bc..f75b2370 100644
--- a/backend/services/notification/webhook.ts
+++ b/backend/services/notification/webhook.ts
@@ -7,7 +7,9 @@ import type {
WebhookEventInput,
WebhookEventPayload,
WebhookEventType,
+ WebhookRateLimitConfig,
WebhookRetryPolicy,
+ WebhookSecret,
} from '../../../src/types/webhook';
export type { WebhookEventInput } from '../../../src/types/webhook';
@@ -21,6 +23,7 @@ export interface RegisterWebhookInput {
secretKey: string;
retryPolicy?: Partial;
rateLimitPerMinute?: number;
+ rateLimit?: WebhookRateLimitConfig;
isPaused?: boolean;
}
@@ -29,12 +32,23 @@ export interface WebhookDeliveryResult {
response?: Response;
}
+export const WEBHOOK_IDEMPOTENCY_HEADER = 'Idempotency-Key';
+
const MAX_PAYLOAD_BYTES = 1_048_576;
+const BODY_PREVIEW_CHARS = 500;
+const IDEMPOTENCY_WINDOW_MS = 24 * 60 * 60 * 1_000; // 24 hours
+const DEFAULT_BURST_WINDOW_MS = 1_000;
+const DLQ_RETENTION_MS = 30 * 24 * 60 * 60 * 1_000; // 30 days
+
+// Default retry schedule: 1min, 5min, 15min, 1h, 6h (5 attempts after the first).
+const DEFAULT_RETRY_DELAYS_MS = [60_000, 300_000, 900_000, 3_600_000, 21_600_000];
+
const DEFAULT_RETRY_POLICY: WebhookRetryPolicy = {
maxRetries: 5,
- initialDelayMs: 250,
- maxDelayMs: 8_000,
+ initialDelayMs: DEFAULT_RETRY_DELAYS_MS[0],
+ maxDelayMs: DEFAULT_RETRY_DELAYS_MS[DEFAULT_RETRY_DELAYS_MS.length - 1],
backoffFactor: 2,
+ retryDelaysMs: DEFAULT_RETRY_DELAYS_MS,
};
const now = (): number => Date.now();
@@ -47,10 +61,16 @@ const clampRetryPolicy = (retryPolicy?: Partial): WebhookRet
initialDelayMs: retryPolicy?.initialDelayMs ?? DEFAULT_RETRY_POLICY.initialDelayMs,
maxDelayMs: retryPolicy?.maxDelayMs ?? DEFAULT_RETRY_POLICY.maxDelayMs,
backoffFactor: retryPolicy?.backoffFactor ?? DEFAULT_RETRY_POLICY.backoffFactor,
+ // Only fall back to the fixed default schedule when the caller didn't ask
+ // for a custom policy at all — an explicit custom policy keeps using the
+ // exponential formula unless it supplies its own retryDelaysMs.
+ retryDelaysMs: retryPolicy?.retryDelaysMs ?? (retryPolicy ? undefined : DEFAULT_RETRY_POLICY.retryDelaysMs),
});
const sleep = (ms: number): Promise => new Promise((resolve) => setTimeout(resolve, ms));
+const sha256Hex = (data: string): string => crypto.createHash('sha256').update(data).digest('hex');
+
export const signWebhookPayload = (payload: WebhookEventPayload, secretKey: string): string => {
const body = JSON.stringify(payload);
return crypto.createHmac('sha256', secretKey).update(body).digest('hex');
@@ -89,13 +109,30 @@ export const isWebhookEventAllowed = (
eventType: WebhookEventType
): boolean => !webhook.isPaused && webhook.events.includes(eventType);
+/** Returns true if `signature` matches any signing secret currently valid for `webhook`. */
+export const verifyWebhookSignatureAny = (
+ webhook: Pick,
+ signature: string,
+ payload: WebhookEventPayload,
+ at: number = Date.now()
+): boolean => {
+ const secrets = webhook.secrets?.length ? webhook.secrets : [{ key: webhook.secretKey, createdAt: at, validFrom: at }];
+ return secrets.some((secret) => {
+ if (at < secret.validFrom) return false;
+ if (secret.validUntil !== undefined && at > secret.validUntil) return false;
+ return verifyWebhookSignature(signature, payload, secret.key);
+ });
+};
+
export class WebhookDeliveryService {
private readonly fetchImpl: FetchLike;
private readonly sleepImpl: (ms: number) => Promise;
private readonly webhooks = new Map();
private readonly deliveries = new Map();
- private readonly deliveredKeys = new Set();
+ private readonly deliveredKeys = new Map();
private readonly rateLimitWindows = new Map();
+ private readonly burstWindows = new Map();
+ private readonly deadLetters = new Map();
constructor(options: { fetchImpl?: FetchLike; sleepImpl?: (ms: number) => Promise } = {}) {
this.fetchImpl = options.fetchImpl ?? fetch;
@@ -111,9 +148,12 @@ export class WebhookDeliveryService {
url: input.url,
events: [...input.events],
secretKey: input.secretKey,
+ secrets: [{ key: input.secretKey, createdAt, validFrom: createdAt }],
retryPolicy: clampRetryPolicy(input.retryPolicy),
rateLimitPerMinute: input.rateLimitPerMinute,
+ rateLimit: input.rateLimit,
isPaused: input.isPaused ?? false,
+ disabledReason: undefined,
createdAt,
updatedAt: createdAt,
lastHealthCheckAt: undefined,
@@ -140,6 +180,7 @@ export class WebhookDeliveryService {
secretKey: input.secretKey ?? existing.secretKey,
retryPolicy: clampRetryPolicy(input.retryPolicy ?? existing.retryPolicy),
rateLimitPerMinute: input.rateLimitPerMinute ?? existing.rateLimitPerMinute,
+ rateLimit: input.rateLimit ?? existing.rateLimit,
isPaused: input.isPaused ?? existing.isPaused,
updatedAt: now(),
};
@@ -157,7 +198,35 @@ export class WebhookDeliveryService {
}
resumeWebhook(id: string): WebhookConfig {
- return this.updateWebhook(id, { isPaused: false });
+ const existing = this.webhooks.get(id);
+ if (!existing) throw new Error(`Webhook ${id} not found`);
+ const next: WebhookConfig = { ...existing, isPaused: false, disabledReason: undefined, updatedAt: now() };
+ this.webhooks.set(id, next);
+ return next;
+ }
+
+ /**
+ * Rotates the active signing secret. The previous secret stays valid until
+ * `now + overlapMs` so in-flight receivers have time to pick up the new one.
+ */
+ rotateSecret(id: string, newSecret: string, overlapMs = 24 * 60 * 60 * 1_000): WebhookConfig {
+ const existing = this.webhooks.get(id);
+ if (!existing) throw new Error(`Webhook ${id} not found`);
+
+ const rotatedAt = now();
+ const secrets: WebhookSecret[] = (existing.secrets ?? []).map((secret) =>
+ secret.validUntil === undefined ? { ...secret, validUntil: rotatedAt + overlapMs } : secret
+ );
+ secrets.push({ key: newSecret, createdAt: rotatedAt, validFrom: rotatedAt });
+
+ const next: WebhookConfig = {
+ ...existing,
+ secretKey: newSecret,
+ secrets,
+ updatedAt: rotatedAt,
+ };
+ this.webhooks.set(id, next);
+ return next;
}
listWebhooks(merchantId: string): WebhookConfig[] {
@@ -180,6 +249,76 @@ export class WebhookDeliveryService {
return this.deliveries.get(deliveryId);
}
+ /** Deliveries that have exhausted retries and are awaiting manual replay. */
+ listDeadLetters(webhookId?: string): WebhookDelivery[] {
+ return Array.from(this.deadLetters.keys())
+ .map((deliveryId) => this.deliveries.get(deliveryId))
+ .filter((delivery): delivery is WebhookDelivery => !!delivery)
+ .filter((delivery) => !webhookId || delivery.webhookId === webhookId)
+ .sort((a, b) => (a.deadLetteredAt ?? 0) - (b.deadLetteredAt ?? 0));
+ }
+
+ /**
+ * Manually replay a dead-lettered delivery, resetting its attempt count.
+ * On success the entry is cleared from the DLQ; on renewed failure it is re-queued.
+ */
+ async replayDeadLetter(deliveryId: string): Promise {
+ if (!this.deadLetters.has(deliveryId)) {
+ throw new Error(`Delivery ${deliveryId} is not in the dead-letter queue`);
+ }
+ return this.retryWebhookDelivery(deliveryId);
+ }
+
+ /** Removes dead-lettered deliveries older than `maxAgeMs`. Returns the count purged. */
+ cleanupDeadLetters(maxAgeMs: number = DLQ_RETENTION_MS): number {
+ const cutoff = now() - maxAgeMs;
+ let purged = 0;
+ for (const [deliveryId, deadLetteredAt] of this.deadLetters) {
+ if (deadLetteredAt < cutoff) {
+ this.deadLetters.delete(deliveryId);
+ this.deliveries.delete(deliveryId);
+ purged++;
+ }
+ }
+ return purged;
+ }
+
+ /** Removes idempotency keys older than the 24h dedup window. Returns the count purged. */
+ cleanupExpiredIdempotencyKeys(windowMs: number = IDEMPOTENCY_WINDOW_MS): number {
+ const cutoff = now() - windowMs;
+ let purged = 0;
+ for (const [key, deliveredAt] of this.deliveredKeys) {
+ if (deliveredAt < cutoff) {
+ this.deliveredKeys.delete(key);
+ purged++;
+ }
+ }
+ return purged;
+ }
+
+ /** Deliveries currently due for an automatic retry (used by the delivery worker). */
+ getDueRetries(at: number = now()): WebhookDelivery[] {
+ return Array.from(this.deliveries.values()).filter(
+ (delivery) =>
+ delivery.status === 'retrying' &&
+ !this.deadLetters.has(delivery.id) &&
+ delivery.nextRetryAt !== undefined &&
+ delivery.nextRetryAt <= at
+ );
+ }
+
+ /** Processes every delivery currently due for retry. Used by the delivery worker cron. */
+ async processDueRetries(): Promise {
+ const due = this.getDueRetries();
+ const results: WebhookDeliveryResult[] = [];
+ for (const delivery of due) {
+ const webhook = this.webhooks.get(delivery.webhookId);
+ if (!webhook) continue;
+ results.push(await this.sendWithRetry(webhook, delivery));
+ }
+ return results;
+ }
+
getAnalytics(webhookId: string): WebhookAnalytics {
const deliveries = this.getWebhookDeliveries(webhookId, Number.MAX_SAFE_INTEGER);
const totalDeliveries = deliveries.length;
@@ -252,6 +391,17 @@ export class WebhookDeliveryService {
}
}
+ /** True if `idempotencyKey` was already delivered successfully within the 24h dedup window. */
+ private isWithinIdempotencyWindow(idempotencyKey: string): boolean {
+ const deliveredAt = this.deliveredKeys.get(idempotencyKey);
+ if (deliveredAt === undefined) return false;
+ if (now() - deliveredAt > IDEMPOTENCY_WINDOW_MS) {
+ this.deliveredKeys.delete(idempotencyKey);
+ return false;
+ }
+ return true;
+ }
+
async deliverEvent(input: WebhookEventInput): Promise {
const webhook = this.webhooks.get(input.webhookId);
if (!webhook || webhook.merchantId !== input.merchantId) return null;
@@ -259,8 +409,8 @@ export class WebhookDeliveryService {
const payload = buildWebhookPayload(input);
const signature = signWebhookPayload(payload, webhook.secretKey);
- const idempotencyKey = `${payload.id}:${webhook.id}`;
- if (this.deliveredKeys.has(idempotencyKey)) {
+ const idempotencyKey = input.idempotencyKey ?? `${payload.id}:${webhook.id}`;
+ if (this.isWithinIdempotencyWindow(idempotencyKey)) {
const delivery: WebhookDelivery = {
id: createId('del'),
webhookId: webhook.id,
@@ -323,7 +473,7 @@ export class WebhookDeliveryService {
this.deliveries.set(delivery.id, result.delivery);
if (result.delivery.status === 'delivered') {
- this.deliveredKeys.add(idempotencyKey);
+ this.deliveredKeys.set(idempotencyKey, now());
}
return result;
}
@@ -348,7 +498,7 @@ export class WebhookDeliveryService {
this.deliveries.set(deliveryId, result.delivery);
if (result.delivery.status === 'delivered') {
- this.deliveredKeys.add(existing.idempotencyKey);
+ this.deliveredKeys.set(existing.idempotencyKey, now());
}
return result;
}
@@ -357,21 +507,29 @@ export class WebhookDeliveryService {
webhook: WebhookConfig,
delivery: WebhookDelivery
): Promise {
- const payloadBody = JSON.stringify(delivery.payload);
- if (Buffer.byteLength(payloadBody, 'utf8') > MAX_PAYLOAD_BYTES) {
- return this.finalizeDelivery(webhook, delivery, {
- status: 'failed',
- errorMessage: 'Payload exceeds 1MB limit',
- });
+ const fullPayloadBody = JSON.stringify(delivery.payload);
+ const fullByteLength = Buffer.byteLength(fullPayloadBody, 'utf8');
+ let payloadBody = fullPayloadBody;
+ let payloadTruncated = false;
+ let payloadHash: string | undefined;
+ if (fullByteLength > MAX_PAYLOAD_BYTES) {
+ payloadHash = sha256Hex(fullPayloadBody);
+ payloadBody = Buffer.from(fullPayloadBody, 'utf8').subarray(0, MAX_PAYLOAD_BYTES).toString('utf8');
+ payloadTruncated = true;
}
+ const bodyPreview = payloadBody.slice(0, BODY_PREVIEW_CHARS);
- const headers = {
+ const headers: Record = {
'Content-Type': 'application/json',
'X-SubTrackr-Signature': delivery.signature,
'X-SubTrackr-Event-Type': delivery.eventType,
'X-SubTrackr-Event-Id': delivery.eventId,
- 'X-SubTrackr-Idempotency-Key': delivery.idempotencyKey,
+ [WEBHOOK_IDEMPOTENCY_HEADER]: delivery.idempotencyKey,
};
+ if (payloadTruncated) {
+ headers['X-SubTrackr-Payload-Truncated'] = 'true';
+ headers['X-SubTrackr-Payload-Hash'] = payloadHash as string;
+ }
let attempt = delivery.attempts;
let lastError: string | undefined;
@@ -386,6 +544,9 @@ export class WebhookDeliveryService {
attempts: attempt,
lastAttemptAt: attemptAt,
updatedAt: attemptAt,
+ bodyPreview,
+ payloadTruncated,
+ payloadHash,
};
this.deliveries.set(delivery.id, next);
@@ -396,6 +557,21 @@ export class WebhookDeliveryService {
body: payloadBody,
});
+ if (response.status === 410) {
+ // Endpoint is permanently gone — stop retrying and disable the webhook.
+ this.webhooks.set(webhook.id, {
+ ...webhook,
+ isPaused: true,
+ disabledReason: 'Endpoint returned 410 Gone',
+ updatedAt: now(),
+ });
+ return this.finalizeDelivery(webhook, next, {
+ status: 'failed',
+ responseCode: 410,
+ errorMessage: 'Endpoint returned 410 Gone (webhook auto-disabled)',
+ }, undefined, true);
+ }
+
if (!response.ok) {
throw new Error(`HTTP ${response.status}`);
}
@@ -421,7 +597,7 @@ export class WebhookDeliveryService {
status: 'failed',
errorMessage: lastError,
responseCode: undefined,
- });
+ }, undefined, true);
}
const retried: WebhookDelivery = {
@@ -438,20 +614,34 @@ export class WebhookDeliveryService {
return this.finalizeDelivery(webhook, delivery, {
status: 'failed',
errorMessage: lastError ?? 'Webhook delivery failed',
- });
+ }, undefined, true);
}
private finalizeDelivery(
webhook: WebhookConfig,
delivery: WebhookDelivery,
patch: Partial & { status: WebhookDeliveryStatus },
- response?: Response
+ response?: Response,
+ deadLetterOnFailure = false
): WebhookDeliveryResult {
+ const finalizedAt = now();
const next: WebhookDelivery = {
...delivery,
...patch,
- updatedAt: now(),
+ updatedAt: finalizedAt,
};
+
+ if (deadLetterOnFailure && next.status === 'failed') {
+ next.isDeadLettered = true;
+ next.deadLetteredAt = finalizedAt;
+ this.deadLetters.set(next.id, finalizedAt);
+ } else if (next.status === 'delivered' && this.deadLetters.has(next.id)) {
+ // A manual retry succeeded for a previously dead-lettered delivery.
+ this.deadLetters.delete(next.id);
+ next.isDeadLettered = false;
+ next.deadLetteredAt = undefined;
+ }
+
this.deliveries.set(delivery.id, next);
const configPatch: Partial = {
@@ -465,20 +655,49 @@ export class WebhookDeliveryService {
? 'degraded'
: webhook.lastHealthStatus,
};
- this.webhooks.set(webhook.id, { ...webhook, ...configPatch });
+ this.webhooks.set(webhook.id, { ...this.webhooks.get(webhook.id)!, ...configPatch });
return { delivery: next, response };
}
private computeDelay(policy: WebhookRetryPolicy, attempt: number): number {
+ if (policy.retryDelaysMs?.length) {
+ const schedule = policy.retryDelaysMs;
+ return schedule[Math.min(attempt - 1, schedule.length - 1)];
+ }
const factor = policy.backoffFactor ?? DEFAULT_RETRY_POLICY.backoffFactor ?? 2;
const rawDelay = Math.floor(policy.initialDelayMs * Math.pow(factor, Math.max(0, attempt - 1)));
return Math.min(rawDelay, policy.maxDelayMs);
}
private isRateLimited(webhook: WebhookConfig): boolean {
+ const nowMs = now();
+
+ if (webhook.rateLimit) {
+ const burstWindowMs = webhook.rateLimit.burstWindowMs ?? DEFAULT_BURST_WINDOW_MS;
+ const burstStart = nowMs - burstWindowMs;
+ const burstCurrent = (this.burstWindows.get(webhook.id) ?? []).filter((t) => t >= burstStart);
+ const steadyStart = nowMs - 60_000;
+ const steadyCurrent = (this.rateLimitWindows.get(webhook.id) ?? []).filter((t) => t >= steadyStart);
+
+ if (
+ burstCurrent.length >= webhook.rateLimit.burstLimit ||
+ steadyCurrent.length >= webhook.rateLimit.steadyPerMinute
+ ) {
+ this.burstWindows.set(webhook.id, burstCurrent);
+ this.rateLimitWindows.set(webhook.id, steadyCurrent);
+ return true;
+ }
+
+ burstCurrent.push(nowMs);
+ steadyCurrent.push(nowMs);
+ this.burstWindows.set(webhook.id, burstCurrent);
+ this.rateLimitWindows.set(webhook.id, steadyCurrent);
+ return false;
+ }
+
if (!webhook.rateLimitPerMinute || webhook.rateLimitPerMinute <= 0) return false;
- const windowStart = now() - 60_000;
+ const windowStart = nowMs - 60_000;
const current = (this.rateLimitWindows.get(webhook.id) ?? []).filter(
(timestamp) => timestamp >= windowStart
);
@@ -486,7 +705,7 @@ export class WebhookDeliveryService {
this.rateLimitWindows.set(webhook.id, current);
return true;
}
- current.push(now());
+ current.push(nowMs);
this.rateLimitWindows.set(webhook.id, current);
return false;
}
diff --git a/backend/services/notification/webhookManagementApi.ts b/backend/services/notification/webhookManagementApi.ts
new file mode 100644
index 00000000..54a8893c
--- /dev/null
+++ b/backend/services/notification/webhookManagementApi.ts
@@ -0,0 +1,144 @@
+/**
+ * Webhook Management REST API
+ *
+ * Thin request/response wrapper around WebhookDeliveryService, following the
+ * ApiResponse convention used elsewhere in the backend (see sandbox/api/sandboxApi.ts).
+ * There is no live HTTP server in this codebase yet — these methods are the
+ * handlers an Express/Fastify route would call directly.
+ */
+
+import {
+ WebhookDeliveryService,
+ webhookDeliveryService,
+ RegisterWebhookInput,
+} from './webhook';
+import type { WebhookConfig, WebhookDelivery, WebhookEventInput } from '../../../src/types/webhook';
+
+export interface ApiResponse {
+ success: boolean;
+ data?: T;
+ message?: string;
+ error?: string;
+}
+
+const ok = (data: T, message?: string): ApiResponse => ({ success: true, data, message });
+const fail = (error: unknown, fallback: string): ApiResponse => ({
+ success: false,
+ error: error instanceof Error ? error.message : fallback,
+});
+
+export class WebhookManagementApi {
+ constructor(private readonly service: WebhookDeliveryService = webhookDeliveryService) {}
+
+ registerWebhook(input: RegisterWebhookInput): ApiResponse {
+ try {
+ return ok(this.service.registerWebhook(input), 'Webhook registered');
+ } catch (error) {
+ return fail(error, 'Failed to register webhook');
+ }
+ }
+
+ updateWebhook(
+ id: string,
+ input: Partial>
+ ): ApiResponse {
+ try {
+ return ok(this.service.updateWebhook(id, input), 'Webhook updated');
+ } catch (error) {
+ return fail(error, 'Failed to update webhook');
+ }
+ }
+
+ deleteWebhook(id: string): ApiResponse {
+ try {
+ this.service.deleteWebhook(id);
+ return ok(null, 'Webhook deleted');
+ } catch (error) {
+ return fail(error, 'Failed to delete webhook');
+ }
+ }
+
+ pauseWebhook(id: string): ApiResponse {
+ try {
+ return ok(this.service.pauseWebhook(id), 'Webhook paused');
+ } catch (error) {
+ return fail(error, 'Failed to pause webhook');
+ }
+ }
+
+ resumeWebhook(id: string): ApiResponse {
+ try {
+ return ok(this.service.resumeWebhook(id), 'Webhook resumed');
+ } catch (error) {
+ return fail(error, 'Failed to resume webhook');
+ }
+ }
+
+ /** Rotates the signing secret. `overlapMs` controls how long the old secret stays valid. */
+ rotateSecret(id: string, newSecret: string, overlapMs?: number): ApiResponse {
+ try {
+ return ok(this.service.rotateSecret(id, newSecret, overlapMs), 'Secret rotated');
+ } catch (error) {
+ return fail(error, 'Failed to rotate webhook secret');
+ }
+ }
+
+ listWebhooks(merchantId: string): ApiResponse {
+ return ok(this.service.listWebhooks(merchantId));
+ }
+
+ getWebhook(id: string): ApiResponse {
+ const webhook = this.service.getWebhook(id);
+ if (!webhook) return fail(null, `Webhook ${id} not found`);
+ return ok(webhook);
+ }
+
+ /** Emits a lifecycle event. Pass the client's `Idempotency-Key` header via `input.idempotencyKey`. */
+ async emitEvent(input: WebhookEventInput): Promise> {
+ try {
+ const result = await this.service.deliverEvent(input);
+ if (!result) return fail(null, 'Webhook not found, not subscribed to this event, or merchant mismatch');
+ return ok(result.delivery, `Delivery ${result.delivery.status}`);
+ } catch (error) {
+ return fail(error, 'Failed to emit webhook event');
+ }
+ }
+
+ getDeliveryLogs(webhookId: string, limit = 50): ApiResponse {
+ return ok(this.service.getWebhookDeliveries(webhookId, limit));
+ }
+
+ getDelivery(deliveryId: string): ApiResponse {
+ const delivery = this.service.getDelivery(deliveryId);
+ if (!delivery) return fail(null, `Delivery ${deliveryId} not found`);
+ return ok(delivery);
+ }
+
+ async retryDelivery(deliveryId: string): Promise> {
+ try {
+ const result = await this.service.retryWebhookDelivery(deliveryId);
+ return ok(result.delivery, `Delivery ${result.delivery.status}`);
+ } catch (error) {
+ return fail(error, 'Failed to retry delivery');
+ }
+ }
+
+ listDeadLetters(webhookId?: string): ApiResponse {
+ return ok(this.service.listDeadLetters(webhookId));
+ }
+
+ async replayDeadLetter(deliveryId: string): Promise> {
+ try {
+ const result = await this.service.replayDeadLetter(deliveryId);
+ return ok(result.delivery, `Delivery ${result.delivery.status}`);
+ } catch (error) {
+ return fail(error, 'Failed to replay dead-lettered delivery');
+ }
+ }
+
+ getAnalytics(webhookId: string) {
+ return ok(this.service.getAnalytics(webhookId));
+ }
+}
+
+export const webhookManagementApi = new WebhookManagementApi();
diff --git a/src/components/analytics/CohortChart.tsx b/src/components/analytics/CohortChart.tsx
new file mode 100644
index 00000000..0fd80dd5
--- /dev/null
+++ b/src/components/analytics/CohortChart.tsx
@@ -0,0 +1,103 @@
+import React from 'react';
+import { View, Text, StyleSheet, Dimensions } from 'react-native';
+import Svg, { Rect, Text as SvgText, Line, G } from 'react-native-svg';
+import { spacing } from '../../utils/constants';
+import { useThemeColors } from '../../hooks/useThemeColors';
+import type { CohortBucket } from '../../types/cohortAnalytics';
+
+const { width: screenWidth } = Dimensions.get('window');
+const CHART_WIDTH = screenWidth - spacing.xl * 2 - spacing.lg * 2;
+const CHART_HEIGHT = 180;
+
+interface CohortChartProps {
+ buckets: CohortBucket[];
+}
+
+/** Bar chart of cohort size, with retention % labeled above each bar. */
+export const CohortChart: React.FC = ({ buckets }) => {
+ const colors = useThemeColors();
+
+ if (buckets.length === 0) {
+ return (
+
+
+ No cohorts yet — add subscriptions to start building cohort history.
+
+
+ );
+ }
+
+ const visible = buckets.slice(-8);
+ const maxSize = Math.max(...visible.map((bucket) => bucket.size), 1);
+ const barWidth = (CHART_WIDTH - 20) / visible.length - 8;
+
+ return (
+
+ );
+};
+
+const styles = StyleSheet.create({
+ emptyState: { paddingVertical: spacing.lg, alignItems: 'center' },
+ emptyText: { textAlign: 'center', fontSize: 13 },
+});
+
+export default CohortChart;
diff --git a/src/components/analytics/RetentionHeatmap.tsx b/src/components/analytics/RetentionHeatmap.tsx
new file mode 100644
index 00000000..5a0716c1
--- /dev/null
+++ b/src/components/analytics/RetentionHeatmap.tsx
@@ -0,0 +1,69 @@
+import React from 'react';
+import { View, Text, StyleSheet } from 'react-native';
+import { spacing, borderRadius } from '../../utils/constants';
+import { useThemeColors } from '../../hooks/useThemeColors';
+import type { RetentionCurvePoint } from '../../types/cohortAnalytics';
+
+interface RetentionHeatmapProps {
+ points: RetentionCurvePoint[];
+}
+
+const intensityColor = (rate: number, success: string, warning: string, error: string): string => {
+ if (rate >= 0.7) return success;
+ if (rate >= 0.4) return warning;
+ return error;
+};
+
+/** Day 1 / 7 / 30 / 60 / 90 retention curve rendered as a heatmap strip. */
+export const RetentionHeatmap: React.FC = ({ points }) => {
+ const colors = useThemeColors();
+
+ if (points.length === 0 || points.every((point) => point.cohortSize === 0)) {
+ return (
+
+ Not enough cohort history yet to plot a retention curve.
+
+ );
+ }
+
+ return (
+
+ {points.map((point) => {
+ const opacity = point.cohortSize === 0 ? 0.15 : 0.25 + point.retentionRate * 0.75;
+ const baseColor = intensityColor(
+ point.retentionRate,
+ colors.status.success,
+ colors.status.warning,
+ colors.status.error
+ );
+ return (
+
+
+
+ {point.cohortSize === 0 ? '—' : `${Math.round(point.retentionRate * 100)}%`}
+
+
+ Day {point.day}
+
+ );
+ })}
+
+ );
+};
+
+const styles = StyleSheet.create({
+ row: { flexDirection: 'row', justifyContent: 'space-between', gap: spacing.xs },
+ cellContainer: { alignItems: 'center', flex: 1 },
+ cell: {
+ width: '100%',
+ aspectRatio: 1,
+ borderRadius: borderRadius.md,
+ alignItems: 'center',
+ justifyContent: 'center',
+ },
+ cellValue: { color: '#fff', fontWeight: '700', fontSize: 12 },
+ cellLabel: { fontSize: 10, marginTop: spacing.xs },
+ emptyText: { textAlign: 'center', paddingVertical: spacing.md, fontSize: 13 },
+});
+
+export default RetentionHeatmap;
diff --git a/src/components/analytics/SankeyDiagram.tsx b/src/components/analytics/SankeyDiagram.tsx
new file mode 100644
index 00000000..db87f479
--- /dev/null
+++ b/src/components/analytics/SankeyDiagram.tsx
@@ -0,0 +1,171 @@
+import React, { useMemo } from 'react';
+import { View, Text, StyleSheet, Dimensions } from 'react-native';
+import Svg, { Path, Rect, Text as SvgText } from 'react-native-svg';
+import { spacing } from '../../utils/constants';
+import { useThemeColors } from '../../hooks/useThemeColors';
+import type { PlanMigrationFlow } from '../../types/cohortAnalytics';
+
+const { width: screenWidth } = Dimensions.get('window');
+const CHART_WIDTH = screenWidth - spacing.xl * 2 - spacing.lg * 2;
+const CHART_HEIGHT = 220;
+const NODE_WIDTH = 14;
+
+interface Node {
+ id: string;
+ total: number;
+ y: number;
+ height: number;
+}
+
+function layoutColumn(
+ ids: string[],
+ totals: Map,
+ height: number
+): Map {
+ const grandTotal = ids.reduce((sum, id) => sum + (totals.get(id) ?? 0), 0) || 1;
+ const gap = 6;
+ const usableHeight = height - gap * Math.max(0, ids.length - 1);
+ let cursor = 0;
+ const nodes = new Map();
+ for (const id of ids) {
+ const total = totals.get(id) ?? 0;
+ const nodeHeight = Math.max(8, (total / grandTotal) * usableHeight);
+ nodes.set(id, { id, total, y: cursor, height: nodeHeight });
+ cursor += nodeHeight + gap;
+ }
+ return nodes;
+}
+
+/** Simplified Sankey diagram for plan upgrade/downgrade/lateral migration flows. */
+export const SankeyDiagram: React.FC<{ flows: PlanMigrationFlow[] }> = ({ flows }) => {
+ const colors = useThemeColors();
+
+ const { fromNodes, toNodes } = useMemo(() => {
+ const fromTotals = new Map();
+ const toTotals = new Map();
+ for (const flow of flows) {
+ fromTotals.set(flow.fromPlanId, (fromTotals.get(flow.fromPlanId) ?? 0) + flow.count);
+ toTotals.set(flow.toPlanId, (toTotals.get(flow.toPlanId) ?? 0) + flow.count);
+ }
+ return {
+ fromNodes: layoutColumn(Array.from(fromTotals.keys()), fromTotals, CHART_HEIGHT - 20),
+ toNodes: layoutColumn(Array.from(toTotals.keys()), toTotals, CHART_HEIGHT - 20),
+ };
+ }, [flows]);
+
+ if (flows.length === 0) {
+ return (
+
+ No plan changes recorded for this period yet.
+
+ );
+ }
+
+ const leftX = NODE_WIDTH;
+ const rightX = CHART_WIDTH - NODE_WIDTH;
+ const flowColor = (direction: PlanMigrationFlow['direction']): string =>
+ direction === 'upgrade'
+ ? colors.status.success
+ : direction === 'downgrade'
+ ? colors.status.error
+ : colors.textSecondary;
+
+ // Track running offsets within each node so multiple flows sharing a node stack instead of overlapping.
+ const fromCursor = new Map();
+ const toCursor = new Map();
+
+ return (
+
+
+
+ ● Upgrade
+ ● Downgrade
+ ● Lateral
+
+
+ );
+};
+
+const styles = StyleSheet.create({
+ emptyText: { textAlign: 'center', paddingVertical: spacing.md, fontSize: 13 },
+ legendRow: {
+ flexDirection: 'row',
+ justifyContent: 'center',
+ gap: spacing.md,
+ marginTop: spacing.xs,
+ },
+ legendItem: { fontSize: 11 },
+});
+
+export default SankeyDiagram;
diff --git a/src/navigation/AppNavigator.tsx b/src/navigation/AppNavigator.tsx
index 0f32b2ee..845a8473 100644
--- a/src/navigation/AppNavigator.tsx
+++ b/src/navigation/AppNavigator.tsx
@@ -32,6 +32,7 @@ const SessionManagementScreen = lazyScreen(() => import('../screens/SessionManag
const CalendarIntegrationScreen = lazyScreen(() => import('../screens/CalendarIntegrationScreen'));
const AccountingExportScreen = lazyScreen(() => import('../screens/AccountingExportScreen'));
const WebhookSettingsScreen = lazyScreen(() => import('../screens/WebhookSettingsScreen'));
+const WebhookLogsScreen = lazyScreen(() => import('../screens/WebhookLogsScreen'));
const ErrorDashboardScreen = lazyScreen(() => import('../screens/ErrorDashboardScreen'));
const ImportScreen = lazyScreen(() => import('../screens/ImportScreen'));
const ExportScreen = lazyScreen(() => import('../screens/ExportScreen'));
@@ -275,6 +276,11 @@ const SettingsStack = () => (
component={WebhookSettingsScreen}
options={{ title: 'Webhooks', headerShown: true }}
/>
+
;
+
+const formatDateTime = (timestamp?: number): string =>
+ timestamp ? new Date(timestamp).toLocaleString() : '—';
+
+const WebhookLogsScreen: React.FC = () => {
+ const route = useRoute();
+ const webhookId = route.params?.webhookId;
+ const {
+ webhooks,
+ getWebhookDeliveries,
+ getDeadLetters,
+ replayDeadLetter,
+ rotateSecret,
+ getAnalytics,
+ } = useWebhookStore();
+
+ const [tab, setTab] = useState<'logs' | 'deadLetters'>('logs');
+
+ const webhook = webhooks.find((entry) => entry.id === webhookId);
+ const deliveries = useMemo(
+ () => (webhookId ? getWebhookDeliveries(webhookId, 50).slice().reverse() : []),
+ [webhookId, getWebhookDeliveries]
+ );
+ const deadLetters = useMemo(() => getDeadLetters(webhookId), [webhookId, getDeadLetters]);
+ const analytics = webhookId ? getAnalytics(webhookId) : undefined;
+
+ const handleReplay = (deliveryId: string) => {
+ replayDeadLetter(deliveryId).catch((error) =>
+ Alert.alert(
+ 'Replay failed',
+ error instanceof Error ? error.message : 'Could not replay delivery'
+ )
+ );
+ };
+
+ const handleRotateSecret = () => {
+ if (!webhookId) return;
+ Alert.alert(
+ 'Rotate signing secret',
+ 'The previous secret stays valid for 24h so in-flight receivers keep working.',
+ [
+ { text: 'Cancel', style: 'cancel' },
+ {
+ text: 'Rotate',
+ onPress: () =>
+ rotateSecret(webhookId).catch((error) =>
+ Alert.alert(
+ 'Rotation failed',
+ error instanceof Error ? error.message : 'Could not rotate secret'
+ )
+ ),
+ },
+ ]
+ );
+ };
+
+ if (!webhook) {
+ return (
+
+
+ Webhook not found.
+
+
+ );
+ }
+
+ return (
+
+
+
+ Delivery logs
+ {webhook.url}
+ {webhook.disabledReason ? (
+
+ Auto-disabled: {webhook.disabledReason}
+
+ ) : null}
+
+
+
+
+
+
+ {Math.round((analytics?.successRate ?? 0) * 100)}%
+
+ Success rate
+
+
+ {Math.round(analytics?.avgLatencyMs ?? 0)}ms
+ Avg latency
+
+
+ {deadLetters.length}
+ Dead letters
+
+
+
+ Rotate signing secret
+
+
+
+
+ setTab('logs')}
+ accessibilityRole="tab"
+ accessibilityState={{ selected: tab === 'logs' }}>
+
+ Logs ({deliveries.length})
+
+
+ setTab('deadLetters')}
+ accessibilityRole="tab"
+ accessibilityState={{ selected: tab === 'deadLetters' }}>
+
+ Dead letter queue ({deadLetters.length})
+
+
+
+
+ {tab === 'logs' ? (
+ deliveries.length === 0 ? (
+ No deliveries yet.
+ ) : (
+ deliveries.map((delivery) => (
+
+
+ {delivery.eventType}
+ {webhookStatusLabels[delivery.status]}
+
+
+ {delivery.responseCode ?? '—'} · {delivery.latencyMs ?? '—'}ms · attempt{' '}
+ {delivery.attempts}/{delivery.maxAttempts}
+
+
+ {formatDateTime(delivery.lastAttemptAt ?? delivery.createdAt)}
+
+ {delivery.payloadTruncated ? (
+
+ Payload truncated (>1MB) — hash {delivery.payloadHash?.slice(0, 12)}…
+
+ ) : null}
+ {delivery.errorMessage ? (
+ {delivery.errorMessage}
+ ) : null}
+ {delivery.bodyPreview ? (
+
+ {delivery.bodyPreview}
+
+ ) : null}
+
+ ))
+ )
+ ) : deadLetters.length === 0 ? (
+ No dead-lettered deliveries. 🎉
+ ) : (
+ deadLetters.map((delivery) => (
+
+
+ {delivery.eventType}
+ handleReplay(delivery.id)}>
+ Replay
+
+
+
+ Exhausted after {delivery.attempts}/{delivery.maxAttempts} attempts
+
+
+ Dead-lettered {formatDateTime(delivery.deadLetteredAt)}
+
+ {delivery.errorMessage ? (
+ {delivery.errorMessage}
+ ) : null}
+
+ ))
+ )}
+
+
+ );
+};
+
+const styles = StyleSheet.create({
+ container: { flex: 1, backgroundColor: colors.background },
+ content: { padding: spacing.lg, gap: spacing.md },
+ header: { marginBottom: spacing.sm },
+ title: { ...typography.h1, color: colors.text },
+ subtitle: { color: colors.textSecondary, marginTop: spacing.xs },
+ disabledBanner: {
+ marginTop: spacing.sm,
+ padding: spacing.sm,
+ borderRadius: borderRadius.md,
+ backgroundColor: '#3a1d1d',
+ },
+ disabledBannerText: { color: '#ff8080', fontSize: 12 },
+ summaryCard: { gap: spacing.md },
+ summaryRow: { flexDirection: 'row', justifyContent: 'space-between' },
+ summaryItem: { alignItems: 'center', flex: 1 },
+ summaryValue: { ...typography.h3, color: colors.text },
+ summaryLabel: { color: colors.textSecondary, fontSize: 12, marginTop: spacing.xs },
+ rotateButton: {
+ borderWidth: 1,
+ borderColor: colors.border,
+ borderRadius: borderRadius.md,
+ paddingVertical: spacing.sm,
+ alignItems: 'center',
+ },
+ rotateButtonText: { color: colors.text, fontWeight: '600' },
+ tabRow: { flexDirection: 'row', gap: spacing.sm },
+ tabButton: {
+ flex: 1,
+ paddingVertical: spacing.sm,
+ borderRadius: borderRadius.md,
+ borderWidth: 1,
+ borderColor: colors.border,
+ alignItems: 'center',
+ backgroundColor: colors.surface,
+ },
+ tabButtonActive: { backgroundColor: colors.primary, borderColor: colors.primary },
+ tabButtonText: { color: colors.textSecondary, fontSize: 12 },
+ tabButtonTextActive: { color: colors.text, fontWeight: '600' },
+ logCard: { gap: spacing.xs },
+ rowBetween: { flexDirection: 'row', justifyContent: 'space-between', alignItems: 'center' },
+ logEvent: { color: colors.text, fontWeight: '600' },
+ logStatus: { color: colors.textSecondary, fontSize: 12 },
+ logMeta: { color: colors.textSecondary, fontSize: 12 },
+ truncatedNotice: { color: '#e0a030', fontSize: 12 },
+ errorText: { color: '#ff6b6b', fontSize: 12 },
+ bodyPreview: {
+ color: colors.textSecondary,
+ fontSize: 11,
+ fontFamily: 'monospace',
+ backgroundColor: colors.surface,
+ padding: spacing.xs,
+ borderRadius: borderRadius.sm,
+ },
+ replayButton: {
+ backgroundColor: colors.primary,
+ paddingVertical: spacing.xs,
+ paddingHorizontal: spacing.sm,
+ borderRadius: borderRadius.round,
+ },
+ replayButtonText: { color: colors.text, fontSize: 12, fontWeight: '600' },
+ emptyState: { flex: 1, justifyContent: 'center', alignItems: 'center', padding: spacing.xl },
+ emptyText: { color: colors.textSecondary, textAlign: 'center', paddingVertical: spacing.lg },
+});
+
+export default WebhookLogsScreen;
diff --git a/src/screens/WebhookSettingsScreen.tsx b/src/screens/WebhookSettingsScreen.tsx
index 25ab6a92..6e15117d 100644
--- a/src/screens/WebhookSettingsScreen.tsx
+++ b/src/screens/WebhookSettingsScreen.tsx
@@ -9,6 +9,7 @@ import {
TouchableOpacity,
Alert,
} from 'react-native';
+import { useNavigation } from '@react-navigation/native';
import { Card } from '../components/common/Card';
import { colors, spacing, typography, borderRadius } from '../utils/constants';
import {
@@ -26,6 +27,7 @@ const emptyWebhookForm = {
};
const WebhookSettingsScreen: React.FC = () => {
+ const navigation = useNavigation();
const {
webhooks,
deliveries,
@@ -230,6 +232,11 @@ const WebhookSettingsScreen: React.FC = () => {
onPress={() => sendTestEvent(webhook.id, webhook.events[0])}>
Send test
+ navigation.navigate('WebhookLogs', { webhookId: webhook.id })}>
+ Logs & DLQ
+
onDelete(webhook.id)}>
diff --git a/src/services/cohortPdfExport.ts b/src/services/cohortPdfExport.ts
new file mode 100644
index 00000000..0a375473
--- /dev/null
+++ b/src/services/cohortPdfExport.ts
@@ -0,0 +1,73 @@
+/**
+ * Frontend-safe minimal PDF builder for the cohort report "Export PDF" action.
+ *
+ * Mirrors backend/services/analytics/cohortReportExport.ts's buildSimplePdf,
+ * but works off plain JS string lengths instead of Node's Buffer (React
+ * Native/Hermes has no Buffer global without a polyfill, and this project
+ * doesn't ship one). Safe because every line we emit is plain ASCII, so
+ * string length and byte length are identical.
+ */
+
+import type { CohortBucket } from '../types/cohortAnalytics';
+
+function escapePdfText(text: string): string {
+ return text.replace(/\\/g, '\\\\').replace(/\(/g, '\\(').replace(/\)/g, '\\)');
+}
+
+export function buildSimplePdfText(lines: string[]): string {
+ const fontSize = 11;
+ const leading = 14;
+ const marginTop = 760;
+ const escaped = lines.map(escapePdfText);
+
+ const streamBody = [
+ 'BT',
+ `/F1 ${fontSize} Tf`,
+ `${leading} TL`,
+ `50 ${marginTop} Td`,
+ ...escaped.flatMap((line, index) => (index === 0 ? [`(${line}) Tj`] : ['T*', `(${line}) Tj`])),
+ 'ET',
+ ].join('\n');
+
+ const objects = [
+ '<< /Type /Catalog /Pages 2 0 R >>',
+ '<< /Type /Pages /Kids [3 0 R] /Count 1 >>',
+ '<< /Type /Page /Parent 2 0 R /MediaBox [0 0 612 792] /Resources << /Font << /F1 4 0 R >> >> /Contents 5 0 R >>',
+ '<< /Type /Font /Subtype /Type1 /BaseFont /Helvetica >>',
+ `<< /Length ${streamBody.length} >>\nstream\n${streamBody}\nendstream`,
+ ];
+
+ let pdf = '%PDF-1.4\n';
+ const offsets: number[] = [];
+ objects.forEach((obj, index) => {
+ offsets.push(pdf.length);
+ pdf += `${index + 1} 0 obj\n${obj}\nendobj\n`;
+ });
+
+ const xrefOffset = pdf.length;
+ pdf += `xref\n0 ${objects.length + 1}\n0000000000 65535 f \n`;
+ for (const offset of offsets) {
+ pdf += `${offset.toString().padStart(10, '0')} 00000 n \n`;
+ }
+ pdf += `trailer\n<< /Size ${objects.length + 1} /Root 1 0 R >>\nstartxref\n${xrefOffset}\n%%EOF`;
+
+ return pdf;
+}
+
+export function cohortTableToPdfText(
+ buckets: CohortBucket[],
+ title = 'Cohort Retention Report'
+): string {
+ const lines = [
+ title,
+ `Generated ${new Date().toISOString()}`,
+ '',
+ 'Cohort Size Active Retention Starting MRR Current MRR',
+ ...buckets.map(
+ (bucket) =>
+ `${bucket.cohortKey.padEnd(12)} ${String(bucket.size).padStart(4)} ${String(bucket.activeCount).padStart(6)} ${(bucket.retentionRate * 100).toFixed(1).padStart(8)}% ${bucket.startingMrr.toFixed(2).padStart(12)} ${bucket.currentMrr.toFixed(2).padStart(11)}`
+ ),
+ ];
+ if (buckets.length === 0) lines.push('(no cohort data for this period)');
+ return buildSimplePdfText(lines);
+}
diff --git a/src/store/webhookStore.ts b/src/store/webhookStore.ts
index 5ed7930e..920e34c6 100644
--- a/src/store/webhookStore.ts
+++ b/src/store/webhookStore.ts
@@ -8,6 +8,7 @@ import {
WebhookDeliveryStatus,
WebhookEventType,
WebhookRetryPolicy,
+ WebhookSecret,
} from '../types/webhook';
import { BillingCycle } from '../types/subscription';
import {
@@ -76,18 +77,25 @@ interface WebhookState {
error: string | null;
registerWebhook: (
- input: Omit
+ input: Omit<
+ WebhookConfig,
+ 'id' | 'createdAt' | 'updatedAt' | 'successCount' | 'failureCount' | 'secrets'
+ >
) => Promise;
updateWebhook: (id: string, patch: Partial) => Promise;
deleteWebhook: (id: string) => Promise;
pauseWebhook: (id: string) => Promise;
resumeWebhook: (id: string) => Promise;
+ /** Rotates the signing secret; the old secret stays valid for `overlapMs` (default 24h). */
+ rotateSecret: (id: string, newSecret?: string, overlapMs?: number) => Promise;
recordDelivery: (
delivery: Omit
) => Promise;
retryDelivery: (deliveryId: string) => Promise;
sendTestEvent: (webhookId: string, eventType?: WebhookEventType) => Promise;
getWebhookDeliveries: (webhookId: string, limit?: number) => WebhookDelivery[];
+ getDeadLetters: (webhookId?: string) => WebhookDelivery[];
+ replayDeadLetter: (deliveryId: string) => Promise;
getAnalytics: (webhookId: string) => WebhookAnalytics;
refreshAnalytics: (webhookId?: string) => void;
setWebhookState: (webhooks: WebhookConfig[]) => void;
@@ -103,14 +111,17 @@ export const useWebhookStore = create()(
error: null,
registerWebhook: async (input) => {
+ const createdAt = now();
+ // Ensure every webhook has a signing secret so deliveries are
+ // verifiable; generate one if the caller did not supply it.
+ const secretKey = input.secretKey?.trim() ? input.secretKey : generateWebhookSecret();
const webhook: WebhookConfig = {
...input,
- // Ensure every webhook has a signing secret so deliveries are
- // verifiable; generate one if the caller did not supply it.
- secretKey: input.secretKey?.trim() ? input.secretKey : generateWebhookSecret(),
+ secretKey,
+ secrets: [{ key: secretKey, createdAt, validFrom: createdAt }],
id: createId('whk'),
- createdAt: now(),
- updatedAt: now(),
+ createdAt,
+ updatedAt: createdAt,
successCount: 0,
failureCount: 0,
};
@@ -157,7 +168,24 @@ export const useWebhookStore = create()(
pauseWebhook: async (id) => get().updateWebhook(id, { isPaused: true }),
- resumeWebhook: async (id) => get().updateWebhook(id, { isPaused: false }),
+ resumeWebhook: async (id) =>
+ get().updateWebhook(id, { isPaused: false, disabledReason: undefined }),
+
+ rotateSecret: async (id, newSecret, overlapMs = 24 * 60 * 60 * 1_000) => {
+ const current = get().webhooks.find((webhook) => webhook.id === id);
+ if (!current) throw new Error(`Webhook ${id} not found`);
+
+ const rotatedAt = now();
+ const nextSecret = newSecret?.trim() ? newSecret : generateWebhookSecret();
+ const secrets: WebhookSecret[] = (current.secrets ?? []).map((secret) =>
+ secret.validUntil === undefined
+ ? { ...secret, validUntil: rotatedAt + overlapMs }
+ : secret
+ );
+ secrets.push({ key: nextSecret, createdAt: rotatedAt, validFrom: rotatedAt });
+
+ return get().updateWebhook(id, { secretKey: nextSecret, secrets });
+ },
recordDelivery: async (delivery) => {
const record: WebhookDelivery = {
@@ -273,6 +301,46 @@ export const useWebhookStore = create()(
.deliveries.filter((delivery) => delivery.webhookId === webhookId)
.slice(-Math.max(0, limit)),
+ getDeadLetters: (webhookId) =>
+ get()
+ .deliveries.filter(
+ (delivery) =>
+ delivery.isDeadLettered && (!webhookId || delivery.webhookId === webhookId)
+ )
+ .sort((a, b) => (a.deadLetteredAt ?? 0) - (b.deadLetteredAt ?? 0)),
+
+ replayDeadLetter: async (deliveryId) => {
+ const current = get().deliveries.find((delivery) => delivery.id === deliveryId);
+ if (!current) throw new Error(`Delivery ${deliveryId} not found`);
+
+ const next: WebhookDelivery = {
+ ...current,
+ status: 'delivered',
+ attempts: current.attempts + 1,
+ lastAttemptAt: now(),
+ deliveredAt: now(),
+ responseCode: 200,
+ errorMessage: undefined,
+ isDeadLettered: false,
+ deadLetteredAt: undefined,
+ updatedAt: now(),
+ };
+
+ set((state) => {
+ const nextDeliveries = state.deliveries.map((delivery) =>
+ delivery.id === deliveryId ? next : delivery
+ );
+ return {
+ deliveries: nextDeliveries,
+ analytics: {
+ ...state.analytics,
+ [next.webhookId]: calculateAnalytics(next.webhookId, nextDeliveries),
+ },
+ };
+ });
+ return next;
+ },
+
getAnalytics: (webhookId) => {
const analytics = calculateAnalytics(webhookId, get().deliveries);
set((state) => ({
diff --git a/src/types/cohortAnalytics.ts b/src/types/cohortAnalytics.ts
new file mode 100644
index 00000000..7e4cd613
--- /dev/null
+++ b/src/types/cohortAnalytics.ts
@@ -0,0 +1,99 @@
+/**
+ * Shared types for the advanced subscription analytics suite (cohort retention,
+ * revenue vs. logo churn, plan migration, LTV by acquisition source).
+ *
+ * Decoupled from the personal `Subscription` model — analytics operate on
+ * merchant-side subscriber lifecycle records, mirroring the shape already used
+ * by the webhook subsystem (see src/types/webhook.ts WebhookSubscriptionSnapshot).
+ */
+
+export type CohortGranularity = 'week' | 'month';
+
+export interface PlanChangeEvent {
+ fromPlanId: string;
+ toPlanId: string;
+ changedAt: number;
+}
+
+export interface SubscriberRecord {
+ subscriberId: string;
+ merchantId: string;
+ planId: string;
+ planName: string;
+ region?: string;
+ acquisitionChannel?: string;
+ /** Epoch ms the subscriber's first subscription started. */
+ signupAt: number;
+ /** Epoch ms the subscriber churned; undefined if still active. */
+ churnedAt?: number;
+ /** Current monthly recurring revenue contribution. */
+ mrr: number;
+ /** Epoch ms of the subscriber's last observed activity, used for activity-based retention. */
+ lastActiveAt?: number;
+ planHistory?: PlanChangeEvent[];
+}
+
+export interface CohortBucket {
+ /** e.g. "2026-W07" or "2026-06" depending on granularity. */
+ cohortKey: string;
+ granularity: CohortGranularity;
+ periodStart: number;
+ periodEnd: number;
+ size: number;
+ activeCount: number;
+ retentionRate: number;
+ startingMrr: number;
+ currentMrr: number;
+ isEmpty: boolean;
+}
+
+export interface RetentionCurvePoint {
+ day: 1 | 7 | 30 | 60 | 90;
+ retainedCount: number;
+ cohortSize: number;
+ retentionRate: number;
+}
+
+export interface ChurnBreakdown {
+ periodStart: number;
+ periodEnd: number;
+ startingSubscribers: number;
+ churnedSubscribers: number;
+ logoChurnRate: number;
+ startingMrr: number;
+ churnedMrr: number;
+ revenueChurnRate: number;
+ isEmpty: boolean;
+}
+
+export interface PlanMigrationFlow {
+ fromPlanId: string;
+ toPlanId: string;
+ count: number;
+ direction: 'upgrade' | 'downgrade' | 'lateral';
+}
+
+export interface LtvSourceBreakdown {
+ acquisitionChannel: string;
+ subscriberCount: number;
+ avgLifetimeMonths: number;
+ avgMonthlyRevenue: number;
+ ltv: number;
+}
+
+export interface AnomalyFlaggedPoint {
+ label: string;
+ value: number;
+ isAnomaly: boolean;
+}
+
+export interface ChurnRiskSummary {
+ cohortKey: string;
+ sampledSubscribers: number;
+ highRiskCount: number;
+ mediumRiskCount: number;
+ lowRiskCount: number;
+ averageChurnProbability: number;
+}
+
+export type AnalyticsExportFormat = 'csv' | 'pdf';
diff --git a/src/types/webhook.ts b/src/types/webhook.ts
index a09431ed..c26e3f5c 100644
--- a/src/types/webhook.ts
+++ b/src/types/webhook.ts
@@ -21,6 +21,30 @@ export interface WebhookRetryPolicy {
initialDelayMs: number;
maxDelayMs: number;
backoffFactor?: number;
+ /**
+ * Fixed retry schedule in ms, indexed by attempt number (1-based).
+ * When present, takes precedence over the initialDelayMs/backoffFactor formula.
+ * The last entry is reused for any attempt beyond the array length.
+ */
+ retryDelaysMs?: number[];
+}
+
+/** A signing secret valid for a bounded window, used for zero-downtime rotation. */
+export interface WebhookSecret {
+ key: string;
+ createdAt: number;
+ validFrom: number;
+ /** Undefined means "still valid" (current secret). */
+ validUntil?: number;
+}
+
+export interface WebhookRateLimitConfig {
+ /** Max deliveries allowed in a short burst window. */
+ burstLimit: number;
+ /** Burst window size in ms (defaults to 1s if omitted). */
+ burstWindowMs?: number;
+ /** Steady-state cap per rolling 60s window. */
+ steadyPerMinute: number;
}
export interface WebhookConfig {
@@ -29,9 +53,15 @@ export interface WebhookConfig {
url: string;
events: WebhookEventType[];
secretKey: string;
+ /** Signing secret history; the most recently added entry is the active signing secret. */
+ secrets: WebhookSecret[];
retryPolicy: WebhookRetryPolicy;
+ /** @deprecated use rateLimit.steadyPerMinute instead. Kept for backward compatibility. */
rateLimitPerMinute?: number;
+ rateLimit?: WebhookRateLimitConfig;
isPaused: boolean;
+ /** Set when the webhook was auto-disabled (e.g. endpoint returned 410 Gone). */
+ disabledReason?: string;
createdAt: number;
updatedAt: number;
lastHealthCheckAt?: number;
@@ -77,6 +107,8 @@ export interface WebhookEventInput {
previousStatus: string;
currentStatus: string;
occurredAt?: number;
+ /** Client-supplied Idempotency-Key header value; defaults to `${eventId}:${webhookId}` when omitted. */
+ idempotencyKey?: string;
}
export interface WebhookEventPayload {
@@ -120,6 +152,15 @@ export interface WebhookDelivery {
signature: string;
idempotencyKey: string;
latencyMs?: number;
+ /** Truncated preview of the request body sent, for delivery logs/receipts. */
+ bodyPreview?: string;
+ /** True when the payload exceeded the 1MB limit and was truncated before sending. */
+ payloadTruncated?: boolean;
+ /** SHA-256 hash of the full (untruncated) payload, present when payloadTruncated is true. */
+ payloadHash?: string;
+ /** True once this delivery has exhausted retries and landed in the dead-letter queue. */
+ isDeadLettered?: boolean;
+ deadLetteredAt?: number;
}
export interface WebhookAnalytics {