From 902ced47c628a1b3f913fa4f84301566e84fbc8b Mon Sep 17 00:00:00 2001 From: squad0011 Date: Wed, 24 Jun 2026 01:37:18 +0000 Subject: [PATCH] Add Redis translation cache-aside support and GraphQL invalidation mutation --- lib/cache/redisCache.ts | 67 ++++++++++++++++++- lib/graphql/schema.ts | 118 +++++++++++++--------------------- lib/translator/persistence.ts | 17 +++-- lib/translator/registry.ts | 72 +++++++++++++++++++-- 4 files changed, 187 insertions(+), 87 deletions(-) diff --git a/lib/cache/redisCache.ts b/lib/cache/redisCache.ts index d014152..43db3a0 100644 --- a/lib/cache/redisCache.ts +++ b/lib/cache/redisCache.ts @@ -1,6 +1,10 @@ import Redis from "ioredis"; +import type { RawEvent, TranslatedEvent } from "../translator/types"; let client: Redis | null = null; +const CACHE_NAMESPACE = "open-audit"; +const EVENTS_CACHE_PREFIX = `${CACHE_NAMESPACE}:events`; +const TRANSLATION_CACHE_PREFIX = `${CACHE_NAMESPACE}:translation`; export function isRedisEnabled(): boolean { return Boolean(process.env.REDIS_URL); @@ -17,7 +21,11 @@ export function initRedis(): void { function makeKey(sorobanUrl: string, contractIds: string[], startLedger: number) { const ids = contractIds.join(","); - return `open-audit:events:${sorobanUrl}:${ids}:${startLedger}`; + return `${EVENTS_CACHE_PREFIX}:${sorobanUrl}:${ids}:${startLedger}`; +} + +function makeTranslationKey(txHash: string, eventId: string) { + return `${TRANSLATION_CACHE_PREFIX}:${txHash}:${eventId}`; } export async function getCachedEvents( @@ -64,6 +72,63 @@ export async function setCachedEvents( } } +export async function getCachedTranslation( + event: Pick +): Promise { + if (!isRedisEnabled()) return null; + try { + if (!client) initRedis(); + if (!client) return null; + const key = makeTranslationKey(event.txHash, event.id); + const raw = await client.get(key); + if (!raw) return null; + return JSON.parse(raw) as TranslatedEvent; + } catch (err) { + console.warn("[redis] Error reading translation cache:", err); + return null; + } +} + +export async function setCachedTranslation( + event: Pick, + translated: TranslatedEvent +): Promise { + if (!isRedisEnabled()) return; + try { + if (!client) initRedis(); + if (!client) return; + const key = makeTranslationKey(event.txHash, event.id); + await client.set(key, JSON.stringify(translated)); + } catch (err) { + console.warn("[redis] Error writing translation cache:", err); + } +} + +export async function purgeTranslationCache( + matchPattern: string = `${TRANSLATION_CACHE_PREFIX}:*` +): Promise { + if (!isRedisEnabled()) return 0; + try { + if (!client) initRedis(); + if (!client) return 0; + + const keys: string[] = []; + let cursor = "0"; + do { + const [nextCursor, batch] = await client.scan(cursor, "MATCH", matchPattern, "COUNT", "100"); + cursor = nextCursor; + keys.push(...batch); + } while (cursor !== "0"); + + if (keys.length === 0) return 0; + await client.del(...keys); + return keys.length; + } catch (err) { + console.warn("[redis] Error purging translation cache:", err); + return 0; + } +} + export async function disconnectRedis(): Promise { if (client) { await client.quit(); diff --git a/lib/graphql/schema.ts b/lib/graphql/schema.ts index 6a9530b..8b4ba2a 100644 --- a/lib/graphql/schema.ts +++ b/lib/graphql/schema.ts @@ -6,27 +6,46 @@ import { GraphQLBoolean, GraphQLList, GraphQLNonNull, + GraphQLFieldConfigMap, } from "graphql"; import { db } from "../db/client"; import { getRegisteredContracts, registerBlueprint, - translateEvent, + translateWithCache, } from "../translator/registry"; import { decodeAddress, decodeAmount, truncateHex, } from "../translator/core"; +import type { CustomAbi } from "../translator/types"; import { - CustomAbi, customAbiToBlueprint, parseCustomAbi, } from "../translator/custom-abi"; +import { purgeTranslationCache } from "../cache/redisCache"; // In-memory server-side registry of custom ABIs export const SERVER_CUSTOM_ABIS = new Map(); +function buildPersistedRawEvent(event: any) { + return { + id: event.id, + contractId: event.contractId, + ledger: event.ledger, + timestamp: event.timestamp, + txHash: event.txHash, + topics: event.topics as string[], + data: event.data, + description: event.description ?? undefined, + status: event.status ?? undefined, + blueprintName: event.blueprintName ?? undefined, + eventType: event.eventType ?? undefined, + schemaVersion: event.schemaVersion ?? undefined, + }; +} + // Cache for the generated schema let cachedSchema: GraphQLSchema | null = null; let schemaVersion = 0; @@ -180,7 +199,7 @@ export function buildSchema(): GraphQLSchema { if (cachedSchema) return cachedSchema; const customEventTypes = buildCustomEventTypes(SERVER_CUSTOM_ABIS); - const queryFields: Record = { + const queryFields: GraphQLFieldConfigMap = { // 1. Generic queries events: { type: new GraphQLNonNull(new GraphQLList(new GraphQLNonNull(EventType))), @@ -252,16 +271,8 @@ export function buildSchema(): GraphQLSchema { }); const results: any[] = []; for (const event of events) { - const rawEvent = { - id: event.id, - contractId: event.contractId, - ledger: event.ledger, - timestamp: event.timestamp, - txHash: event.txHash, - topics: event.topics as string[], - data: event.data, - }; - const translated = translateEvent(rawEvent); + const rawEvent = buildPersistedRawEvent(event); + const translated = await translateWithCache(rawEvent); if (translated.eventType === "Transfer" && rawEvent.topics.length >= 3) { results.push({ id: rawEvent.id, @@ -297,16 +308,8 @@ export function buildSchema(): GraphQLSchema { }); const results: any[] = []; for (const event of events) { - const rawEvent = { - id: event.id, - contractId: event.contractId, - ledger: event.ledger, - timestamp: event.timestamp, - txHash: event.txHash, - topics: event.topics as string[], - data: event.data, - }; - const translated = translateEvent(rawEvent); + const rawEvent = buildPersistedRawEvent(event); + const translated = await translateWithCache(rawEvent); if (translated.eventType === "Mint" && rawEvent.topics.length >= 3) { results.push({ id: rawEvent.id, @@ -342,16 +345,8 @@ export function buildSchema(): GraphQLSchema { }); const results: any[] = []; for (const event of events) { - const rawEvent = { - id: event.id, - contractId: event.contractId, - ledger: event.ledger, - timestamp: event.timestamp, - txHash: event.txHash, - topics: event.topics as string[], - data: event.data, - }; - const translated = translateEvent(rawEvent); + const rawEvent = buildPersistedRawEvent(event); + const translated = await translateWithCache(rawEvent); if (translated.eventType === "Burn" && rawEvent.topics.length >= 2) { results.push({ id: rawEvent.id, @@ -400,16 +395,8 @@ export function buildSchema(): GraphQLSchema { }); const results: any[] = []; for (const event of events) { - const rawEvent = { - id: event.id, - contractId: event.contractId, - ledger: event.ledger, - timestamp: event.timestamp, - txHash: event.txHash, - topics: event.topics as string[], - data: event.data, - }; - const translated = translateEvent(rawEvent); + const rawEvent = buildPersistedRawEvent(event); + const translated = await translateWithCache(rawEvent); if (translated.eventType === capitalize(eventDef.name)) { const positions = [...rawEvent.topics.slice(1), rawEvent.data]; const resolvedFields: Record = { @@ -459,16 +446,8 @@ export function buildSchema(): GraphQLSchema { }); const results: any[] = []; for (const event of events) { - const rawEvent = { - id: event.id, - contractId: event.contractId, - ledger: event.ledger, - timestamp: event.timestamp, - txHash: event.txHash, - topics: event.topics as string[], - data: event.data, - }; - const translated = translateEvent(rawEvent); + const rawEvent = buildPersistedRawEvent(event); + const translated = await translateWithCache(rawEvent); if (translated.eventType === "Transfer" && rawEvent.topics.length >= 3) { results.push({ id: rawEvent.id, @@ -504,16 +483,8 @@ export function buildSchema(): GraphQLSchema { }); const results: any[] = []; for (const event of events) { - const rawEvent = { - id: event.id, - contractId: event.contractId, - ledger: event.ledger, - timestamp: event.timestamp, - txHash: event.txHash, - topics: event.topics as string[], - data: event.data, - }; - const translated = translateEvent(rawEvent); + const rawEvent = buildPersistedRawEvent(event); + const translated = await translateWithCache(rawEvent); if (translated.eventType === "Mint" && rawEvent.topics.length >= 3) { results.push({ id: rawEvent.id, @@ -549,16 +520,8 @@ export function buildSchema(): GraphQLSchema { }); const results: any[] = []; for (const event of events) { - const rawEvent = { - id: event.id, - contractId: event.contractId, - ledger: event.ledger, - timestamp: event.timestamp, - txHash: event.txHash, - topics: event.topics as string[], - data: event.data, - }; - const translated = translateEvent(rawEvent); + const rawEvent = buildPersistedRawEvent(event); + const translated = await translateWithCache(rawEvent); if (translated.eventType === "Burn" && rawEvent.topics.length >= 2) { results.push({ id: rawEvent.id, @@ -614,6 +577,15 @@ export function buildSchema(): GraphQLSchema { }; }, }, + invalidateTranslationCache: { + type: new GraphQLNonNull(GraphQLInt), + args: { + pattern: { type: GraphQLString }, + }, + resolve: async (_, args) => { + return await purgeTranslationCache(args.pattern ?? undefined); + }, + }, }, }); diff --git a/lib/translator/persistence.ts b/lib/translator/persistence.ts index 60a700e..0eff8b3 100644 --- a/lib/translator/persistence.ts +++ b/lib/translator/persistence.ts @@ -8,11 +8,12 @@ */ import type { RawEvent, TranslatedEvent } from "./types"; -import { translateEvent } from "./registry"; -import { db } from "@/lib/db/client"; -import { processEventForIpfs } from "@/lib/ipfs/offloader"; -import { triggerWebhooksForEvent } from "@/lib/jobs/queue"; -import { OpenAuditError } from "@/lib/errors"; +import { translateWithCache } from "./registry"; +import { db } from "../db/client"; +import { processEventForIpfs } from "../ipfs/offloader"; +import { triggerWebhooksForEvent } from "../jobs/queue"; +import { OpenAuditError } from "../errors"; +import { setCachedTranslation, isRedisEnabled } from "../cache/redisCache"; interface DeadLetterPayload { errorCode: string; @@ -52,7 +53,7 @@ export async function translateAndPersistEvent( let translated: TranslatedEvent; try { - translated = await translateEvent(rawEvent); + translated = await translateWithCache(rawEvent); } catch (error) { const errorCode = error instanceof OpenAuditError ? error.code : "INTERNAL_ERROR"; const errorMessage = error instanceof Error ? error.message : String(error); @@ -114,6 +115,10 @@ export async function translateAndPersistEvent( translated.raw.data = processed.data; translated.raw.topics = processed.topics; + if (isRedisEnabled()) { + await setCachedTranslation(rawEvent, translated); + } + return translated; } catch (error) { console.error(`Failed to persist event ${rawEvent.id}:`, error); diff --git a/lib/translator/registry.ts b/lib/translator/registry.ts index 97afe55..7884d2f 100644 --- a/lib/translator/registry.ts +++ b/lib/translator/registry.ts @@ -24,11 +24,13 @@ import { sanitizeTextField } from "./core"; import { decodeGenericEventPayload, formatGenericValue } from "./generic-fallback-decoder"; import { RegistryTemplateException } from "../errors"; import { captureExceptionSync } from "../telemetry"; +import { getCachedTranslation, setCachedTranslation, isRedisEnabled } from "../cache/redisCache"; import type { EventMatchCriteria, RawEvent, TranslatedEvent, TranslationBlueprint, + VersionedTranslationBlueprint, Language, } from "./types"; @@ -36,7 +38,52 @@ import type { * The registry maps contract IDs to an array of versioned blueprints, * sorted descending by validFromLedger so the newest schema is tried first. */ -type BlueprintRegistry = Map; +type BlueprintRegistry = Map; + +export type PersistedRawEvent = RawEvent & Partial>; + +function hasPersistedTranslation(event: PersistedRawEvent): boolean { + return ( + event.status !== undefined || + event.description !== undefined || + event.blueprintName !== undefined || + event.eventType !== undefined || + event.schemaVersion !== undefined + ); +} + +function buildTranslationFromPersisted(event: PersistedRawEvent): TranslatedEvent { + return { + raw: event, + description: event.description ?? null, + status: event.status ?? "cryptic", + blueprintName: event.blueprintName ?? null, + eventType: event.eventType ?? null, + schemaVersion: event.schemaVersion ?? null, + }; +} + +export async function translateWithCache( + event: PersistedRawEvent, + customBlueprints?: Map, + lang: Language = "en" +): Promise { + if (event.txHash && event.id && isRedisEnabled()) { + const cached = await getCachedTranslation(event); + if (cached) return cached; + } + + const translated = + hasPersistedTranslation(event) && event.status !== undefined + ? buildTranslationFromPersisted(event) + : translateEvent(event, customBlueprints, lang); + + if (event.txHash && event.id && isRedisEnabled()) { + await setCachedTranslation(event, translated); + } + + return translated; +} /** * Builds the global blueprint registry by collecting all known blueprints. @@ -64,8 +111,8 @@ function buildRegistry(): BlueprintRegistry { const mintBurnBlueprint = createSacMintBurnBlueprint(contractId); const existing = registry.get(contractId); if (existing) { - // Merge by creating a combined translate function - const originalTranslate = existing.translate; + const existingBlueprint = Array.isArray(existing) ? existing[0] : existing; + const originalTranslate = existingBlueprint.translate.bind(existingBlueprint); registry.set(contractId, { ...mintBurnBlueprint, translate: (event, lang) => originalTranslate(event, lang) ?? mintBurnBlueprint.translate(event, lang), @@ -162,6 +209,7 @@ export function translateEvent( status: "cryptic", blueprintName: Array.isArray(entry) ? entry[0].contractName : entry.contractName, eventType: null, + schemaVersion: null, }; } @@ -174,6 +222,7 @@ export function translateEvent( status: "cryptic", blueprintName: blueprint.contractName ? sanitizeTextField(blueprint.contractName, { maxLength: 100 }) : null, eventType: null, + schemaVersion: null, }; } @@ -327,13 +376,22 @@ export function getBlueprintCount(): number { * Call this to add or upgrade a contract's translation schemas without * rebuilding the singleton. The blueprint list is re-sorted after insertion. */ -export function registerBlueprint(...blueprints: VersionedTranslationBlueprint[]): void { +export function registerBlueprint(...blueprints: TranslationBlueprint[]): void { for (const blueprint of blueprints) { - const existing = REGISTRY.get(blueprint.contractId) ?? []; - existing.push(blueprint); + const existing = REGISTRY.get(blueprint.contractId); + if (!existing) { + REGISTRY.set(blueprint.contractId, blueprint); + continue; + } + + const merged: VersionedTranslationBlueprint[] = Array.isArray(existing) + ? [...existing] + : [{ ...existing } as VersionedTranslationBlueprint]; + + merged.push(blueprint as VersionedTranslationBlueprint); REGISTRY.set( blueprint.contractId, - existing.sort((a, b) => (b.validFromLedger ?? 0) - (a.validFromLedger ?? 0)) + merged.sort((a, b) => (b.validFromLedger ?? 0) - (a.validFromLedger ?? 0)) ); } } \ No newline at end of file