A comprehensive data engineering platform demonstrating production-grade ETL pipelines that ingest, transform, validate, and deliver analytical data from 20+ heterogeneous sources into a Snowflake data warehouse, modeled with the Data Vault 2.0 methodology.
```text
 DATA SOURCE LAYER
 Excel Reports · CSV Exports · SFTP Vendor · API Platforms · Parquet Files
        │
        ▼
 AWS S3 DATA LAKE
 s3://data-lake/                 s3://pipeline/
 ├── excel_reports/              ├── stage1/   (extracted)
 ├── csv_exports/                ├── stage2/   (mapped)
 ├── vendor_data/                ├── stage3/   (distributed)
 └── api_dumps/                  └── exports/  (delivered)
        │
        ▼
 APACHE AIRFLOW ORCHESTRATION
 1. EXTRACT PHASE
    • Excel Extractor        (skiprows, multi-sheet, column rename)
    • CSV Platform Extractor (S3 Select, chunked read, server-side SQL filter)
    • FTP Scanner → FTP-to-S3 Transfer
                             (buffered streaming, incremental detect,
                              metadata tracking)
 2. VALIDATE PHASE (Great Expectations, source data)
    • Column schema checks      • Value range validation
    • Null checks               • Row count bounds
    • Date format validation    • Type checking
 3. TRANSFORM PHASE
    • Category Mapping                 (raw names → standard codes,
                                        unmapped report, campaign remap)
    • National → Regional Distribution (weighted geo extrapolation,
                                        population shares)
    • Region Code Conversion           (DMA → RMA cross-reference map)
 4. LOAD PHASE
    • Classic Pipeline (S3 → Snowflake):
      1. Create Schema  2. Create Stage  3. Create File Format
      4. COPY INTO table  5. Auto-detect columns
    • Data Vault Pipeline (S3 → Hub/Link/Sat):
      1. Load YAML config  2. Generate Hub INSERTs  3. Generate Link INSERTs
      4. Generate Sat INSERTs  5. Hash key generation
 5. VALIDATE PHASE (Warehouse)
    • Post-load row counts      • Referential integrity
    • Value distributions       • Schema consistency
        │
        ▼
 SNOWFLAKE DATA WAREHOUSE
 ├── DATA_VAULT DB: h_category, h_platform, h_creative, h_region,
 │                  nhl_* (links), lsat_* / hsat_* (satellites)
 ├── PIPELINE DB:   source tables, staging tables
 └── REPORTING DB:  _UNIFIED_METRICS, PIVOT_CATEGORY, PIVOT_PORTFOLIO,
                    PIVOT_SECTOR; exports → S3, dashboards
```
```text
Pipeline Failure
 │
 ├── Slack Notification (channel routing by DAG owner)
 │    • DAG ID, Task ID, Execution Date
 │    • Error message, Airflow UI link
 │
 └── Sentry Error Tracking
      • Exception capture with tags
      • DAG/task/data_source context
      • Validation failure alerts
```
```text
pandas-cloud-project/
│
├── dags/                                # Airflow DAG definitions
│   ├── constants.py                     # Central config: connections, buckets, DBs
│   ├── sample_excel_pipeline.py         # Excel → S3 → Snowflake (full chain)
│   ├── sample_csv_pipeline.py           # CSV with S3 Select + parallel branches
│   ├── sample_data_vault_pipeline.py    # Data Vault 2.0 loading (Hub/Link/Sat)
│   ├── sample_ftp_ingestion.py          # SFTP monitor → S3 → Parquet → Snowflake
│   ├── delivery/
│   │   └── sample_pivot_tables.py       # Aggregation + pivot tables + export
│   └── operators/
│       └── data_vault_operator.py       # YAML-driven Data Vault loader
│
├── pipeline/                            # Core processing modules
│   ├── extractors/
│   │   ├── base_extractor.py            # Abstract extractor interface
│   │   ├── excel_extractor.py           # Multi-sheet Excel parsing
│   │   ├── csv_extractor.py             # Chunked CSV + S3 Select
│   │   └── multi_sheet_extractor.py     # Regex-classified multi-channel Excel
│   ├── operators/
│   │   ├── extract_operator.py          # S3 source → extract → S3 dest
│   │   ├── category_map_operator.py     # Raw names → standard codes
│   │   ├── geo_distribution_operator.py # National → regional weighted split
│   │   └── worker_operator.py           # Parallel work partitioning base
│   ├── delivery/
│   │   ├── create_schema.py             # Snowflake CREATE SCHEMA
│   │   ├── create_stage.py              # Snowflake external stage (S3)
│   │   ├── create_file_format.py        # CSV/Parquet format definitions
│   │   ├── create_table_from_s3.py      # Auto-detect + COPY INTO
│   │   ├── create_strategy.py           # OR REPLACE / IF NOT EXISTS enum
│   │   ├── utils.py                     # SQL template reader, schema naming
│   │   └── sql/                         # SQL templates
│   │       ├── create_schema.sql
│   │       ├── create_stage.sql
│   │       ├── create_file_format.sql
│   │       ├── create_table_from_s3.sql
│   │       ├── aggregate_metrics.sql    # Multi-source UNION + LEFT JOIN
│   │       └── create_pivot_table.sql   # Time-granularity aggregation
│   ├── connectors/
│   │   ├── ftp_to_s3.py                 # Buffered SFTP → S3 transfer
│   │   ├── ftp_scanner.py               # FTP directory monitoring sensor
│   │   └── snowflake_bulk_hook.py       # Batched INSERT operations
│   ├── validation/
│   │   ├── source_validation_operator.py    # Pre-load GE validation (S3)
│   │   └── warehouse_validation_operator.py # Post-load GE validation (SF)
│   ├── notifications/
│   │   ├── slack_notifications.py       # Formatted Slack alerts
│   │   └── error_tracking.py            # Sentry exception capture
│   ├── utils/
│   │   ├── date_utils.py                # Week normalization, ISO year
│   │   ├── s3_utils.py                  # DataFrame ↔ S3 I/O
│   │   └── parallel_utils.py            # Multiprocessing + Dask helpers
│   └── decorators.py                    # Runtime arg injection (Variable/XCom/Conf)
│
├── data_vault_config/
│   └── sample_display_platform.yaml     # Hub-Link-Satellite YAML schema
│
├── great_expectations/
│   ├── great_expectations.yml           # GE config (S3 + Snowflake datasources)
│   └── expectations/
│       └── sample_source_suite.json     # Column/value/type expectations
│
├── alembic/
│   └── versions/
│       └── 001_initial_data_vault_schema.py # Hub/Link/Sat DDL + seed data
│
├── docker/
│   ├── Dockerfile                       # Production Airflow image
│   └── docker-compose.yaml              # Local dev (Postgres + S3 mock + Airflow)
│
├── tests/
│   └── test_sample_extractor.py         # Unit tests for extractors + utils
│
├── .argo-ci/
│   └── pipeline.yaml                    # CI/CD: test → build → deploy (Kaniko + K8s)
│
└── README.md                            # This file
```
| Layer | Technology | Purpose |
|---|---|---|
| Orchestration | Apache Airflow 1.10 | DAG scheduling, task dependencies, UI |
| Processing | pandas, NumPy | DataFrame transformations, aggregations |
| Parallel | Dask, multiprocessing | Distributed/multi-core data processing |
| Warehouse | Snowflake | Cloud data warehouse (COPY INTO, stages) |
| Data Model | Data Vault 2.0 | Hub-Link-Satellite schema (Alembic migrations) |
| Validation | Great Expectations | Automated data quality checks |
| Storage | AWS S3 | Data lake, staging, exports |
| Ingestion | SFTP, S3 Select | Vendor file transfer, server-side filtering |
| Formats | CSV, Parquet, Excel | Multi-format extraction (openpyxl, pyarrow) |
| Notifications | Slack, Sentry | Pipeline failure alerts, error tracking |
| Containers | Docker, Docker Compose | Reproducible environments |
| CI/CD | Argo Workflows, Kaniko | GitOps deployment to Kubernetes |
| Infrastructure | Kubernetes, ECR | Container orchestration, image registry |
1. Excel Report Pipeline (dags/sample_excel_pipeline.py)
Pattern: Excel workbook β multi-stage S3 processing β Snowflake table
- Skips header rows in vendor reports
- Renames vendor-specific columns to standard schema
- Handles fallback category classification
- Filters invalid data (negative values)
- Normalizes dates to week boundaries
- Full chain: Extract β Validate β Map β Distribute β Create Schema β Load β Validate
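The cleanup steps above (column renaming, negative-value filtering, week normalization) can be sketched in plain Python. This is a hypothetical illustration, not the project's actual code; the real pipeline operates on pandas DataFrames, and the column names here are invented:

```python
from datetime import date, timedelta

# Hypothetical vendor → standard column mapping (illustrative names only).
COLUMN_MAP = {"Vendor Spend ($)": "spend", "Report Date": "report_date"}

def week_start(d: date) -> date:
    """Snap a date to the Monday that begins its week."""
    return d - timedelta(days=d.weekday())

def clean_rows(rows):
    """Rename vendor columns, drop negative spend, align dates to week start."""
    cleaned = []
    for row in rows:
        row = {COLUMN_MAP.get(k, k): v for k, v in row.items()}
        if row["spend"] < 0:  # filter invalid (negative) values
            continue
        row["report_date"] = week_start(row["report_date"])
        cleaned.append(row)
    return cleaned
```

The same shape applies per sheet in the multi-sheet case; only the source of `rows` changes.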
2. CSV Platform Pipeline (dags/sample_csv_pipeline.py)
Pattern: Large CSV exports β server-side filtering β parallel branches β merge
- S3 Select: SQL-based server-side row filtering before download
- Chunked reading: Memory-efficient processing of multi-GB files (1M rows/chunk)
- Parallel branches: Impressions and spend processed independently
- Epoch-based week normalization: Integer arithmetic for date alignment
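The epoch-based normalization can be sketched as pure integer arithmetic; the Monday anchor is an assumption here (the actual pipeline may align weeks differently):

```python
# Epoch-based week normalization: no datetime parsing, just modular arithmetic.
# 1970-01-01 (epoch zero) was a Thursday, i.e. 3 days after a Monday, so we
# shift by 3 days before taking the remainder.
SECONDS_PER_DAY = 86_400
SECONDS_PER_WEEK = 7 * SECONDS_PER_DAY
MONDAY_SHIFT = 3 * SECONDS_PER_DAY  # assumption: weeks start on Monday

def normalize_to_week(epoch_seconds: int) -> int:
    """Return the epoch timestamp of Monday 00:00 UTC of the containing week."""
    return epoch_seconds - (epoch_seconds + MONDAY_SHIFT) % SECONDS_PER_WEEK
```

Because this is plain integer math, it vectorizes cheaply over a whole DataFrame column of epoch timestamps.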
3. Data Vault Pipeline (dags/sample_data_vault_pipeline.py)
Pattern: S3 files β YAML-configured Data Vault loading β Hub/Link/Satellite tables
- YAML-driven schema mapping: No hardcoded SQL per source
- Hash key generation: MD5-based entity identification
- Multi-channel parallel loading: Display, Search, Video loaded independently
- S3 sensor: Waits for new files before processing
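Hash key generation in the Data Vault style can be sketched as follows. The delimiter and normalization rules (trim, uppercase) are common Data Vault conventions assumed here, not necessarily the project's exact choices:

```python
import hashlib

def hash_key(*business_keys: str, delimiter: str = "||") -> str:
    """Deterministic MD5 hash key over normalized business keys.

    Normalizing (strip + uppercase) before hashing makes the key stable
    across sources that format the same entity differently.
    """
    normalized = delimiter.join(k.strip().upper() for k in business_keys)
    return hashlib.md5(normalized.encode("utf-8")).hexdigest()
```

The same function serves Hubs (one business key) and Links (the concatenated keys of the Hubs they relate), which is what makes the parallel per-channel loads independent.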
4. FTP Ingestion Pipeline (dags/sample_ftp_ingestion.py)
Pattern: SFTP monitoring β incremental transfer β format conversion β warehouse load
- FTP Scanner sensor: Polls SFTP, tracks processed files in S3 metadata
- Buffered transfer: 1MB chunk streaming for large files
- Format conversion: CSV/Excel β Parquet (columnar, compressed)
- Scheduled weekly: Cron-based execution
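The buffered transfer amounts to a fixed-chunk copy loop. In this sketch, plain file-like objects stand in for the SFTP handle and S3 upload stream used by the real connector:

```python
import io

CHUNK_SIZE = 1024 * 1024  # 1 MB chunks, matching the buffered transfer above

def stream_copy(src, dst) -> int:
    """Copy src to dst in fixed-size chunks so large files never load fully
    into memory. Returns the number of bytes transferred."""
    total = 0
    while True:
        chunk = src.read(CHUNK_SIZE)
        if not chunk:
            break
        dst.write(chunk)
        total += len(chunk)
    return total
```

The returned byte count can be recorded alongside the S3 metadata the scanner uses for incremental detection.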
5. Delivery Pipeline (dags/delivery/sample_pivot_tables.py)
Pattern: Multi-source aggregation β pivot tables β access grants β S3 export
- UNION-based aggregation: Combines all source tables into unified view
- Multi-granularity pivots: Week, Month, Quarter roll-ups
- RBAC grants: Automatic role-based access control
- S3 export: Parquet export for external consumption
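The roll-up the pivot SQL performs can be illustrated with SQLite standing in for Snowflake; table and column names are invented, and `strftime` substitutes for Snowflake's `DATE_TRUNC`:

```python
import sqlite3

# In-memory stand-in for the unified metrics view built by the UNION step.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE unified_metrics (metric_date TEXT, spend REAL)")
conn.executemany(
    "INSERT INTO unified_metrics VALUES (?, ?)",
    [("2024-01-08", 100.0), ("2024-01-15", 50.0), ("2024-02-05", 25.0)],
)

# Month-granularity roll-up; week and quarter variants differ only in the
# grouping expression.
monthly = conn.execute(
    """
    SELECT strftime('%Y-%m', metric_date) AS month, SUM(spend) AS total_spend
    FROM unified_metrics
    GROUP BY month
    ORDER BY month
    """
).fetchall()
```

Swapping the grouping expression per granularity is what lets one SQL template produce the week, month, and quarter pivot tables.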
Data moves through numbered stages (stage1/, stage2/, stage3/) in S3.
Each stage is the output of one operator, enabling:
- Debugging: Inspect intermediate results at any stage
- Idempotency: Re-run any stage without re-processing upstream
- Incremental processing: Skip already-processed files
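A minimal sketch of the staged key scheme, assuming keys are laid out as `<stage>/<source>/<filename>` (the exact scheme is an assumption based on the layout above):

```python
# Ordered stage prefixes under s3://pipeline/; each operator reads one stage
# and writes the next, so any stage can be re-run or inspected in isolation.
STAGES = ["stage1", "stage2", "stage3", "exports"]

def stage_key(stage: str, source: str, filename: str) -> str:
    """Build the S3 key for a file at a given processing stage."""
    if stage not in STAGES:
        raise ValueError(f"unknown stage: {stage}")
    return f"{stage}/{source}/{filename}"

def next_stage(stage: str) -> str:
    """The stage an operator writes to, given the stage it reads from."""
    return STAGES[STAGES.index(stage) + 1]
```

Checking for an existing key at the output stage before processing is what makes re-runs skip already-processed files.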
The `OverrideArgs` decorator pattern allows operators to receive values from Airflow Variables, XCom, or DAG-run config at execution time rather than at DAG parse time. This enables:
- Environment-specific schemas (DEV/STAGING/PROD)
- Version-based delivery schemas
- Dynamic configuration without DAG code changes
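The injection idea can be sketched with a plain decorator. `override_args` is an illustrative stand-in for the project's `OverrideArgs`, and the `runtime_context` dict stands in for Airflow Variables / XCom / `dag_run.conf`:

```python
import functools

def override_args(*overrides):
    """Resolve the named keyword arguments from a runtime context at call
    time, so defaults baked in at DAG parse time can be overridden per run."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, runtime_context=None, **kwargs):
            runtime_context = runtime_context or {}
            for name in overrides:
                if name in runtime_context:
                    kwargs[name] = runtime_context[name]  # runtime value wins
            return fn(*args, **kwargs)
        return wrapper
    return decorator

@override_args("target_schema")
def load_table(table, target_schema="DEV"):
    return f"{target_schema}.{table}"
```

With this shape, triggering a run with `{"target_schema": "PROD"}` in the run config redirects the load without touching DAG code.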
Raw data is loaded into a Data Vault schema (Hubs, Links, Satellites) before transformation. This provides:
- Auditability: Full load history with timestamps
- Agility: Add new sources without schema changes
- Parallel loading: Independent loads per source
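The YAML that drives a Data Vault load might look roughly like this. The shape below is an illustration only; the field names are assumptions, not the actual schema of `data_vault_config/sample_display_platform.yaml`:

```yaml
# Illustrative Hub/Link/Satellite mapping for one source (names invented).
source: display_platform
hubs:
  - table: h_platform
    business_key: platform_name
  - table: h_category
    business_key: category_code
links:
  - table: nhl_platform_category
    hubs: [h_platform, h_category]
satellites:
  - table: hsat_platform_details
    parent: h_platform
    attributes: [impressions, spend, report_week]
```

Because the operator generates INSERTs from this mapping, onboarding a new source means adding a YAML file rather than writing SQL.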
Great Expectations validates data at two points:
- Source validation (S3): Before transformation, catch bad source files early
- Warehouse validation (Snowflake): After loading, verify data integrity
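A source-side expectation suite might look like the sketch below. The expectation type names are standard Great Expectations expectations; the column name and bounds are invented for illustration and are not taken from `sample_source_suite.json`:

```json
{
  "expectation_suite_name": "sample_source_suite",
  "expectations": [
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {"column": "spend"}
    },
    {
      "expectation_type": "expect_column_values_to_be_between",
      "kwargs": {"column": "spend", "min_value": 0}
    },
    {
      "expectation_type": "expect_table_row_count_to_be_between",
      "kwargs": {"min_value": 1, "max_value": 5000000}
    }
  ]
}
```

The same suite format works against both datasources, which is what lets one set of expectations cover the S3 and Snowflake validation points.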
| Source Type | Extractor | Example |
|---|---|---|
| Excel workbook (single sheet) | ExcelReportExtractor | Vendor spend reports |
| Excel workbook (multi-sheet, regex-classified) | MultiSheetExtractor | Platform channel reports |
| CSV (large, server-side filtered) | CSVPlatformExtractor | Ad platform exports |
| CSV (V2 simplified schema) | CSVPlatformExtractorV2 | Updated format exports |
| SFTP file transfer | FTPToS3Operator | Vendor data delivery |
| SFTP directory monitoring | FTPScannerSensor | Incremental file detection |
| Parquet (columnar) | CreateTableFromS3 | Converted/optimized data |
| Snowflake table-to-table | DataVaultOperator | Cross-database transforms |
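All of these share one contract. The sketch below is a hypothetical reading of the `base_extractor.py` interface based on the module layout; the method names are assumptions, not the actual code:

```python
from abc import ABC, abstractmethod
import csv
import io

class BaseExtractor(ABC):
    """Every extractor turns raw source bytes into standardized row dicts."""

    @abstractmethod
    def extract(self, raw: bytes) -> list[dict]:
        """Parse raw bytes into rows keyed by standard column names."""

class SimpleCSVExtractor(BaseExtractor):
    """Minimal concrete extractor; the project's CSVPlatformExtractor layers
    chunking and S3 Select on top of this basic contract."""

    def extract(self, raw: bytes) -> list[dict]:
        return list(csv.DictReader(io.StringIO(raw.decode("utf-8"))))
```

Downstream operators only depend on the `extract` contract, so adding a new source type means adding one subclass.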
```shell
# 1. Start the local development environment
cd docker
docker-compose up -d

# 2. Open the Airflow UI
open http://localhost:8080

# 3. Run tests
pytest tests/ -v

# 4. Trigger a sample DAG (Airflow 1.x CLI syntax)
airflow trigger_dag media_report
```
```text
PR Created                 PR Merged to Master
    │                              │
    ▼                              ▼
Lint + Test                  Check Version
                                   │
                                   ▼
                             Bump Version
                                   │
                                   ▼
                          Kaniko Docker Build
                                   │
                                   ▼
                             Push to ECR
                                   │
                                   ▼
                         Update K8s Manifests
```
This project demonstrates data engineering patterns for multi-source analytics pipelines. All names, values, and configurations are sanitized samples for portfolio demonstration.