Skip to content

Commit dfb8796

Browse files
introduce jsonb to record
1 parent 66e035c commit dfb8796

File tree

4 files changed

+321
-29
lines changed

4 files changed

+321
-29
lines changed

flow/connectors/postgres/client.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,27 @@ const (
5454
getTableNameToUnchangedToastColsSQL = `SELECT _peerdb_destination_table_name,
5555
ARRAY_AGG(DISTINCT _peerdb_unchanged_toast_columns) FROM %s.%s WHERE
5656
_peerdb_batch_id=$1 AND _peerdb_record_type!=2 GROUP BY _peerdb_destination_table_name`
57+
mergeStatementSQLJsonbToRecord = `WITH src_rank AS (
58+
SELECT r.*,_peerdb_record_type,_peerdb_unchanged_toast_columns, _peerdb_timestamp,
59+
RANK() OVER (PARTITION BY %s ORDER BY _peerdb_timestamp DESC) AS _peerdb_rank
60+
FROM %s.%s, jsonb_to_record(_peerdb_data) AS r(%s)
61+
WHERE _peerdb_batch_id = $1 AND _peerdb_destination_table_name = $2
62+
)
63+
MERGE INTO %s dst
64+
USING (SELECT %s,_peerdb_record_type,_peerdb_unchanged_toast_columns
65+
FROM src_rank WHERE _peerdb_rank=1 ORDER BY _peerdb_timestamp) src
66+
ON %s
67+
WHEN NOT MATCHED AND src._peerdb_record_type!=2 THEN
68+
INSERT (%s) VALUES (%s) %s
69+
WHEN MATCHED AND src._peerdb_record_type=2 THEN %s`
5770
mergeStatementSQL = `WITH src_rank AS (
58-
SELECT _peerdb_data,_peerdb_record_type,_peerdb_unchanged_toast_columns,_peerdb_timestamp,
71+
SELECT _peerdb_data,_peerdb_record_type,_peerdb_unchanged_toast_columns, _peerdb_timestamp,
5972
RANK() OVER (PARTITION BY %s ORDER BY _peerdb_timestamp DESC) AS _peerdb_rank
6073
FROM %s.%s WHERE _peerdb_batch_id = $1 AND _peerdb_destination_table_name=$2
6174
)
6275
MERGE INTO %s dst
63-
USING (SELECT %s,_peerdb_record_type,_peerdb_unchanged_toast_columns FROM src_rank WHERE _peerdb_rank=1 ORDER BY _peerdb_timestamp) src
76+
USING (SELECT %s,_peerdb_record_type,_peerdb_unchanged_toast_columns
77+
FROM src_rank WHERE _peerdb_rank=1 ORDER BY _peerdb_timestamp) src
6478
ON %s
6579
WHEN NOT MATCHED AND src._peerdb_record_type!=2 THEN
6680
INSERT (%s) VALUES (%s) %s

flow/connectors/postgres/normalize_stmt_generator.go

Lines changed: 73 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -153,26 +153,55 @@ func (n *normalizeStmtGenerator) generateMergeStatement(
153153
columnCount := len(normalizedTableSchema.Columns)
154154
quotedColumnNames := make([]string, columnCount)
155155

156-
flattenedCastsSQLArray := make([]string, 0, columnCount)
157156
parsedDstTable, _ := common.ParseTableIdentifier(dstTableName)
158157

159-
primaryKeyColumnCasts := make(map[string]string)
158+
primaryKeyColumnCasts := make(map[string]string, len(normalizedTableSchema.PrimaryKeyColumns))
160159
primaryKeySelectSQLArray := make([]string, 0, len(normalizedTableSchema.PrimaryKeyColumns))
160+
161+
// For jsonb_to_record path we build:
162+
// recordDefs – column definitions for the AS clause of jsonb_to_record (in the CTE)
163+
// selectExprs – the SELECT list in the USING subquery, referencing columns from src_rank.
164+
// json/jsonb columns are extracted as jsonb, then unwrapped with the
165+
// #>> '{}' operator and cast, to undo PeerDB's stringified representation.
166+
// For legacy path we only build selectExprs (flattened casts via ->>).
167+
selectExprs := make([]string, 0, columnCount)
168+
recordDefs := make([]string, 0, columnCount)
169+
useJsonbToRecord := normalizedTableSchema.System == protos.TypeSystem_PG
161170
for i, column := range normalizedTableSchema.Columns {
162-
genericColumnType := column.Type
163171
quotedCol := common.QuoteIdentifier(column.Name)
164172
stringCol := utils.QuoteLiteral(column.Name)
165173
quotedColumnNames[i] = quotedCol
166174
pgType := n.columnTypeToPg(normalizedTableSchema, column)
167-
expr := n.generateExpr(normalizedTableSchema, genericColumnType, stringCol, pgType)
168175

169-
flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("%s AS %s", expr, quotedCol))
176+
if useJsonbToRecord {
177+
// json/jsonb columns are stored as stringified text inside _peerdb_data,
178+
// so jsonb_to_record extracts them as a jsonb string wrapper.
179+
// Include them in the record as jsonb, then unwrap in the USING SELECT.
180+
switch column.Type {
181+
case "json":
182+
recordDefs = append(recordDefs, quotedCol+" jsonb")
183+
selectExprs = append(selectExprs, fmt.Sprintf("(%s #>> '{}')::json AS %s", quotedCol, quotedCol))
184+
case "jsonb":
185+
recordDefs = append(recordDefs, quotedCol+" jsonb")
186+
selectExprs = append(selectExprs, fmt.Sprintf("(%s #>> '{}')::jsonb AS %s", quotedCol, quotedCol))
187+
default:
188+
recordDefs = append(recordDefs, fmt.Sprintf("%s %s", quotedCol, pgType))
189+
selectExprs = append(selectExprs, quotedCol)
190+
}
191+
} else {
192+
genericColumnType := column.Type
193+
expr := n.generateExpr(normalizedTableSchema, genericColumnType, stringCol, pgType)
194+
selectExprs = append(selectExprs, fmt.Sprintf("%s AS %s", expr, quotedCol))
195+
}
196+
170197
if slices.Contains(normalizedTableSchema.PrimaryKeyColumns, column.Name) {
171-
primaryKeyColumnCasts[column.Name] = fmt.Sprintf("(_peerdb_data->>%s)::%s", stringCol, pgType)
198+
if !useJsonbToRecord {
199+
primaryKeyColumnCasts[column.Name] = fmt.Sprintf("(_peerdb_data->>%s)::%s", stringCol, pgType)
200+
}
172201
primaryKeySelectSQLArray = append(primaryKeySelectSQLArray, fmt.Sprintf("src.%s=dst.%s", quotedCol, quotedCol))
173202
}
174203
}
175-
flattenedCastsSQL := strings.Join(flattenedCastsSQLArray, ",")
204+
selectExprsSQL := strings.Join(selectExprs, ",")
176205
insertValuesSQLArray := make([]string, 0, columnCount+2)
177206
for _, quotedCol := range quotedColumnNames {
178207
insertValuesSQLArray = append(insertValuesSQLArray, "src."+quotedCol)
@@ -207,19 +236,43 @@ func (n *normalizeStmtGenerator) generateMergeStatement(
207236
}
208237
}
209238

210-
mergeStmt := fmt.Sprintf(
211-
mergeStatementSQL,
212-
strings.Join(slices.Collect(maps.Values(primaryKeyColumnCasts)), ","),
213-
n.metadataSchema,
214-
n.rawTableName,
215-
parsedDstTable.String(),
216-
flattenedCastsSQL,
217-
strings.Join(primaryKeySelectSQLArray, " AND "),
218-
insertColumnsSQL,
219-
insertValuesSQL,
220-
updateStringToastCols,
221-
conflictPart,
222-
)
239+
var mergeStmt string
240+
if useJsonbToRecord {
241+
// PARTITION BY uses quoted PK column names directly — jsonb_to_record
242+
// already extracted them in the CTE via r.*
243+
primaryKeyQuotedNames := make([]string, 0, len(normalizedTableSchema.PrimaryKeyColumns))
244+
for _, pkCol := range normalizedTableSchema.PrimaryKeyColumns {
245+
primaryKeyQuotedNames = append(primaryKeyQuotedNames, common.QuoteIdentifier(pkCol))
246+
}
247+
mergeStmt = fmt.Sprintf(
248+
mergeStatementSQLJsonbToRecord,
249+
strings.Join(primaryKeyQuotedNames, ","),
250+
n.metadataSchema,
251+
n.rawTableName,
252+
strings.Join(recordDefs, ","),
253+
parsedDstTable.String(),
254+
selectExprsSQL,
255+
strings.Join(primaryKeySelectSQLArray, " AND "),
256+
insertColumnsSQL,
257+
insertValuesSQL,
258+
updateStringToastCols,
259+
conflictPart,
260+
)
261+
} else {
262+
mergeStmt = fmt.Sprintf(
263+
mergeStatementSQL,
264+
strings.Join(slices.Collect(maps.Values(primaryKeyColumnCasts)), ","),
265+
n.metadataSchema,
266+
n.rawTableName,
267+
parsedDstTable.String(),
268+
selectExprsSQL,
269+
strings.Join(primaryKeySelectSQLArray, " AND "),
270+
insertColumnsSQL,
271+
insertValuesSQL,
272+
updateStringToastCols,
273+
conflictPart,
274+
)
275+
}
223276

224277
return mergeStmt
225278
}

flow/connectors/postgres/normalize_stmt_generator_test.go

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"reflect"
55
"testing"
66

7+
"github.com/stretchr/testify/require"
8+
79
"github.com/PeerDB-io/peerdb/flow/connectors/utils"
810
"github.com/PeerDB-io/peerdb/flow/generated/protos"
911
)
@@ -138,3 +140,213 @@ func TestGenerateMergeUpdateStatement_WithUnchangedToastColsAndSoftDelete(t *tes
138140
t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result)
139141
}
140142
}
143+
144+
// helper to build a simple PG-typed TableSchema for merge tests
145+
func buildTableSchema(columns []*protos.FieldDescription, pks []string) *protos.TableSchema {
146+
return &protos.TableSchema{
147+
TableIdentifier: "test_table",
148+
System: protos.TypeSystem_PG,
149+
PrimaryKeyColumns: pks,
150+
Columns: columns,
151+
}
152+
}
153+
154+
func normalizeSQL(s string) string {
155+
return utils.RemoveSpacesTabsNewlines(s)
156+
}
157+
158+
func TestGenerateMergeStatement_BasicColumns(t *testing.T) {
159+
schema := buildTableSchema([]*protos.FieldDescription{
160+
{Name: "id", Type: "integer"},
161+
{Name: "name", Type: "text"},
162+
{Name: "value", Type: "numeric"},
163+
}, []string{"id"})
164+
165+
gen := normalizeStmtGenerator{
166+
rawTableName: "_peerdb_raw_test",
167+
tableSchemaMapping: map[string]*protos.TableSchema{"public.test_table": schema},
168+
unchangedToastColumnsMap: map[string][]string{"public.test_table": {""}},
169+
peerdbCols: &protos.PeerDBColumns{
170+
SyncedAtColName: "_peerdb_synced_at",
171+
SoftDeleteColName: "",
172+
},
173+
metadataSchema: "_peerdb_internal",
174+
supportsMerge: true,
175+
}
176+
177+
result := gen.generateMergeStatement("public.test_table", schema, []string{""})
178+
179+
// CTE uses jsonb_to_record with record definitions
180+
require.Contains(t, result, "jsonb_to_record(_peerdb_data)")
181+
require.Contains(t, result, `"id" integer`)
182+
require.Contains(t, result, `"name" text`)
183+
require.Contains(t, result, `"value" numeric`)
184+
// USING select just references the column names directly (no ->> casts)
185+
require.Contains(t, normalizeSQL(result), normalizeSQL(`"id","name","value",_peerdb_record_type,_peerdb_unchanged_toast_columns`))
186+
// MERGE ON condition
187+
require.Contains(t, result, `src."id"=dst."id"`)
188+
require.NotContains(t, result, "_peerdb_data->>")
189+
}
190+
191+
func TestGenerateMergeStatement_JsonColumns(t *testing.T) {
192+
schema := buildTableSchema([]*protos.FieldDescription{
193+
{Name: "id", Type: "integer"},
194+
{Name: "metadata", Type: "json"},
195+
{Name: "config", Type: "jsonb"},
196+
}, []string{"id"})
197+
198+
gen := normalizeStmtGenerator{
199+
rawTableName: "_peerdb_raw_test",
200+
tableSchemaMapping: map[string]*protos.TableSchema{"public.test_table": schema},
201+
unchangedToastColumnsMap: map[string][]string{"public.test_table": {""}},
202+
peerdbCols: &protos.PeerDBColumns{
203+
SyncedAtColName: "_peerdb_synced_at",
204+
SoftDeleteColName: "",
205+
},
206+
metadataSchema: "_peerdb_internal",
207+
supportsMerge: true,
208+
}
209+
210+
result := gen.generateMergeStatement("public.test_table", schema, []string{""})
211+
212+
// json/jsonb columns: record defs should declare them as jsonb
213+
require.Contains(t, result, `"metadata" jsonb`)
214+
require.Contains(t, result, `"config" jsonb`)
215+
// USING select should unwrap them via #>>
216+
require.Contains(t, normalizeSQL(result), normalizeSQL(`("metadata" #>> '{}')::json AS "metadata"`))
217+
require.Contains(t, normalizeSQL(result), normalizeSQL(`("config" #>> '{}')::jsonb AS "config"`))
218+
}
219+
220+
func TestGenerateMergeStatement_SoftDelete(t *testing.T) {
221+
schema := buildTableSchema([]*protos.FieldDescription{
222+
{Name: "id", Type: "integer"},
223+
{Name: "data", Type: "text"},
224+
}, []string{"id"})
225+
226+
gen := normalizeStmtGenerator{
227+
rawTableName: "_peerdb_raw_test",
228+
tableSchemaMapping: map[string]*protos.TableSchema{"public.test_table": schema},
229+
unchangedToastColumnsMap: map[string][]string{"public.test_table": {""}},
230+
peerdbCols: &protos.PeerDBColumns{
231+
SyncedAtColName: "_peerdb_synced_at",
232+
SoftDeleteColName: "_peerdb_soft_delete",
233+
},
234+
metadataSchema: "_peerdb_internal",
235+
supportsMerge: true,
236+
}
237+
238+
result := gen.generateMergeStatement("public.test_table", schema, []string{""})
239+
240+
// soft delete: WHEN NOT MATCHED with record_type=2 inserts with soft delete TRUE
241+
require.Contains(t, normalizeSQL(result),
242+
normalizeSQL(`WHEN NOT MATCHED AND (src._peerdb_record_type=2) THEN INSERT`))
243+
require.Contains(t, normalizeSQL(result), normalizeSQL(`"_peerdb_soft_delete"`))
244+
// conflict part should be UPDATE SET soft_delete=TRUE
245+
require.Contains(t, normalizeSQL(result),
246+
normalizeSQL(`UPDATE SET "_peerdb_soft_delete"=TRUE,"_peerdb_synced_at"=CURRENT_TIMESTAMP`))
247+
}
248+
249+
func TestGenerateMergeStatement_CompositePK(t *testing.T) {
250+
schema := buildTableSchema([]*protos.FieldDescription{
251+
{Name: "tenant_id", Type: "integer"},
252+
{Name: "user_id", Type: "integer"},
253+
{Name: "email", Type: "text"},
254+
}, []string{"tenant_id", "user_id"})
255+
256+
gen := normalizeStmtGenerator{
257+
rawTableName: "_peerdb_raw_test",
258+
tableSchemaMapping: map[string]*protos.TableSchema{"public.test_table": schema},
259+
unchangedToastColumnsMap: map[string][]string{"public.test_table": {""}},
260+
peerdbCols: &protos.PeerDBColumns{
261+
SyncedAtColName: "_peerdb_synced_at",
262+
SoftDeleteColName: "",
263+
},
264+
metadataSchema: "_peerdb_internal",
265+
supportsMerge: true,
266+
}
267+
268+
result := gen.generateMergeStatement("public.test_table", schema, []string{""})
269+
270+
// composite PK: PARTITION BY should use both PK columns
271+
require.Contains(t, normalizeSQL(result), normalizeSQL(`PARTITION BY "tenant_id","user_id"`))
272+
// ON clause should join on both
273+
require.Contains(t, normalizeSQL(result), normalizeSQL(`src."tenant_id"=dst."tenant_id"`))
274+
require.Contains(t, normalizeSQL(result), normalizeSQL(`src."user_id"=dst."user_id"`))
275+
}
276+
277+
func TestGenerateMergeStatement_ToastColumns(t *testing.T) {
278+
schema := buildTableSchema([]*protos.FieldDescription{
279+
{Name: "id", Type: "integer"},
280+
{Name: "small_col", Type: "text"},
281+
{Name: "big_col", Type: "text"},
282+
}, []string{"id"})
283+
284+
gen := normalizeStmtGenerator{
285+
rawTableName: "_peerdb_raw_test",
286+
tableSchemaMapping: map[string]*protos.TableSchema{"public.test_table": schema},
287+
unchangedToastColumnsMap: map[string][]string{"public.test_table": {"", "big_col"}},
288+
peerdbCols: &protos.PeerDBColumns{
289+
SyncedAtColName: "_peerdb_synced_at",
290+
SoftDeleteColName: "",
291+
},
292+
metadataSchema: "_peerdb_internal",
293+
supportsMerge: true,
294+
}
295+
296+
result := gen.generateMergeStatement("public.test_table", schema, []string{"", "big_col"})
297+
298+
normalized := normalizeSQL(result)
299+
// Should have an update branch for unchanged toast col = '' (all cols updated)
300+
require.Contains(t, normalized, normalizeSQL(`_peerdb_unchanged_toast_columns=''`))
301+
// Should have an update branch for unchanged toast col = 'big_col' (only id and small_col updated)
302+
require.Contains(t, normalized, normalizeSQL(`_peerdb_unchanged_toast_columns='big_col'`))
303+
require.Contains(t, normalized, normalizeSQL(`"id"=src."id","small_col"=src."small_col"`))
304+
}
305+
306+
func TestGenerateMergeStatement_UserDefinedType(t *testing.T) {
307+
schema := buildTableSchema([]*protos.FieldDescription{
308+
{Name: "id", Type: "integer"},
309+
{Name: "status", Type: "my_enum", TypeSchemaName: "my_schema"},
310+
}, []string{"id"})
311+
312+
gen := normalizeStmtGenerator{
313+
rawTableName: "_peerdb_raw_test",
314+
tableSchemaMapping: map[string]*protos.TableSchema{"public.test_table": schema},
315+
unchangedToastColumnsMap: map[string][]string{"public.test_table": {""}},
316+
peerdbCols: &protos.PeerDBColumns{
317+
SyncedAtColName: "_peerdb_synced_at",
318+
SoftDeleteColName: "",
319+
},
320+
metadataSchema: "_peerdb_internal",
321+
supportsMerge: true,
322+
}
323+
324+
result := gen.generateMergeStatement("public.test_table", schema, []string{""})
325+
326+
// User-defined types should be schema-qualified in the record definition
327+
require.Contains(t, result, `"status" "my_schema"."my_enum"`)
328+
}
329+
330+
func TestGenerateNormalizeStatements_Merge(t *testing.T) {
331+
schema := buildTableSchema([]*protos.FieldDescription{
332+
{Name: "id", Type: "integer"},
333+
{Name: "name", Type: "text"},
334+
}, []string{"id"})
335+
336+
gen := normalizeStmtGenerator{
337+
rawTableName: "_peerdb_raw_test",
338+
tableSchemaMapping: map[string]*protos.TableSchema{"public.test_table": schema},
339+
unchangedToastColumnsMap: map[string][]string{"public.test_table": {""}},
340+
peerdbCols: &protos.PeerDBColumns{
341+
SyncedAtColName: "_peerdb_synced_at",
342+
SoftDeleteColName: "",
343+
},
344+
metadataSchema: "_peerdb_internal",
345+
supportsMerge: true,
346+
}
347+
348+
stmts := gen.generateNormalizeStatements("public.test_table")
349+
require.Len(t, stmts, 1)
350+
require.Contains(t, stmts[0], "jsonb_to_record")
351+
require.Contains(t, stmts[0], "MERGE INTO")
352+
}

0 commit comments

Comments
 (0)