Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 66 additions & 1 deletion lib/cache/redisCache.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import Redis from "ioredis";
import type { RawEvent, TranslatedEvent } from "../translator/types";

let client: Redis | null = null;
const CACHE_NAMESPACE = "open-audit";
const EVENTS_CACHE_PREFIX = `${CACHE_NAMESPACE}:events`;
const TRANSLATION_CACHE_PREFIX = `${CACHE_NAMESPACE}:translation`;

export function isRedisEnabled(): boolean {
return Boolean(process.env.REDIS_URL);
Expand All @@ -17,7 +21,11 @@ export function initRedis(): void {

function makeKey(sorobanUrl: string, contractIds: string[], startLedger: number) {
const ids = contractIds.join(",");
return `open-audit:events:${sorobanUrl}:${ids}:${startLedger}`;
return `${EVENTS_CACHE_PREFIX}:${sorobanUrl}:${ids}:${startLedger}`;
}

function makeTranslationKey(txHash: string, eventId: string) {
return `${TRANSLATION_CACHE_PREFIX}:${txHash}:${eventId}`;
}

export async function getCachedEvents(
Expand Down Expand Up @@ -64,6 +72,63 @@ export async function setCachedEvents(
}
}

export async function getCachedTranslation(
event: Pick<RawEvent, "txHash" | "id">
): Promise<TranslatedEvent | null> {
if (!isRedisEnabled()) return null;
try {
if (!client) initRedis();
if (!client) return null;
const key = makeTranslationKey(event.txHash, event.id);
const raw = await client.get(key);
if (!raw) return null;
return JSON.parse(raw) as TranslatedEvent;
} catch (err) {
console.warn("[redis] Error reading translation cache:", err);
return null;
}
}

export async function setCachedTranslation(
event: Pick<RawEvent, "txHash" | "id">,
translated: TranslatedEvent
): Promise<void> {
if (!isRedisEnabled()) return;
try {
if (!client) initRedis();
if (!client) return;
const key = makeTranslationKey(event.txHash, event.id);
await client.set(key, JSON.stringify(translated));
} catch (err) {
console.warn("[redis] Error writing translation cache:", err);
}
}

export async function purgeTranslationCache(
matchPattern: string = `${TRANSLATION_CACHE_PREFIX}:*`
): Promise<number> {
if (!isRedisEnabled()) return 0;
try {
if (!client) initRedis();
if (!client) return 0;

const keys: string[] = [];
let cursor = "0";
do {
const [nextCursor, batch] = await client.scan(cursor, "MATCH", matchPattern, "COUNT", "100");
cursor = nextCursor;
keys.push(...batch);
} while (cursor !== "0");

if (keys.length === 0) return 0;
await client.del(...keys);
return keys.length;
} catch (err) {
console.warn("[redis] Error purging translation cache:", err);
return 0;
}
}

export async function disconnectRedis(): Promise<void> {
if (client) {
await client.quit();
Expand Down
118 changes: 45 additions & 73 deletions lib/graphql/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,46 @@ import {
GraphQLBoolean,
GraphQLList,
GraphQLNonNull,
GraphQLFieldConfigMap,
} from "graphql";
import { db } from "../db/client";
import {
getRegisteredContracts,
registerBlueprint,
translateEvent,
translateWithCache,
} from "../translator/registry";
import {
decodeAddress,
decodeAmount,
truncateHex,
} from "../translator/core";
import type { CustomAbi } from "../translator/types";
import {
CustomAbi,
customAbiToBlueprint,
parseCustomAbi,
} from "../translator/custom-abi";
import { purgeTranslationCache } from "../cache/redisCache";

// In-memory server-side registry of custom ABIs
export const SERVER_CUSTOM_ABIS = new Map<string, CustomAbi>();

function buildPersistedRawEvent(event: any) {
return {
id: event.id,
contractId: event.contractId,
ledger: event.ledger,
timestamp: event.timestamp,
txHash: event.txHash,
topics: event.topics as string[],
data: event.data,
description: event.description ?? undefined,
status: event.status ?? undefined,
blueprintName: event.blueprintName ?? undefined,
eventType: event.eventType ?? undefined,
schemaVersion: event.schemaVersion ?? undefined,
};
}

// Cache for the generated schema
let cachedSchema: GraphQLSchema | null = null;
let schemaVersion = 0;
Expand Down Expand Up @@ -180,7 +199,7 @@ export function buildSchema(): GraphQLSchema {
if (cachedSchema) return cachedSchema;

const customEventTypes = buildCustomEventTypes(SERVER_CUSTOM_ABIS);
const queryFields: Record<string, any> = {
const queryFields: GraphQLFieldConfigMap<any, any> = {
// 1. Generic queries
events: {
type: new GraphQLNonNull(new GraphQLList(new GraphQLNonNull(EventType))),
Expand Down Expand Up @@ -252,16 +271,8 @@ export function buildSchema(): GraphQLSchema {
});
const results: any[] = [];
for (const event of events) {
const rawEvent = {
id: event.id,
contractId: event.contractId,
ledger: event.ledger,
timestamp: event.timestamp,
txHash: event.txHash,
topics: event.topics as string[],
data: event.data,
};
const translated = translateEvent(rawEvent);
const rawEvent = buildPersistedRawEvent(event);
const translated = await translateWithCache(rawEvent);
if (translated.eventType === "Transfer" && rawEvent.topics.length >= 3) {
results.push({
id: rawEvent.id,
Expand Down Expand Up @@ -297,16 +308,8 @@ export function buildSchema(): GraphQLSchema {
});
const results: any[] = [];
for (const event of events) {
const rawEvent = {
id: event.id,
contractId: event.contractId,
ledger: event.ledger,
timestamp: event.timestamp,
txHash: event.txHash,
topics: event.topics as string[],
data: event.data,
};
const translated = translateEvent(rawEvent);
const rawEvent = buildPersistedRawEvent(event);
const translated = await translateWithCache(rawEvent);
if (translated.eventType === "Mint" && rawEvent.topics.length >= 3) {
results.push({
id: rawEvent.id,
Expand Down Expand Up @@ -342,16 +345,8 @@ export function buildSchema(): GraphQLSchema {
});
const results: any[] = [];
for (const event of events) {
const rawEvent = {
id: event.id,
contractId: event.contractId,
ledger: event.ledger,
timestamp: event.timestamp,
txHash: event.txHash,
topics: event.topics as string[],
data: event.data,
};
const translated = translateEvent(rawEvent);
const rawEvent = buildPersistedRawEvent(event);
const translated = await translateWithCache(rawEvent);
if (translated.eventType === "Burn" && rawEvent.topics.length >= 2) {
results.push({
id: rawEvent.id,
Expand Down Expand Up @@ -400,16 +395,8 @@ export function buildSchema(): GraphQLSchema {
});
const results: any[] = [];
for (const event of events) {
const rawEvent = {
id: event.id,
contractId: event.contractId,
ledger: event.ledger,
timestamp: event.timestamp,
txHash: event.txHash,
topics: event.topics as string[],
data: event.data,
};
const translated = translateEvent(rawEvent);
const rawEvent = buildPersistedRawEvent(event);
const translated = await translateWithCache(rawEvent);
if (translated.eventType === capitalize(eventDef.name)) {
const positions = [...rawEvent.topics.slice(1), rawEvent.data];
const resolvedFields: Record<string, any> = {
Expand Down Expand Up @@ -459,16 +446,8 @@ export function buildSchema(): GraphQLSchema {
});
const results: any[] = [];
for (const event of events) {
const rawEvent = {
id: event.id,
contractId: event.contractId,
ledger: event.ledger,
timestamp: event.timestamp,
txHash: event.txHash,
topics: event.topics as string[],
data: event.data,
};
const translated = translateEvent(rawEvent);
const rawEvent = buildPersistedRawEvent(event);
const translated = await translateWithCache(rawEvent);
if (translated.eventType === "Transfer" && rawEvent.topics.length >= 3) {
results.push({
id: rawEvent.id,
Expand Down Expand Up @@ -504,16 +483,8 @@ export function buildSchema(): GraphQLSchema {
});
const results: any[] = [];
for (const event of events) {
const rawEvent = {
id: event.id,
contractId: event.contractId,
ledger: event.ledger,
timestamp: event.timestamp,
txHash: event.txHash,
topics: event.topics as string[],
data: event.data,
};
const translated = translateEvent(rawEvent);
const rawEvent = buildPersistedRawEvent(event);
const translated = await translateWithCache(rawEvent);
if (translated.eventType === "Mint" && rawEvent.topics.length >= 3) {
results.push({
id: rawEvent.id,
Expand Down Expand Up @@ -549,16 +520,8 @@ export function buildSchema(): GraphQLSchema {
});
const results: any[] = [];
for (const event of events) {
const rawEvent = {
id: event.id,
contractId: event.contractId,
ledger: event.ledger,
timestamp: event.timestamp,
txHash: event.txHash,
topics: event.topics as string[],
data: event.data,
};
const translated = translateEvent(rawEvent);
const rawEvent = buildPersistedRawEvent(event);
const translated = await translateWithCache(rawEvent);
if (translated.eventType === "Burn" && rawEvent.topics.length >= 2) {
results.push({
id: rawEvent.id,
Expand Down Expand Up @@ -614,6 +577,15 @@ export function buildSchema(): GraphQLSchema {
};
},
},
invalidateTranslationCache: {
type: new GraphQLNonNull(GraphQLInt),
args: {
pattern: { type: GraphQLString },
},
resolve: async (_, args) => {
return await purgeTranslationCache(args.pattern ?? undefined);
},
},
},
});

Expand Down
17 changes: 11 additions & 6 deletions lib/translator/persistence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@
*/

import type { RawEvent, TranslatedEvent } from "./types";
import { translateEvent } from "./registry";
import { db } from "@/lib/db/client";
import { processEventForIpfs } from "@/lib/ipfs/offloader";
import { triggerWebhooksForEvent } from "@/lib/jobs/queue";
import { OpenAuditError } from "@/lib/errors";
import { translateWithCache } from "./registry";
import { db } from "../db/client";
import { processEventForIpfs } from "../ipfs/offloader";
import { triggerWebhooksForEvent } from "../jobs/queue";
import { OpenAuditError } from "../errors";
import { setCachedTranslation, isRedisEnabled } from "../cache/redisCache";

interface DeadLetterPayload {
errorCode: string;
Expand Down Expand Up @@ -52,7 +53,7 @@ export async function translateAndPersistEvent(
let translated: TranslatedEvent;

try {
translated = await translateEvent(rawEvent);
translated = await translateWithCache(rawEvent);
} catch (error) {
const errorCode = error instanceof OpenAuditError ? error.code : "INTERNAL_ERROR";
const errorMessage = error instanceof Error ? error.message : String(error);
Expand Down Expand Up @@ -114,6 +115,10 @@ export async function translateAndPersistEvent(
translated.raw.data = processed.data;
translated.raw.topics = processed.topics;

if (isRedisEnabled()) {
await setCachedTranslation(rawEvent, translated);
}

return translated;
} catch (error) {
console.error(`Failed to persist event ${rawEvent.id}:`, error);
Expand Down
Loading
Loading