A production-grade data platform built on Databricks + Delta Lake with AWS MWAA (Airflow) orchestration and dbt transformations. This repository demonstrates the architecture, patterns, and technologies used to build a scalable medallion-architecture data pipeline.
┌───────────────────────────────────────────────────────────────────────────┐
│ ORCHESTRATION (AWS MWAA) │
│ ┌──────────────┐ ┌───────────────┐ ┌────────────┐ ┌───────────────┐ │
│ │ DAG Factory │ │ Autoloader │ │ Kafka │ │ Geo Pipeline │ │
│ │ (YAML-driven)│ │ Factories │ │ Landing │ │ Workflows │ │
│ └──────┬───────┘ └──────┬────────┘ └─────┬──────┘ └──────┬────────┘ │
└─────────┼─────────────────┼─────────────────┼────────────────┼────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌────────────────────────────────────────────────────────────────────────────┐
│ DATABRICKS COMPUTE LAYER │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ BRONZE (Raw Ingestion) │ │
│ │ • Cloud Files Autoloader (CSV/JSON/Parquet from S3) │ │
│ │ • Kafka Structured Streaming (Protobuf + JSON deserialization) │ │
│ │ • SFTP → S3 → Autoloader pipeline │ │
│ │ • Google Sheets → S3 → Autoloader pipeline │ │
│ └─────────────────────────┬───────────────────────────────────────────┘ │
│ │ Delta CDF (Change Data Feed) │
│ ┌─────────────────────────▼───────────────────────────────────────────┐ │
│ │ SILVER (Cleaned + Enriched) │ │
│ │ • Watermark-based streaming deduplication │ │
│ │ • dbt models: dimensions, facts, staging views, feature stores │ │
│ │ • Incremental processing with configurable lag windows │ │
│ │ • Window functions, SCD patterns, surrogate keys │ │
│ └─────────────────────────┬───────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────▼───────────────────────────────────────────┐ │
│ │ GOLD (Aggregated + Reporting) │ │
│ │ • Aggregated views and reporting tables │ │
│ │ • Cohort analysis, retention metrics │ │
│ │ • Data sharing views (multi-tenant filtered) │ │
│ │ • Environment-aware access control (DEV/PROD filtering) │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────┐ ┌──────────────────────────────────────┐ │
│ │ STREAMING PIPELINES │ │ GEO PROCESSING │ │
│ │ • Delta CDF micro-batch │ │ • Spatial joins (GeoPandas) │ │
│ │ • Kafka → Protobuf parse │ │ • External API ingestion │ │
│ │ • foreachBatch transforms│ │ • Nearest-neighbor matching │ │
│ │ • CloudWatch observability│ │ • Admin boundary enrichment │ │
│ └──────────────────────────┘ └──────────────────────────────────────┘ │
└────────────────────────────────────────────────────────────────────────────┘
┌───────────────────────────────────────────────────────────────────────────┐
│ INFRASTRUCTURE │
│ ┌──────────────┐ ┌───────────────┐ ┌────────────┐ ┌───────────────┐ │
│ │ Flyway │ │ GitLab CI/CD │ │ Docker │ │ Brand Model │ │
│ │ Migrations │ │ Multi-stage │ │ Local Dev │ │ Generator │ │
│ └──────────────┘ └───────────────┘ └────────────┘ └───────────────┘ │
└───────────────────────────────────────────────────────────────────────────┘
┌──────────┐
│ SFTP │──── SFTPToS3Operator (async) ─────┐
└──────────┘ │
┌──────────┐ ▼
│ Kafka │──── Structured Streaming ───► ┌──────────┐
└──────────┘ (Protobuf/JSON) │ BRONZE │
┌──────────┐ │ (Delta) │
│ Google │──── PySpark Ingest ──────────►│ │
│ Sheets │ │ CDF ON │
└──────────┘ └────┬─────┘
┌──────────┐ │
│ CSV Seed │──── Cloud Files Autoloader ───────►│
│ Files │ │
└──────────┘ │
┌───────────────┘
│ Delta Change Data Feed
▼
┌──────────┐
Watermark Dedup ─────────►│ SILVER │◄──── dbt Models
Streaming Merge │ (Delta) │ (dims, facts,
└────┬─────┘ staging views)
│
│ dbt Transformations
▼
┌──────────┐
│ GOLD │──── Reporting Views
│ (Delta) │ Data Sharing
└──────────┘ Cohort Analysis
| Layer | Technology | Purpose |
|---|---|---|
| Orchestration | AWS MWAA (Airflow 2.10) | DAG scheduling, task dependencies |
| Compute | Databricks (Spark) | Distributed processing, streaming |
| Storage | Delta Lake on S3 | ACID tables, time travel, CDF |
| Transforms | dbt (via Cosmos) | SQL-based medallion transforms |
| Schema Mgmt | Flyway | Version-controlled DDL migrations |
| CI/CD | GitLab CI | Automated deployments |
| Monitoring | CloudWatch | Streaming metrics, alerting |
| Geo Processing | GeoPandas | Spatial joins, boundary matching |
| Alerting | Slack | DAG failure notifications |
aws-databricks-project/
│
├── airflow/ # Orchestration layer
│ ├── dags/
│ │ ├── main_dag_factory.py # YAML-driven DAG generation (19+ DAGs)
│ │ ├── configs/
│ │ │ └── dbt_dag_configs.yaml # Schedule + model tag definitions
│ │ ├── common/operators/ # Custom Airflow operators
│ │ │ ├── sftp_to_s3_operator.py # Async SFTP→S3 transfer
│ │ │ └── databricks_api_operators.py # SQL Warehouse lifecycle
│ │ ├── helpers/
│ │ │ ├── dbt_runner.py # Cosmos DbtTaskGroup wrapper
│ │ │ ├── cancellable_databricks.py # Graceful job cancellation
│ │ │ └── alerts.py # Slack failure callback
│ │ ├── ingress/ # Data ingestion pipelines
│ │ │ ├── autoloader/ # Multi-source autoloaders
│ │ │ │ ├── config_loader.py # Environment-aware config
│ │ │ │ ├── sftp/ # SFTP file ingestion
│ │ │ │ ├── google_sheets/ # Google Sheets ingestion
│ │ │ │ └── seeds/ # CSV seed file loading
│ │ │ └── kafka_landing/ # Kafka streaming ingestion
│ │ ├── geographic_pipelines/ # Geo data processing
│ │ ├── ml_pipelines/ # ML feature pipelines
│ │ └── dbt/sample_dbt_project/ # dbt models (medallion arch)
│ │ ├── models/silver/ # Dimensions, facts, staging
│ │ ├── models/gold/ # Reporting, aggregations
│ │ └── macros/ # Reusable SQL macros
│ ├── mwaa/ # MWAA config-as-code
│ │ ├── sync_mwaa_config.py # Pools + variables sync
│ │ ├── pools/ # Pool definitions (YAML)
│ │ └── variables/ # Variable definitions (YAML)
│ ├── bootstrap/startup.sh # MWAA worker bootstrap
│ └── tests/ # Operator unit tests
│
├── databricks/ # PySpark jobs
│ ├── observability.py # StreamingQueryListener + CloudWatch
│ ├── bronze/
│ │ ├── autoloaders/ # Cloud Files autoloader notebooks
│ │ ├── kafka_landing/ # Multi-topic Kafka consumer
│ │ └── deduplication/ # Watermark-based dedup pipeline
│ ├── streaming_pipeline/ # Delta CDF streaming transforms
│ │ ├── pipeline.py # Base pipeline (CDF + Kafka)
│ │ ├── notebooks/ # Runner entry points
│ │ └── scripts/ # Transform functions
│ └── geo_pipelines/ # Spatial data processing
│
├── flyway_migrations/ # Schema version control
│ ├── run_schema_migration.sh # Migration runner
│ └── migrations/ # SQL DDL scripts
│
├── ci_cd/ # GitLab CI/CD
│ ├── pipeline.yml # Multi-stage pipeline
│ ├── templates/ # Reusable job templates
│ └── scripts/ # Helper scripts (OAuth tokens)
│
└── brand_generation/ # Multi-tenant model generation
└── generate_brand_models.py # Dynamic dbt + DAG generator
airflow/dags/main_dag_factory.py — A single YAML config generates 19+ DAGs with different schedules, model selections, and warehouse connections. Eliminates DAG duplication for recurring dbt runs.
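The factory pattern can be sketched in a few lines. The config keys below (`schedule`, `select`, `warehouse_conn`) are illustrative, not the real schema in `dbt_dag_configs.yaml`; in the actual factory each spec would be handed to `airflow.DAG(...)` and a Cosmos `DbtTaskGroup`.

```python
# Minimal sketch of the YAML-driven DAG factory pattern (hypothetical config
# keys; the real schema lives in airflow/dags/configs/dbt_dag_configs.yaml).
import yaml

SAMPLE_CONFIG = """
dags:
  dbt_hourly:
    schedule: "0 * * * *"
    select: "tag:hourly"
    warehouse_conn: databricks_wh_serverless
  dbt_nightly:
    schedule: "0 2 * * *"
    select: "tag:nightly"
    warehouse_conn: databricks_wh_large
"""

def build_dag_specs(raw_yaml: str) -> dict:
    """Expand one YAML document into per-DAG specs.

    In the real factory each spec is passed to airflow.DAG(...) plus a
    Cosmos DbtTaskGroup; here we only materialize the parsed specs so the
    expansion logic is visible."""
    cfg = yaml.safe_load(raw_yaml)
    return {
        dag_id: {
            "schedule": spec["schedule"],
            "dbt_select": spec["select"],
            "conn_id": spec["warehouse_conn"],
        }
        for dag_id, spec in cfg["dags"].items()
    }

specs = build_dag_specs(SAMPLE_CONFIG)
```

Adding a new scheduled dbt run then becomes a YAML edit rather than a new DAG file.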
airflow/dags/ingress/ — Each data source type (SFTP, Kafka, Google Sheets, CSV seeds) has its own factory that reads environment-specific YAML and generates per-source DAGs with appropriate operators.
airflow/dags/common/operators/ — Custom async SFTP operator, Databricks SQL Warehouse lifecycle manager, and a cancellable Databricks operator that prevents orphaned jobs on task failure.
databricks/streaming_pipeline/ — Uses Delta CDF to read incremental changes from bronze tables, apply micro-batch transforms via foreachBatch(), and write to downstream tables with checkpointing.
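The CDF read + `foreachBatch` shape looks roughly like this (names and the `transform` hook are illustrative; a live Spark session and Delta tables are assumed):

```python
# Sketch of the Delta CDF micro-batch pattern (illustrative names; assumes a
# Spark session `spark`, a CDF-enabled bronze table, and a caller-supplied
# transform function).
def start_cdf_stream(spark, source_table, target_table, checkpoint_path, transform):
    """Stream incremental changes from a bronze table via Change Data Feed
    and apply `transform` to each micro-batch inside foreachBatch."""

    def process_batch(batch_df, batch_id):
        # _change_type is added by CDF; drop pre-images so updates are
        # represented only by their post-image row.
        changes = batch_df.filter(batch_df["_change_type"] != "update_preimage")
        transform(changes).write.format("delta").mode("append").saveAsTable(target_table)

    return (
        spark.readStream
        .format("delta")
        .option("readChangeFeed", "true")
        .table(source_table)
        .writeStream
        .foreachBatch(process_batch)
        .option("checkpointLocation", checkpoint_path)
        .start()
    )
```

The checkpoint location is what makes the CDF offset tracking restartable across job runs.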
databricks/bronze/deduplication/ — Stateful streaming deduplication using Spark watermarks and dropDuplicatesWithinWatermark() to bound state size: duplicate keys arriving within the watermark window are dropped, and state older than the watermark is evicted.
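The core of that dedup step is compact. The column names below (`event_id`, `event_ts`) are placeholders for the real schema, and `dropDuplicatesWithinWatermark` requires Spark 3.5+:

```python
# Watermark-bounded streaming dedup (sketch; assumes the stream carries
# `event_id` and `event_ts` columns — substitute the real schema).
def dedupe_stream(events_df, watermark="2 hours"):
    """Drop duplicate event_ids that arrive within the watermark window.

    State is bounded: dedup keys older than the watermark are evicted, so
    the state store cannot grow without limit."""
    return (
        events_df
        .withWatermark("event_ts", watermark)
        .dropDuplicatesWithinWatermark(["event_id"])
    )
```

The watermark width is the trade-off knob: wider windows catch later duplicates at the cost of more state.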
databricks/bronze/kafka_landing/ — Multi-topic Kafka consumer supporting both Protobuf (with .desc descriptor files) and JSON message formats, with metadata enrichment and schema alignment.
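A single-topic slice of that consumer might look like the following sketch. Topic and message names are illustrative; `from_protobuf` lives in `pyspark.sql.protobuf.functions` (Spark 3.4+) and takes the compiled `.desc` descriptor path:

```python
# Sketch of the Kafka → Protobuf landing step (illustrative names; assumes a
# Spark session `spark` and a compiled .desc descriptor file).
def read_protobuf_topic(spark, bootstrap_servers, topic, desc_file, message_name):
    """Consume one Kafka topic and deserialize its Protobuf payload,
    keeping Kafka metadata columns for lineage/enrichment."""
    from pyspark.sql.protobuf.functions import from_protobuf
    from pyspark.sql.functions import col

    raw = (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("subscribe", topic)
        .load()
    )
    return raw.select(
        col("topic"),
        col("partition"),
        col("offset"),
        col("timestamp").alias("kafka_ts"),
        from_protobuf(col("value"), message_name, descFilePath=desc_file).alias("payload"),
    )
```

The JSON path is the same shape with `from_json` plus an explicit schema in place of the descriptor.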
airflow/dags/dbt/sample_dbt_project/ — Silver dimensions/facts with incremental models, window functions, surrogate keys; Gold reporting tables with cohort analysis and environment-aware data sharing views.
flyway_migrations/ — Version-controlled Delta table DDL with advanced features (CDF, deletion vectors, external S3 locations). Parallel layer execution with automatic repair-on-failure.
databricks/geo_pipelines/ — Downloads external geolocation data via API, performs spatial joins using GeoPandas (exact match + nearest-neighbor fallback), and enriches records with administrative boundary data.
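The exact-match + nearest-neighbor fallback can be sketched with GeoPandas' two join primitives. Column names and the distance threshold are illustrative; `sjoin_nearest` requires GeoPandas ≥ 0.10 and meaningful distances require a projected (metric) CRS:

```python
# Sketch of the exact-containment join with nearest-neighbor fallback
# (illustrative names; both GeoDataFrames assumed to share a metric CRS).
def enrich_with_boundaries(points_gdf, boundaries_gdf, max_distance_m=500):
    """Attach an admin boundary to each point: exact containment first,
    then nearest boundary within max_distance_m for unmatched points."""
    import geopandas as gpd  # imported here so the sketch stays importable

    exact = gpd.sjoin(points_gdf, boundaries_gdf, how="inner", predicate="within")
    unmatched = points_gdf.loc[~points_gdf.index.isin(exact.index)]
    fallback = gpd.sjoin_nearest(
        unmatched, boundaries_gdf, how="inner",
        max_distance=max_distance_m, distance_col="match_distance_m",
    )
    return exact, fallback
```

Keeping `match_distance_m` on the fallback rows makes the lower-confidence matches auditable downstream.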
airflow/mwaa/ — Airflow pools and variables defined in YAML, synced to MWAA via a Python script with deep-merge across common + environment-specific configs, dry-run support, and idempotent updates.
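The deep-merge at the heart of that sync is plain Python. The config structure below is hypothetical — the real shape lives in `airflow/mwaa/pools/` and `airflow/mwaa/variables/`:

```python
# Sketch of the deep-merge used when layering common + environment-specific
# MWAA config (hypothetical structure; see airflow/mwaa/sync_mwaa_config.py
# for the real sync logic).
def deep_merge(base: dict, override: dict) -> dict:
    """Recursively merge `override` into `base`: nested dicts merge
    key-by-key, while scalars and lists in `override` win outright."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = value
    return merged

common = {"pools": {"dbt": {"slots": 4}, "geo": {"slots": 2}}, "retries": 1}
prod = {"pools": {"dbt": {"slots": 16}}, "retries": 3}
config = deep_merge(common, prod)
```

Environments only declare what differs from `common`, so the prod overlay above overrides the dbt pool size while inheriting the geo pool untouched.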
brand_generation/ — Dynamically generates per-brand dbt projects and Airflow DAGs from templates. Queries Databricks for active brands and creates isolated schemas (silver_{brand_id}, gold_{brand_id}).
databricks/observability.py — Custom StreamingQueryListener publishes real-time metrics (rows/sec, batch duration, lag) to CloudWatch. Multi-query manager with exponential backoff retry.
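The listener's mapping step can be shown in isolation. The field names follow Spark's query-progress JSON (`name`, `inputRowsPerSecond`, `numInputRows`, `durationMs.triggerExecution`); the metric names and namespace are illustrative, and in the real listener the result would be shipped via boto3's `put_metric_data`:

```python
# Sketch of turning a StreamingQueryProgress payload into CloudWatch metric
# entries (metric names are illustrative; progress fields follow Spark's
# query-progress JSON).
def progress_to_metrics(progress: dict) -> list:
    """Map one query-progress event to put_metric_data-style entries.

    In the real listener this runs inside onQueryProgress and the result is
    published to CloudWatch with boto3."""
    dims = [{"Name": "QueryName", "Value": progress.get("name", "unknown")}]
    duration = progress.get("durationMs", {})
    return [
        {"MetricName": "InputRowsPerSecond", "Value": progress["inputRowsPerSecond"],
         "Unit": "Count/Second", "Dimensions": dims},
        {"MetricName": "BatchDurationMs", "Value": duration.get("triggerExecution", 0),
         "Unit": "Milliseconds", "Dimensions": dims},
        {"MetricName": "NumInputRows", "Value": progress["numInputRows"],
         "Unit": "Count", "Dimensions": dims},
    ]

sample = {"name": "bronze_orders", "inputRowsPerSecond": 120.5,
          "numInputRows": 3615, "durationMs": {"triggerExecution": 3000}}
metrics = progress_to_metrics(sample)
```

Emitting per-query dimensions keeps one CloudWatch dashboard usable across all streams managed by the multi-query manager.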
ci_cd/ — GitLab CI pipeline syncs DAGs to S3, updates Databricks repos, and manages MWAA deployments. Short-lived Databricks PAT tokens generated via OAuth service principal flow.
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ DEV │ │ STG │ │ PROD │
│ │ │ │ │ │
│ dev branch │ │ dev branch │ │ master branch│
│ auto-deploy │ │ auto-deploy │ │ auto-deploy │
│ │ │ │ │ │
│ S3: ...-dev │ │ S3: ...-stg │ │ S3: ...-prod │
│ MWAA: ...-dev│ │ MWAA: ...-stg│ │ MWAA: ...-prd│
│ Catalog: dev │ │ Catalog: stg │ │ Catalog: prod│
└──────────────┘ └──────────────┘ └──────────────┘
cd airflow
cp .env.example .env # Configure credentials
docker-compose up -d # Airflow at localhost:8080
# dbt operations
cd dags/dbt/sample_dbt_project
export DBT_HOST='your-host' DBT_HTTP_PATH='/sql/...' ...
dbt deps && dbt run --select dim_entity