Skip to content

Commit c4eef67

Browse files
committed
apply pr feedback
consolidate ensdb modules; replace pino logger with native console logger; simplify ensdb worker logic
1 parent 623260f commit c4eef67

File tree

13 files changed

+267
-316
lines changed

13 files changed

+267
-316
lines changed

apps/ensapi/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
"date-fns": "catalog:",
4444
"drizzle-orm": "catalog:",
4545
"hono": "catalog:",
46-
"p-memoize": "^8.0.0",
46+
"p-memoize": "catalog:",
4747
"p-retry": "catalog:",
4848
"pg-connection-string": "catalog:",
4949
"pino": "catalog:",

apps/ensindexer/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,15 @@
2929
"@ensnode/ensnode-sdk": "workspace:*",
3030
"@ensnode/ensrainbow-sdk": "workspace:*",
3131
"@ensnode/ponder-metadata": "workspace:*",
32+
"@ponder/client": "catalog:",
3233
"caip": "catalog:",
3334
"date-fns": "catalog:",
3435
"deepmerge-ts": "^7.1.5",
3536
"dns-packet": "^5.6.1",
3637
"drizzle-orm": "catalog:",
38+
"p-memoize": "catalog:",
3739
"p-retry": "catalog:",
38-
"pino": "catalog:",
3940
"pg-connection-string": "catalog:",
40-
"pg": "8.16.3",
4141
"hono": "catalog:",
4242
"ponder": "catalog:",
4343
"viem": "catalog:",

apps/ensindexer/ponder/src/ensdb-writer-worker.ts

Lines changed: 30 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
* - Indexing Status
55
* into the ENSDb.
66
*/
7-
import config from "@/config";
8-
97
import { secondsToMilliseconds } from "date-fns";
108
import pRetry from "p-retry";
119

@@ -18,14 +16,11 @@ import {
1816
} from "@ensnode/ensnode-sdk";
1917

2018
import { validateENSIndexerPublicConfigCompatibility } from "@/config/compatibility";
21-
import { EnsDbConnection, EnsDbMutation, EnsDbQuery } from "@/lib/ensdb";
19+
import { EnsDbClient } from "@/lib/ensdb";
2220
import { ensIndexerClient, waitForEnsIndexerToBecomeHealthy } from "@/lib/ensindexer";
23-
import { makeLogger } from "@/lib/logger";
2421

2522
const INDEXING_STATUS_RECORD_UPDATE_INTERVAL: Duration = 1;
2623

27-
const logger = makeLogger("ensdb-writer-worker");
28-
2924
/**
3025
* ENSDb Writer Worker
3126
*
@@ -38,57 +33,45 @@ const logger = makeLogger("ensdb-writer-worker");
3833
* into ENSDb.
3934
*/
4035
async function ensDbWriterWorker() {
36+
console.log("ENSDb Writer Worker: waiting for ENSIndexer to become healthy.");
37+
4138
// 0. Wait for ENSIndexer to become healthy before running the worker's logic
4239
await waitForEnsIndexerToBecomeHealthy;
4340

44-
// 1. Create ENSDb Client
45-
const ensDbConnection = new EnsDbConnection();
46-
const ensDbClient = ensDbConnection.connect({
47-
schemaName: config.databaseSchemaName,
48-
poolConfig: {
49-
connectionString: config.databaseUrl,
50-
},
51-
});
52-
53-
logger.info("ENSDb Client connected");
41+
console.log("ENSDb Writer Worker: ENSIndexer is healthy, starting tasks.");
5442

55-
// 2. Create ENSDb Query object for read operations
56-
const ensDbQuery = new EnsDbQuery(ensDbClient);
57-
// 3. Create ENSDb Mutation object for write operations
58-
const ensDbMutation = new EnsDbMutation(ensDbClient);
43+
// 1. Create ENSDb Client
44+
const ensDbClient = new EnsDbClient();
5945

6046
/**
6147
* Handle ENSIndexerPublicConfig Record
6248
*/
6349
const handleEnsIndexerPublicConfigRecord = async () => {
6450
// Read stored config and in-memory config.
65-
// Note: we wrap read operations in pRetry to ensure all of them are
51+
// Note: we wrap each operation in pRetry to ensure all of them can be
6652
// completed successfully.
67-
const [storedConfig, inMemoryConfig] = await pRetry(() =>
68-
Promise.all([ensDbQuery.getEnsIndexerPublicConfig(), ensIndexerClient.config()]),
69-
);
53+
const [storedConfig, inMemoryConfig] = await Promise.all([
54+
pRetry(() => ensDbClient.getEnsIndexerPublicConfig()),
55+
pRetry(() => ensIndexerClient.config()),
56+
]);
7057

7158
// Validate in-memory config object compatibility with the stored one,
7259
// if the stored one is available
7360
if (storedConfig) {
7461
try {
7562
validateENSIndexerPublicConfigCompatibility(storedConfig, inMemoryConfig);
7663
} catch (error) {
77-
const errorMessage =
78-
"In-memory ENSIndexerPublicConfig object is not compatible with its counterpart stored in ENSDb.";
79-
80-
logger.error(error, errorMessage);
64+
const errorMessage = `In-memory ENSIndexerPublicConfig object is not compatible with its counterpart stored in ENSDb.`;
8165

8266
// Throw the error to terminate the ENSIndexer process due to
8367
// found config incompatibility
84-
throw new Error(errorMessage);
68+
throw new Error(errorMessage, {
69+
cause: error,
70+
});
8571
}
8672
} else {
8773
// Upsert ENSIndexerPublicConfig into ENSDb.
88-
// Note: we wrap write operation in pRetry to ensure it can complete
89-
// successfully, as there will be no other attempt.
90-
await pRetry(() => ensDbMutation.upsertEnsIndexerPublicConfig(inMemoryConfig));
91-
logger.info("ENSIndexer Public Config successfully stored in ENSDb.");
74+
await ensDbClient.upsertEnsIndexerPublicConfig(inMemoryConfig);
9275
}
9376
};
9477

@@ -110,16 +93,14 @@ async function ensDbWriterWorker() {
11093

11194
// Check if Indexing Status is in expected status.
11295
if (omnichainSnapshot.omnichainStatus === OmnichainIndexingStatusIds.Unstarted) {
113-
throw new Error("Omnichain Status must be different that 'Unstarted'.");
96+
throw new Error("Omnichain Status must be different than 'Unstarted'.");
11497
}
11598

11699
// Upsert ENSIndexerPublicConfig into ENSDb.
117-
await ensDbMutation.upsertIndexingStatus(snapshot);
118-
119-
logger.info("Indexing Status successfully stored in ENSDb.");
100+
await ensDbClient.upsertIndexingStatus(snapshot);
120101
} catch (error) {
121102
// Do nothing about this error, but having it logged.
122-
logger.error(error, "Could not upsert Indexing Status record");
103+
console.error(error, "Could not upsert Indexing Status record");
123104
} finally {
124105
// Regardless of current iteration result,
125106
// schedule the next callback to handle Indexing Status Record.
@@ -131,12 +112,20 @@ async function ensDbWriterWorker() {
131112
};
132113

133114
// 4. Handle ENSIndexer Public Config just once.
134-
await handleEnsIndexerPublicConfigRecord();
115+
console.log("Task: store ENSIndexer Public Config in ENSDb.");
116+
await handleEnsIndexerPublicConfigRecord().then(() =>
117+
console.log("ENSIndexer Public Config successfully stored in ENSDb."),
118+
);
135119

136120
// 5. Handle Indexing Status on recurring basis.
137-
await handleIndexingStatusRecordRecursively();
121+
console.log("Task: store Indexing Status in ENSDb.");
122+
await handleIndexingStatusRecordRecursively().then(() =>
123+
console.log("Indexing Status successfully stored in ENSDb."),
124+
);
138125
}
139126

140127
// Run ENSDb Writer Worker in a non-blocking way to
141128
// allow database migrations to proceed in the background.
142-
setTimeout(ensDbWriterWorker, 0);
129+
ensDbWriterWorker().catch((error) =>
130+
console.error("ENSDb Writer Worker failed to perform its tasks", error),
131+
);

apps/ensindexer/src/config/compatibility.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,7 @@ export type ENSIndexerPublicConfigCompatibilityCheck = Pick<
99
* Validate if `configB` is compatible with `configA`, such that `configA` is
1010
* a subset of `configB`.
1111
*
12-
* @throws error if 'indexedChainIds' were incompatible.
13-
* @throws error if 'isSubgraphCompatible' flag was incompatible.
14-
* @throws error if 'namespace' was incompatible.
15-
* @throws error if 'plugins' were incompatible.
12+
* @throws error if configs are incompatible.
1613
*/
1714
export function validateENSIndexerPublicConfigCompatibility(
1815
configA: ENSIndexerPublicConfigCompatibilityCheck,
Lines changed: 7 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,25 @@
11
// This file was copied 1-to-1 from ENSApi.
2+
// TODO: deduplicate with apps/ensapi/src/lib/handlers/drizzle.ts when ensnode nodejs internal package is created
23

3-
import { isTable, Table } from "drizzle-orm";
4-
import { drizzle, type NodePgDatabase } from "drizzle-orm/node-postgres";
5-
import { isPgEnum } from "drizzle-orm/pg-core";
6-
import type { Pool } from "pg";
4+
import { setDatabaseSchema } from "@ponder/client";
5+
import { drizzle } from "drizzle-orm/node-postgres";
76

87
type Schema = { [name: string]: unknown };
98

10-
// https://github.com/ponder-sh/ponder/blob/f7f6444ab8d1a870fe6492023941091df7b7cddf/packages/client/src/index.ts#L226C1-L239C3
11-
const setDatabaseSchema = <T extends Schema>(schema: T, schemaName: string) => {
12-
for (const table of Object.values(schema)) {
13-
if (isTable(table)) {
14-
// @ts-expect-error
15-
table[Table.Symbol.Schema] = schemaName;
16-
} else if (isPgEnum(table)) {
17-
// @ts-expect-error
18-
table.schema = schemaName;
19-
}
20-
}
21-
};
22-
239
/**
2410
* Makes a Drizzle DB object.
2511
*/
2612
export const makeDrizzle = <SCHEMA extends Schema>({
2713
schema,
28-
connectionPool,
14+
databaseUrl,
2915
databaseSchema,
3016
}: {
3117
schema: SCHEMA;
32-
connectionPool: Pool;
18+
databaseUrl: string;
3319
databaseSchema: string;
34-
}): NodePgDatabase<SCHEMA> & {
35-
$client: Pool;
36-
} => {
20+
}) => {
3721
// monkeypatch schema onto tables
3822
setDatabaseSchema(schema, databaseSchema);
3923

40-
return drizzle(connectionPool, { schema, casing: "snake_case" });
24+
return drizzle(databaseUrl, { schema, casing: "snake_case" });
4125
};
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
import config from "@/config";
2+
3+
import { eq } from "drizzle-orm/sql";
4+
5+
import * as schema from "@ensnode/ensnode-schema";
6+
import type {
7+
SerializedCrossChainIndexingStatusSnapshot,
8+
SerializedENSIndexerPublicConfig,
9+
} from "@ensnode/ensnode-sdk";
10+
11+
import { makeDrizzle } from "./drizzle";
12+
import {
13+
type EnsNodeMetadata,
14+
type EnsNodeMetadataEnsIndexerPublicConfig,
15+
type EnsNodeMetadataIndexingStatus,
16+
EnsNodeMetadataKeys,
17+
} from "./ensnode-metadata";
18+
19+
export interface EnsDbClientQuery {
20+
getEnsIndexerPublicConfig(): Promise<SerializedENSIndexerPublicConfig | undefined>;
21+
22+
getIndexingStatus(): Promise<SerializedCrossChainIndexingStatusSnapshot | undefined>;
23+
}
24+
25+
export interface EnsDbClientMutation {
26+
upsertEnsIndexerPublicConfig(
27+
ensIndexerPublicConfig: SerializedENSIndexerPublicConfig,
28+
): Promise<SerializedENSIndexerPublicConfig>;
29+
30+
upsertIndexingStatus(
31+
indexingStatus: SerializedCrossChainIndexingStatusSnapshot,
32+
): Promise<SerializedCrossChainIndexingStatusSnapshot>;
33+
}
34+
35+
/**
36+
* ENSDb Client
37+
*/
38+
export class EnsDbClient implements EnsDbClientQuery, EnsDbClientMutation {
39+
#db = makeDrizzle({
40+
databaseSchema: config.databaseSchemaName,
41+
databaseUrl: config.databaseUrl,
42+
schema,
43+
});
44+
45+
/**
46+
* Upsert ENSIndexer Public Config
47+
*
48+
* @returns updated record in ENSDb.
49+
* @throws when upsert operation failed.
50+
*/
51+
async getEnsIndexerPublicConfig(): Promise<SerializedENSIndexerPublicConfig | undefined> {
52+
return this.getEnsNodeMetadata<EnsNodeMetadataEnsIndexerPublicConfig>({
53+
key: EnsNodeMetadataKeys.EnsIndexerPublicConfig,
54+
});
55+
}
56+
57+
/**
58+
* Upsert Indexing Status
59+
*
60+
* @returns updated record in ENSDb.
61+
* @throws when upsert operation failed.
62+
*/
63+
async getIndexingStatus(): Promise<SerializedCrossChainIndexingStatusSnapshot | undefined> {
64+
return this.getEnsNodeMetadata<EnsNodeMetadataIndexingStatus>({
65+
key: EnsNodeMetadataKeys.IndexingStatus,
66+
});
67+
}
68+
69+
/**
70+
* Upsert ENSIndexer Public Config
71+
*
72+
* @returns updated record in ENSDb.
73+
* @throws when upsert operation failed.
74+
*/
75+
async upsertEnsIndexerPublicConfig(
76+
ensIndexerPublicConfig: SerializedENSIndexerPublicConfig,
77+
): Promise<SerializedENSIndexerPublicConfig> {
78+
return this.upsertEnsNodeMetadata({
79+
key: EnsNodeMetadataKeys.EnsIndexerPublicConfig,
80+
value: ensIndexerPublicConfig,
81+
});
82+
}
83+
84+
/**
85+
* Upsert Indexing Status
86+
*
87+
* @returns updated record in ENSDb.
88+
* @throws when upsert operation failed.
89+
*/
90+
async upsertIndexingStatus(
91+
indexingStatus: SerializedCrossChainIndexingStatusSnapshot,
92+
): Promise<SerializedCrossChainIndexingStatusSnapshot> {
93+
return this.upsertEnsNodeMetadata({
94+
key: EnsNodeMetadataKeys.IndexingStatus,
95+
value: indexingStatus,
96+
});
97+
}
98+
99+
/**
100+
* Get ENSNode metadata record
101+
*
102+
* @returns selected record in ENSDb.
103+
* @throws when exactly one matching metadata record was not found
104+
*/
105+
private async getEnsNodeMetadata<EnsNodeMetadataType extends EnsNodeMetadata = EnsNodeMetadata>(
106+
metadata: Pick<EnsNodeMetadataType, "key">,
107+
): Promise<EnsNodeMetadataType["value"] | undefined> {
108+
const result = await this.#db
109+
.select()
110+
.from(schema.ensNodeMetadata)
111+
.where(eq(schema.ensNodeMetadata.key, metadata.key));
112+
113+
if (result.length === 0) {
114+
return undefined;
115+
}
116+
117+
if (result.length === 1 && result[0]) {
118+
return result[0].value as EnsNodeMetadataType["value"];
119+
}
120+
121+
throw new Error(`There must be exactly one ENSNodeMetadata record for '${metadata.key}' key`);
122+
}
123+
124+
/**
125+
* Upsert ENSNode metadata
126+
*
127+
* @returns updated record in ENSDb.
128+
* @throws when upsert operation failed.
129+
*/
130+
private async upsertEnsNodeMetadata<
131+
EnsNodeMetadataType extends EnsNodeMetadata = EnsNodeMetadata,
132+
>(metadata: EnsNodeMetadataType): Promise<EnsNodeMetadataType["value"]> {
133+
const [result] = await this.#db
134+
.insert(schema.ensNodeMetadata)
135+
.values({
136+
key: metadata.key,
137+
value: metadata.value,
138+
})
139+
.onConflictDoUpdate({
140+
target: schema.ensNodeMetadata.key,
141+
set: { value: metadata.value },
142+
})
143+
.returning({ value: schema.ensNodeMetadata.value });
144+
145+
if (!result) {
146+
throw new Error(`Failed to upsert metadata for key: ${metadata.key}`);
147+
}
148+
149+
return result.value as EnsNodeMetadataType["value"];
150+
}
151+
}

0 commit comments

Comments
 (0)