Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions backend/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,8 @@ SERVER_SECRET_KEY=your-stellar-secret-key-here

# Server Configuration
PORT=3000

# Reconciliation Configuration
RECONCILE_ENABLED=true
RECONCILE_INTERVAL_MS=60000
RECONCILE_WINDOW=200
13 changes: 13 additions & 0 deletions backend/src/config/env.validation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
IsEnum,
IsNotEmpty,
IsNumber,
IsOptional,
IsString,
MinLength,
validateSync,
Expand Down Expand Up @@ -52,6 +53,18 @@ class EnvironmentVariables {

@IsNumber()
WEBHOOK_BATCH_SIZE: number = 50;

@IsOptional()
@IsString()
RECONCILE_ENABLED?: string;

@IsOptional()
@IsNumber()
RECONCILE_INTERVAL_MS?: number;

@IsOptional()
@IsNumber()
RECONCILE_WINDOW?: number;
}

export function validate(config: Record<string, unknown>) {
Expand Down
9 changes: 8 additions & 1 deletion backend/src/creator-events/creator-events.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,14 @@ import { CreatorEventsService } from './creator-events.service';
@Module({
imports: [
ContractModule,
TypeOrmModule.forFeature([CreatorEvent, Match, MatchPrediction, User, CreatorEventLeaderboardEntry, CreatorEventPayout]),
TypeOrmModule.forFeature([
CreatorEvent,
Match,
MatchPrediction,
User,
CreatorEventLeaderboardEntry,
CreatorEventPayout,
]),
CacheModule.register(),
],
controllers: [
Expand Down
17 changes: 17 additions & 0 deletions backend/src/indexer/dto/indexer-health.dto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,23 @@ export class IndexerHealthResponseDto {
alerts: IndexerAlertDto[];
}

export class ReconciliationStatusDto {
@ApiProperty()
enabled: boolean;

@ApiProperty()
is_running: boolean;

@ApiProperty({ nullable: true })
last_run_at: string | null;

@ApiProperty()
last_backfill_count: number;

@ApiProperty()
lag_in_ledgers: number;
}

export class IndexerDashboardDto extends IndexerHealthResponseDto {
@ApiProperty()
events_per_second: number;
Expand Down
53 changes: 53 additions & 0 deletions backend/src/indexer/entities/chain-sync-checkpoint.entity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import {
Entity,
PrimaryColumn,
Column,
CreateDateColumn,
UpdateDateColumn,
} from 'typeorm';
import { ApiProperty } from '@nestjs/swagger';

@Entity('chain_sync_checkpoints')
export class ChainSyncCheckpoint {
@PrimaryColumn({ type: 'varchar', length: 128 })
@ApiProperty({ description: 'Contract ID this checkpoint tracks' })
contract_id: string;

@Column({ type: 'bigint', default: 0 })
@ApiProperty({
description: 'Last ledger successfully indexed and persisted',
})
last_indexed_ledger: number;

@Column({ type: 'bigint', default: 0 })
@ApiProperty({ description: 'Latest known chain-head ledger' })
chain_head_ledger: number;

@Column({ type: 'bigint', default: 0 })
@ApiProperty({
description: 'Ledger from which the last reconciliation started',
})
last_reconciled_from: number;

@Column({ type: 'bigint', default: 0 })
@ApiProperty({ description: 'Ledger at which the last reconciliation ended' })
last_reconciled_to: number;

@Column({ type: 'timestamptz', nullable: true })
@ApiProperty({
description: 'Timestamp of the last successful reconciliation',
})
last_reconciled_at: Date | null;

@Column({ type: 'int', default: 0 })
@ApiProperty({
description: 'Number of events backfilled in the last reconciliation run',
})
last_backfill_count: number;

@CreateDateColumn()
created_at: Date;

@UpdateDateColumn()
updated_at: Date;
}
29 changes: 28 additions & 1 deletion backend/src/indexer/indexer-health.controller.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { Test, TestingModule } from '@nestjs/testing';
import { ConfigService } from '@nestjs/config';
import { IndexerHealthController } from './indexer-health.controller';
import { IndexerHealthService } from './health.service';
import { ReconciliationService } from './reconciliation.service';

describe('IndexerHealthController', () => {
let controller: IndexerHealthController;
Expand All @@ -13,6 +15,9 @@ describe('IndexerHealthController', () => {
| 'triggerManualSync'
>
>;
let reconciliationService: jest.Mocked<
Pick<ReconciliationService, 'getStatus' | 'getCheckpointForContract'>
>;

beforeEach(async () => {
healthService = {
Expand All @@ -22,9 +27,31 @@ describe('IndexerHealthController', () => {
triggerManualSync: jest.fn(),
};

reconciliationService = {
getStatus: jest.fn().mockReturnValue({
enabled: true,
is_running: false,
last_run_at: null,
last_backfill_count: 0,
}),
getCheckpointForContract: jest.fn().mockResolvedValue(null),
};

const module: TestingModule = await Test.createTestingModule({
controllers: [IndexerHealthController],
providers: [{ provide: IndexerHealthService, useValue: healthService }],
providers: [
{ provide: IndexerHealthService, useValue: healthService },
{ provide: ReconciliationService, useValue: reconciliationService },
{
provide: ConfigService,
useValue: {
get: jest.fn((key: string) => {
if (key === 'SOROBAN_CONTRACT_ID') return 'contract-123';
return undefined;
}),
},
},
],
}).compile();

controller = module.get<IndexerHealthController>(IndexerHealthController);
Expand Down
40 changes: 39 additions & 1 deletion backend/src/indexer/indexer-health.controller.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Controller, Get, Header, Post, UseGuards } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import {
ApiBearerAuth,
ApiOperation,
Expand All @@ -12,15 +13,21 @@ import { Role } from '../common/enums/role.enum';
import { JwtAuthGuard } from '../common/guards/jwt-auth.guard';
import { RolesGuard } from '../common/guards/roles.guard';
import { IndexerHealthService } from './health.service';
import { ReconciliationService } from './reconciliation.service';
import {
IndexerDashboardDto,
IndexerHealthResponseDto,
ReconciliationStatusDto,
} from './dto/indexer-health.dto';

@ApiTags('Indexer')
@Controller('indexer')
export class IndexerHealthController {
constructor(private readonly healthService: IndexerHealthService) {}
constructor(
private readonly healthService: IndexerHealthService,
private readonly reconciliationService: ReconciliationService,
private readonly configService: ConfigService,
) {}

/**
* GET /api/indexer/health
Expand Down Expand Up @@ -71,6 +78,37 @@ export class IndexerHealthController {
return this.healthService.getPrometheusMetrics();
}

@Get('health/reconciliation')
@Public()
@ApiOperation({ summary: 'Get reconciliation status and lag' })
@ApiResponse({
status: 200,
description: 'Reconciliation status with non-negative lag',
type: ReconciliationStatusDto,
})
async getReconciliationStatus(): Promise<ReconciliationStatusDto> {
const status = this.reconciliationService.getStatus();
const contractId = this.configService.get<string>('SOROBAN_CONTRACT_ID');
let lag = 0;

if (contractId && contractId !== 'your-contract-id-here') {
const checkpoint =
await this.reconciliationService.getCheckpointForContract(contractId);
if (checkpoint) {
lag = Math.max(
0,
Number(checkpoint.chain_head_ledger) -
Number(checkpoint.last_indexed_ledger),
);
}
}

return {
...status,
lag_in_ledgers: lag,
};
}

/**
* POST /api/indexer/health/sync
* #722 — Manually trigger an indexer sync (admin only).
Expand Down
7 changes: 5 additions & 2 deletions backend/src/indexer/indexer.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import { CacheModule } from '@nestjs/cache-manager';
import { ContractEvent } from './entities/contract-event.entity';
import { FeeHistory } from './entities/fee-history.entity';
import { IndexerCheckpoint } from './entities/indexer-checkpoint.entity';
import { ChainSyncCheckpoint } from './entities/chain-sync-checkpoint.entity';
import { IndexerService } from './indexer.service';
import { IndexerController } from './indexer.controller';
import { IndexerHealthController } from './indexer-health.controller';
import { IndexerHealthService } from './health.service';
import { ReconciliationService } from './reconciliation.service';
import { CreatorEvent } from '../matches/entities/creator-event.entity';
import { CreatorEventLeaderboardEntry } from '../matches/entities/creator-event-leaderboard-entry.entity';
import { CreatorEventPayout } from '../matches/entities/creator-event-payout.entity';
Expand All @@ -23,6 +25,7 @@ import { WebsocketModule } from '../websocket/websocket.module';
ContractEvent,
FeeHistory,
IndexerCheckpoint,
ChainSyncCheckpoint,
CreatorEvent,
CreatorEventLeaderboardEntry,
CreatorEventPayout,
Expand All @@ -35,7 +38,7 @@ import { WebsocketModule } from '../websocket/websocket.module';
WebsocketModule,
],
controllers: [IndexerController, IndexerHealthController],
providers: [IndexerService, IndexerHealthService],
exports: [IndexerService, IndexerHealthService],
providers: [IndexerService, IndexerHealthService, ReconciliationService],
exports: [IndexerService, IndexerHealthService, ReconciliationService],
})
export class IndexerModule {}
7 changes: 7 additions & 0 deletions backend/src/indexer/indexer.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import { User } from '../users/entities/user.entity';
import { NotificationGeneratorService } from '../notifications/notification-generator.service';
import { BroadcasterService } from '../websocket/broadcaster.service';
import { ReconciliationService } from './reconciliation.service';

describe('IndexerService', () => {
let service: IndexerService;
Expand Down Expand Up @@ -184,6 +185,12 @@
broadcastEventFinalized: jest.fn(),
},
},
{
provide: ReconciliationService,
useValue: {
advanceCheckpoint: jest.fn(),
},
},
],
}).compile();

Expand Down Expand Up @@ -258,7 +265,7 @@
(event: unknown) => event as CreatorEvent,
);
(creatorEventRepository.save as jest.Mock).mockImplementation(
async (event: unknown) => event as CreatorEvent,

Check warning on line 268 in backend/src/indexer/indexer.service.spec.ts

View workflow job for this annotation

GitHub Actions / Lint

Async arrow function has no 'await' expression
);
});

Expand All @@ -285,7 +292,7 @@
});
const fetchMock = jest.spyOn(global, 'fetch').mockResolvedValue({
ok: true,
json: async () => ({ result: { events: [], latestLedger: 100 } }),

Check warning on line 295 in backend/src/indexer/indexer.service.spec.ts

View workflow job for this annotation

GitHub Actions / Lint

Async method 'json' has no 'await' expression
} as unknown as Response);

try {
Expand Down
31 changes: 26 additions & 5 deletions backend/src/indexer/indexer.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
import { User } from '../users/entities/user.entity';
import { NotificationGeneratorService } from '../notifications/notification-generator.service';
import { BroadcasterService } from '../websocket/broadcaster.service';
import { ReconciliationService } from './reconciliation.service';

const CHECKPOINT_LEDGER_KEY = 'indexer:last_processed_ledger';
const CHECKPOINT_LEDGER_KEY_LATEST = 'indexer:latest_contract_ledger';
Expand Down Expand Up @@ -73,6 +74,7 @@ export class IndexerService implements OnModuleInit {

private readonly notificationGeneratorService: NotificationGeneratorService,
private readonly broadcasterService: BroadcasterService,
private readonly reconciliationService: ReconciliationService,
) {}

async onModuleInit(): Promise<void> {
Expand Down Expand Up @@ -131,6 +133,15 @@ export class IndexerService implements OnModuleInit {
if (events.length === 0) {
if (latestLedger > lastLedger) {
await this.saveCheckpoint(CHECKPOINT_LEDGER_KEY, latestLedger);
const activeContractId = this.configService.get<string>(
'SOROBAN_CONTRACT_ID',
);
if (activeContractId) {
await this.reconciliationService.advanceCheckpoint(
activeContractId,
latestLedger,
);
}
}
return;
}
Expand All @@ -155,10 +166,18 @@ export class IndexerService implements OnModuleInit {
}
}

await this.saveCheckpoint(
CHECKPOINT_LEDGER_KEY,
Math.max(maxProcessedLedger, latestLedger),
const finalLedger = Math.max(maxProcessedLedger, latestLedger);
await this.saveCheckpoint(CHECKPOINT_LEDGER_KEY, finalLedger);

const activeContractId = this.configService.get<string>(
'SOROBAN_CONTRACT_ID',
);
if (activeContractId) {
await this.reconciliationService.advanceCheckpoint(
activeContractId,
finalLedger,
);
}

const elapsed = Date.now() - batchStart;
this.processingRate =
Expand Down Expand Up @@ -782,11 +801,13 @@ export class IndexerService implements OnModuleInit {
if (existing) return;

const predictedHomeScore =
data.predicted_home_score !== undefined && data.predicted_home_score !== null
data.predicted_home_score !== undefined &&
data.predicted_home_score !== null
? Number(data.predicted_home_score)
: null;
const predictedAwayScore =
data.predicted_away_score !== undefined && data.predicted_away_score !== null
data.predicted_away_score !== undefined &&
data.predicted_away_score !== null
? Number(data.predicted_away_score)
: null;

Expand Down
Loading
Loading