-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconvert_v3_to_postgres_and_bronze.py
More file actions
92 lines (71 loc) · 2.92 KB
/
convert_v3_to_postgres_and_bronze.py
File metadata and controls
92 lines (71 loc) · 2.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#!/usr/bin/env python3
"""
Convert V3 DuckDB schemas to PostgreSQL and Bronze formats.
Extracts CREATE TABLE from V3 tables.go and generates compatible SQL.
"""
import re
import sys
def convert_to_postgres(duckdb_sql, table_name):
"""Convert DuckDB CREATE TABLE to PostgreSQL UNLOGGED format."""
pg_sql = duckdb_sql
# Remove placeholders
pg_sql = re.sub(r'%s\.%s\.', '', pg_sql)
# Convert types
pg_sql = pg_sql.replace('VARCHAR', 'TEXT')
pg_sql = pg_sql.replace('TIMESTAMP', 'TIMESTAMPTZ')
pg_sql = pg_sql.replace('UINTEGER', 'BIGINT')
pg_sql = pg_sql.replace('CREATE TABLE IF NOT EXISTS', 'CREATE UNLOGGED TABLE IF NOT EXISTS')
return pg_sql
def convert_to_bronze(duckdb_sql, table_name):
"""Convert to Bronze (DuckLake) format."""
bronze_sql = duckdb_sql
# Replace schema prefix with bronze.
bronze_sql = re.sub(r'%s\.%s\.', 'bronze.', bronze_sql)
return bronze_sql
def extract_v3_schemas(tables_go_path):
"""Extract all CREATE TABLE statements from V3 tables.go."""
with open(tables_go_path, 'r') as f:
content = f.read()
# Find all CREATE TABLE blocks
# Pattern: CREATE TABLE ... )`
pattern = r'(CREATE TABLE IF NOT EXISTS.*?\))`'
matches = re.findall(pattern, content, re.DOTALL)
schemas = {}
for match in matches:
# Extract table name
name_match = re.search(r'\.(\w+) \(', match)
if name_match:
table_name = name_match.group(1)
# Skip metadata tables for now
if not table_name.startswith('_meta'):
schemas[table_name] = match
return schemas
# Main execution
v3_schemas = extract_v3_schemas('ducklake-ingestion-obsrvr-v3/go/tables.go')
print(f"Extracted {len(v3_schemas)} table schemas from V3")
print("\nData tables found:")
for i, table_name in enumerate(sorted(v3_schemas.keys()), 1):
print(f" {i}. {table_name}")
# Generate PostgreSQL schema
print("\n" + "="*60)
print("Generating PostgreSQL schema...")
print("="*60)
pg_output = "-- PostgreSQL schema from V3 (UNLOGGED tables for hot buffer)\n\n"
for table_name in sorted(v3_schemas.keys()):
pg_sql = convert_to_postgres(v3_schemas[table_name], table_name)
pg_output += f"-- {table_name}\n"
pg_output += pg_sql + ";\n\n"
with open('v3_postgres_schema.sql', 'w') as f:
f.write(pg_output)
print(f"✅ Written to v3_postgres_schema.sql ({len(pg_output)} bytes)")
# Generate Bronze schema
print("\nGenerating Bronze (DuckLake) schema...")
bronze_output = "-- Bronze schema from V3 (for DuckLake catalog)\n\n"
for table_name in sorted(v3_schemas.keys()):
bronze_sql = convert_to_bronze(v3_schemas[table_name], table_name)
bronze_output += f"-- {table_name}\n"
bronze_output += bronze_sql + ";\n\n"
with open('v3_bronze_schema.sql', 'w') as f:
f.write(bronze_output)
print(f"✅ Written to v3_bronze_schema.sql ({len(bronze_output)} bytes)")
print("\nConversion complete!")