Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions contract/contracts/hello-world/src/autoshare_logic.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use crate::base::errors::Error;
use crate::base::events::{
AdminTransferred, AuthorizationFailure, AutoshareCreated, AutoshareUpdated,
ContractPaused, ContractUnpaused, GroupActivated, GroupDeactivated,
NotificationCategory, Withdrawal,
AdminTransferred, AuthorizationFailure, AutoshareCreated, AutoshareUpdated, ContractPaused,
ContractUnpaused, GroupActivated, GroupDeactivated, NotificationCategory, Withdrawal,
};
use crate::base::types::{AutoShareDetails, GroupMember, PaymentHistory};
use soroban_sdk::{contracttype, token, Address, BytesN, Env, String, Vec};
Expand Down
4 changes: 4 additions & 0 deletions listener/.env.example
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# Logging Configuration
LOG_LEVEL=info
# NODE_ENV=production # Uncomment for newline-delimited JSON log output

# Stellar Network Configuration
STELLAR_NETWORK=testnet
STELLAR_RPC_URL=https://soroban-testnet.stellar.org:443
Expand Down
18 changes: 18 additions & 0 deletions listener/LOGGING.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,24 @@ The following messages mark each stage of the notification pipeline:
| API request done | `info` | `GET /api/events complete` |
| Reconnect attempt | `warn` | `Attempting to reconnect` |
| Max retries exceeded | `error` | `Max reconnection attempts exceeded, stopping service`|
| Scheduler batch start | `info` | `Processing batch of scheduled notifications` |
| Scheduler batch done | `info` | `Scheduler batch complete` |
| Scheduled notification | `info` | `Processing scheduled notification` |
| Registry at capacity | `warn` | `Event registry at capacity, evicting oldest events` |

## Error Formatting

Errors passed in the `error` metadata field are automatically normalized into a structured object with `message`, `name`, `stack`, and optional `cause` fields. Use the exported `formatError()` helper when formatting errors outside the logger.

```typescript
import logger, { formatError } from '../utils/logger';

try {
await deliverNotification();
} catch (error) {
logger.error('Delivery failed', { requestId, error });
}
```

## Configuration

Expand Down
27 changes: 15 additions & 12 deletions listener/src/examples/schedule-notification-example.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/**
* Example: How to schedule notifications
*
*
* This file demonstrates various ways to schedule notifications
* for future delivery using the NotificationAPI
*/
Expand All @@ -9,6 +9,7 @@ import { NotificationAPI } from '../services/notification-api';
import { ScheduledNotificationRepository } from '../services/scheduled-notification-repository';
import { initializeDatabase } from '../database/database';
import { NotificationType } from '../types/scheduled-notification';
import logger from '../utils/logger';

async function examples() {
// Initialize database and API
Expand Down Expand Up @@ -37,7 +38,7 @@ async function examples() {
oneHourLater
);

console.log(`Scheduled notification ID: ${notification1}`);
logger.info('Scheduled notification created', { id: notification1 });

// ====================================================================
// Example 2: Schedule a high-priority notification for tomorrow 9 AM
Expand Down Expand Up @@ -69,7 +70,7 @@ async function examples() {
},
});

console.log(`High-priority notification scheduled for ${tomorrow9AM}`);
logger.info('High-priority notification scheduled', { executeAt: tomorrow9AM });

// ====================================================================
// Example 3: Schedule multiple notifications (batch scheduling)
Expand All @@ -89,21 +90,21 @@ async function examples() {
scheduleIds.push(id);
}

console.log(`Scheduled ${scheduleIds.length} reminder notifications`);
logger.info('Batch reminder notifications scheduled', { count: scheduleIds.length });

// ====================================================================
// Example 4: Cancel a scheduled notification
// ====================================================================
const cancelled = await api.cancelNotification(notification1);
console.log(`Notification ${notification1} cancelled: ${cancelled}`);
logger.info('Notification cancellation result', { id: notification1, cancelled });

// ====================================================================
// Example 5: Check notification status
// ====================================================================
const notification = await api.getNotification(notification2);

if (notification) {
console.log('Notification details:', {
logger.info('Notification details', {
id: notification.id,
status: notification.status,
executeAt: notification.executeAt,
Expand All @@ -116,8 +117,7 @@ async function examples() {
// Example 6: Get scheduler statistics
// ====================================================================
const stats = await api.getStatistics();
console.log('Scheduler statistics:', stats);
// Output: { pending: 10, processing: 2, completed: 100, failed: 5, overdue: 1 }
logger.info('Scheduler statistics', stats);

// ====================================================================
// Example 7: Schedule notification based on blockchain event
Expand Down Expand Up @@ -154,7 +154,10 @@ async function examples() {
};

const scheduledEventNotification = await scheduleEventNotification(mockEvent, 30);
console.log(`Event notification scheduled for 30 minutes: ${scheduledEventNotification}`);
logger.info('Event notification scheduled', {
id: scheduledEventNotification,
delayMinutes: 30,
});

// ====================================================================
// Example 8: Schedule with custom retry configuration
Expand All @@ -172,7 +175,7 @@ async function examples() {
},
});

console.log(`Critical notification scheduled: ${criticalNotification}`);
logger.info('Critical notification scheduled', { id: criticalNotification });

// Clean up
await db.close();
Expand All @@ -182,11 +185,11 @@ async function examples() {
if (require.main === module) {
examples()
.then(() => {
console.log('Examples completed successfully');
logger.info('Examples completed successfully');
process.exit(0);
})
.catch((error) => {
console.error('Examples failed:', error);
logger.error('Examples failed', { error });
process.exit(1);
});
}
Expand Down
4 changes: 0 additions & 4 deletions listener/src/services/discord-notification.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,6 @@ export class DiscordNotificationService {
}

this.deduplicator.markSent(fingerprint);
logger.info('Discord notification sent successfully', {
eventId: event.id,
contractAddress: contractConfig.address,
});
logger.info('Discord notification delivered', {
...logContext,
durationMs,
Expand Down
12 changes: 8 additions & 4 deletions listener/src/services/notification-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ export class NotificationAPI {
/**
* Schedule a notification for future delivery
*/
async scheduleNotification(input: CreateScheduledNotificationInput): Promise<number> {
async scheduleNotification(
input: CreateScheduledNotificationInput,
requestId?: string
): Promise<number> {
// Validate input
if (!input.executeAt || input.executeAt < new Date()) {
throw new Error('executeAt must be a future date');
Expand All @@ -27,12 +30,13 @@ export class NotificationAPI {
}

logger.info('Scheduling new notification', {
requestId,
type: input.notificationType,
executeAt: input.executeAt,
recipient: input.targetRecipient,
});

return await this.repository.create(input);
return await this.repository.create(input, requestId);
}

/**
Expand Down Expand Up @@ -62,8 +66,8 @@ export class NotificationAPI {
/**
* Cancel a scheduled notification
*/
async cancelNotification(id: number): Promise<boolean> {
logger.info('Cancelling scheduled notification', { id });
async cancelNotification(id: number, requestId?: string): Promise<boolean> {
logger.info('Cancelling scheduled notification', { requestId, id });
return await this.repository.cancel(id);
}

Expand Down
64 changes: 49 additions & 15 deletions listener/src/services/notification-scheduler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { v4 as uuidv4 } from 'uuid';
import logger from '../utils/logger';
import { generateRequestId } from '../utils/request-id';
import { ScheduledNotificationRepository } from './scheduled-notification-repository';
import { SchedulerConfig, NotificationStatus, ScheduledNotification } from '../types/scheduled-notification';
import { DiscordNotificationService } from './discord-notification';
Expand Down Expand Up @@ -92,44 +93,71 @@ export class NotificationScheduler {
* Main processing loop
*/
private async processPendingNotifications(): Promise<void> {
const requestId = generateRequestId();
const batchStart = Date.now();

try {
// Recover any stale locks from crashed processors
await this.repository.recoverStaleLocks();
await this.repository.recoverStaleLocks(requestId);

// Fetch and lock pending notifications
const notifications = await this.repository.fetchAndLockPendingNotifications(
this.processorId,
this.config.lockTimeoutMs,
this.config.batchSize
this.config.batchSize,
requestId
);

if (notifications.length === 0) {
logger.debug('Scheduler poll cycle complete', {
requestId,
processorId: this.processorId,
count: 0,
durationMs: Date.now() - batchStart,
});
return;
}

logger.info('Processing batch of scheduled notifications', {
requestId,
count: notifications.length,
processorId: this.processorId,
});

// Process each notification
for (const notification of notifications) {
await this.processNotification(notification);
await this.processNotification(notification, requestId);
}

logger.info('Scheduler batch complete', {
requestId,
processorId: this.processorId,
count: notifications.length,
durationMs: Date.now() - batchStart,
});
} catch (error) {
logger.error('Error in scheduler processing loop', { error, processorId: this.processorId });
logger.error('Error in scheduler processing loop', {
requestId,
error,
processorId: this.processorId,
durationMs: Date.now() - batchStart,
});
}
}

/**
* Process a single notification
*/
private async processNotification(notification: ScheduledNotification): Promise<void> {
private async processNotification(
notification: ScheduledNotification,
requestId: string
): Promise<void> {
const startTime = Date.now();
const executionAttempt = notification.retryCount + 1;

try {
logger.info('Processing scheduled notification', {
requestId,
id: notification.id,
type: notification.notificationType,
executeAt: notification.executeAt,
Expand All @@ -143,6 +171,7 @@ export class NotificationScheduler {
if (timeDiff < -this.config.timingBufferMs) {
// Notification is not yet due (clock skew or early fetch)
logger.warn('Notification not yet due, releasing lock', {
requestId,
id: notification.id,
executeAt: notification.executeAt,
now,
Expand All @@ -157,35 +186,37 @@ export class NotificationScheduler {
}

// Execute notification based on type
const success = await this.executeNotification(notification);
const success = await this.executeNotification(notification, requestId);

const duration = Date.now() - startTime;
const durationMs = Date.now() - startTime;

if (success) {
await this.repository.markAsCompleted(notification.id!);
await this.repository.markAsCompleted(notification.id!, requestId);
await this.repository.logExecution({
scheduledNotificationId: notification.id!,
executionAttempt,
executionTime: new Date(),
status: 'SUCCESS',
durationMs: duration,
durationMs,
});

logger.info('Notification delivered successfully', {
requestId,
id: notification.id,
type: notification.notificationType,
duration,
durationMs,
});
} else {
throw new Error('Notification delivery returned false');
}
} catch (error) {
const duration = Date.now() - startTime;
const durationMs = Date.now() - startTime;
logger.error('Failed to process notification', {
requestId,
id: notification.id,
error,
attempt: executionAttempt,
duration,
durationMs,
});

await this.repository.markAsFailedOrRetry(
Expand All @@ -201,15 +232,18 @@ export class NotificationScheduler {
executionTime: new Date(),
status: notification.retryCount >= notification.maxRetries ? 'FAILED' : 'RETRY',
errorMessage: (error as Error).message,
durationMs: duration,
durationMs,
});
}
}

/**
* Execute notification delivery based on type
*/
private async executeNotification(notification: ScheduledNotification): Promise<boolean> {
private async executeNotification(
notification: ScheduledNotification,
requestId: string
): Promise<boolean> {
const payload = JSON.parse(notification.payload);

switch (notification.notificationType) {
Expand All @@ -220,7 +254,7 @@ export class NotificationScheduler {
return await this.discordService.sendEventNotification(
payload.event,
payload.contractConfig,
`scheduler-${notification.id}`
`scheduler-${notification.id}-${requestId}`
);

case 'webhook':
Expand Down
Loading
Loading