upper-stack/financial-data-pipeline-using-aws-and-pyspark

Financial Data Pipeline with AWS and PySpark

Python AWS PySpark Docker

A production-grade financial data pipeline that normalizes multi-currency bank transactions to a single base currency (SGD) using the exchange rates in effect on each transaction date. It is built with AWS services and PySpark for scalable processing and analytics.

💼 Business Overview

Normalizing transaction values to a common currency is central to remittance and currency-conversion workflows, particularly in banking. Most FinTech companies and banks extend loans to individuals and businesses only after evaluating their creditworthiness through rigorous analysis of past transactions and remittances.

Customers who travel frequently or transact internationally often make deposits and withdrawals in different currencies, such as INR, USD, and SGD. Because these amounts are not directly comparable, assessing a customer's financial health for services like lending becomes difficult.

This data pipeline combines raw bank transaction data with exchange rates fetched from an API, then converts each transaction to a standardized SGD value using the rate for its date. This solution:

  • Streamlines workflows for currency normalization
  • Empowers organizations to focus on lending decisions
  • Drives insights rapidly for business growth

Business Use Cases

| Use Case | Description |
| --- | --- |
| Credit Worthiness Analysis | Evaluate customer creditworthiness based on deposit-to-withdrawal ratios |
| Currency Exposure Analysis | Track deposits and withdrawals across 160+ currencies |
| Account Balance Tracking | Monitor balance changes over time in standardized SGD |
| Transaction Analytics | Perform ad-hoc analysis on banking transactions |

🎯 Project Aim

The project aims to normalize raw bank transaction data into a single base currency (Singapore Dollars, SGD) using AWS services. The primary goal is to convert each customer transaction value using the exchange rate for its transaction date, fetched from the Open Exchange Rates API.

Pipeline Flow:

  1. Docker container on EC2 fetches exchange rates from the Open Exchange Rates API
  2. Exchange rates uploaded to S3 alongside bank transaction data
  3. Spark transformations run on EMR cluster to calculate normalized transaction values
  4. Processed data stored in S3 as partitioned Parquet files
  5. Glue Crawler catalogs data for Athena SQL analysis
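
The normalization in step 3 comes down to cross-rate arithmetic. The sketch below is a minimal illustration, assuming the rates are quoted as units of each currency per 1 USD (the default base of Open Exchange Rates responses). The function name `to_sgd` and the sample rates are hypothetical and are not taken from the repository's code.

```python
from decimal import Decimal


def to_sgd(amount: Decimal, currency: str, usd_rates: dict) -> Decimal:
    """Convert `amount` in `currency` to SGD using USD-based rates.

    `usd_rates` maps a currency code to units of that currency per 1 USD
    (an assumption about the rate format, see the note above).
    """
    if currency == "SGD":
        return amount
    # amount / rate[currency] gives USD; multiplying by rate["SGD"] gives SGD.
    sgd = amount / usd_rates[currency] * usd_rates["SGD"]
    return sgd.quantize(Decimal("0.01"))


# Made-up illustrative rates, not real market data.
rates = {"USD": Decimal("1"), "SGD": Decimal("1.35"), "INR": Decimal("83.00")}

usd_example = to_sgd(Decimal("100"), "USD", rates)   # 100 USD in SGD
inr_example = to_sgd(Decimal("8300"), "INR", rates)  # 8300 INR in SGD
```

Using `Decimal` rather than `float` is a design choice for monetary values: it avoids binary floating-point rounding surprises when amounts are summed later.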

🏗 Architecture

flowchart LR
    subgraph Input["Input Layer"]
        API[🌐 API]
        subgraph Docker["🐳 Docker"]
            Python[🐍 Python]
        end
    end
    
    API -->|GET/Response| Python
    Python -->|Ingest if status 200| S3Raw[☁️ S3<br/>data/currency]
    S3Raw -->|Data Manipulation| EMR[🔥 EMR]
    
    subgraph Output["Output Layer"]
        S3Proc[☁️ S3<br/>Processed]
        Athena[📊 Athena]
    end
    
    EMR --> S3Proc
    S3Proc --> Athena
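
The "Ingest if status 200" edge in the diagram can be sketched as two small helpers: one that builds the request URL for a given date and one that picks the S3 destination from the HTTP status. This is only a sketch. The helper names are hypothetical, the URL follows the Open Exchange Rates historical endpoint, and the config keys mirror the `--config` JSON shown in the Usage section.

```python
from urllib.parse import urlencode

# Historical endpoint of the Open Exchange Rates API.
BASE_URL = "https://openexchangerates.org/api/historical"


def build_rates_url(run_date: str, app_id: str) -> str:
    """Return the request URL for the rates of `run_date` (YYYY-MM-DD)."""
    return f"{BASE_URL}/{run_date}.json?{urlencode({'app_id': app_id})}"


def pick_output_location(status_code: int, config: dict) -> str:
    """Route a response: successful payloads and failures go to different prefixes."""
    if status_code == 200:
        return config["s3_out_location"]
    return config["s3_error_out_location"]


# Hypothetical config, same shape as the --config argument of main.py.
config = {
    "app_id": "YOUR_OPENEXCHANGE_API_KEY",
    "s3_out_location": "s3://your-bucket-name/api_response/",
    "s3_error_out_location": "s3://your-bucket-name/error_temp_dir/",
}
url = build_rates_url("2024-01-15", config["app_id"])
```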

✨ Key Features

| Feature | Description |
| --- | --- |
| Data Extraction | API calls in Docker container on EC2 ensure isolated, scalable, and consistent execution across environments |
| Central Location | Raw data, API data, and processed data stored in single S3 bucket for simplified monitoring and updates |
| Complex Workflow Handling | Cloud-based pipeline efficiently manages complex data workflows with scalability and robustness |
| Data Analysis | Transformed data analyzed using SQL queries in Athena within the cloud workspace |
| Historical Backfill | Bulk historical data loading capability for past exchange rates |
| Partitioned Storage | Year/month partitioning for optimized query performance |
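
To make the year/month partitioning concrete, the sketch below derives a partition prefix from a transaction date. It assumes Hive-style `key=value` directory names, which is the layout Spark's `partitionBy` produces and Glue and Athena recognize. The function name is hypothetical, and the repository's exact partition column names are not shown in this README.

```python
from datetime import date


def partition_prefix(txn_date: date) -> str:
    """Return a Hive-style year/month partition prefix for a transaction date."""
    return f"year={txn_date.year:04d}/month={txn_date.month:02d}"


prefix = partition_prefix(date(2024, 1, 15))
```

A query that filters on year and month then only needs to scan the objects under the matching prefixes.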

🛠 Tech Stack

| Service | Description |
| --- | --- |
| AWS S3 | Object storage for data lake - stores raw transactions, API responses, and processed data |
| AWS EC2 | Scalable virtual servers hosting Docker containers for API data extraction |
| AWS EMR | Managed Spark clusters for distributed big data processing and transformations |
| AWS Glue | Serverless ETL with Data Catalog for metadata management and table discovery |
| AWS Athena | Serverless SQL queries directly on S3 data without loading into a database |
| Docker | Containerization ensuring consistent execution across dev, test, and production |
| PySpark | Distributed data processing library for large-scale transformations |
| Python | Primary programming language for API ingestion and orchestration |

Data Description

This project uses two distinct data sources:

1. Exchange Rate API (OpenExchangeRates)

Historical exchange rates, timestamped per date, used to convert transaction amounts to SGD.

2. Bank Transaction Data

| Column | Description |
| --- | --- |
| account_no | Unique identifier for every bank account |
| date | Date when the transaction occurred |
| transaction_details | Describes the nature of the transaction |
| chq_no | Cheque number used, if applicable |
| value_date | Date when transaction value is applied to account |
| withdrawal_amt | Amount withdrawn in the transaction |
| withdrawal_currency | Currency of the withdrawal (e.g., USD, INR, EUR) |
| deposit_amt | Amount deposited into the account |
| deposit_currency | Currency of the deposit |
| balance_amt | Account balance after transaction |
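
As an illustration of this schema, the sketch below reads two made-up rows with the standard library and turns blank amounts into `None`. The sample values and the ISO date format are assumptions for illustration only; the actual contents of `banktxn.csv` may differ.

```python
import csv
import io
from decimal import Decimal

# Hypothetical sample rows using the column names from the table above.
SAMPLE_CSV = """account_no,date,transaction_details,chq_no,value_date,withdrawal_amt,withdrawal_currency,deposit_amt,deposit_currency,balance_amt
1001,2024-01-15,ATM WITHDRAWAL,,2024-01-15,200.00,USD,,,1800.00
1001,2024-01-16,SALARY CREDIT,,2024-01-16,,,5000.00,INR,6800.00
"""


def parse_amount(raw: str):
    """Return a Decimal for a non-empty amount, or None for a blank cell."""
    raw = raw.strip()
    return Decimal(raw) if raw else None


def read_transactions(text: str) -> list:
    """Parse CSV text into dicts with typed withdrawal and deposit amounts."""
    rows = []
    for row in csv.DictReader(io.StringIO(text)):
        row["withdrawal_amt"] = parse_amount(row["withdrawal_amt"])
        row["deposit_amt"] = parse_amount(row["deposit_amt"])
        rows.append(row)
    return rows


transactions = read_transactions(SAMPLE_CSV)
```

Each row carries either a withdrawal or a deposit in this sample, so one of the two amount fields is `None`.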

🔄 Approach

flowchart LR
    A[1️⃣ Load Raw Data<br/>to S3] --> B[2️⃣ Create EC2<br/>Install Docker]
    B --> C[3️⃣ Run Docker<br/>Fetch Rates]
    C --> D[4️⃣ Create EMR<br/>Cluster]
    D --> E[5️⃣ CSV to<br/>Parquet]
    E --> F[6️⃣ PySpark<br/>Normalize SGD]
    F --> G[7️⃣ Store in S3]
    G --> H[8️⃣ Glue Crawler]
    H --> I[9️⃣ Athena<br/>Analytics]

Step-by-Step:

  1. Raw Data Loading: Bank transaction data loaded into S3 bucket as central location
  2. EC2 + Docker Setup: EC2 instance created with Docker installed for isolated execution
  3. API Data Extraction: Image built from the Dockerfile; the Python ingestion script runs inside the container to fetch currency exchange rates
  4. EMR Cluster Creation: Big data cluster provisioned for Spark processing
  5. Data Format Conversion: Spark job converts CSV to optimized Parquet format
  6. Currency Normalization: PySpark transforms raw data, normalizing all currencies to SGD
  7. Processed Storage: Final data stored in S3 as partitioned Parquet files
  8. Catalog Population: Glue Crawler creates metadata in Glue Data Catalog
  9. Analytics Queries: Athena SQL queries analyze the processed data
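
Step 6 is essentially a date-keyed lookup followed by a multiplication. The plain-Python sketch below shows that logic for a single row. It assumes the rates have already been re-based so that each value is SGD per 1 unit of the foreign currency, and it is not the repository's PySpark job. The output column names `withdrawal_sgd_amt` and `deposit_sgd_amt` match the Athena queries in this README; everything else is hypothetical.

```python
from decimal import Decimal


def normalize_row(txn: dict, rates_by_date: dict) -> dict:
    """Add SGD amounts to one transaction using the rates for its date."""
    rates = rates_by_date[txn["date"]]  # rates keyed by ISO date string
    out = dict(txn)
    for side in ("withdrawal", "deposit"):
        amount = txn.get(f"{side}_amt")
        currency = txn.get(f"{side}_currency")
        if amount is None or not currency:
            # No movement on this side of the transaction.
            out[f"{side}_sgd_amt"] = None
        else:
            out[f"{side}_sgd_amt"] = (amount * rates[currency]).quantize(Decimal("0.01"))
    return out


# Made-up rates: SGD per 1 unit of each currency on a given date.
rates_by_date = {"2024-01-15": {"SGD": Decimal("1"), "USD": Decimal("1.35")}}
txn = {
    "account_no": "1001",
    "date": "2024-01-15",
    "withdrawal_amt": Decimal("200.00"),
    "withdrawal_currency": "USD",
    "deposit_amt": None,
    "deposit_currency": "",
}
normalized = normalize_row(txn, rates_by_date)
```

In Spark, the same idea would typically be expressed as a join between the transactions and the rates on date and currency, rather than a per-row dictionary lookup.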

📦 Prerequisites

Required Accounts & Tools

  1. AWS Account - Sign up for free tier
  2. OpenExchangeRates API Key - Get free API key
  3. Docker - Install Docker Desktop
  4. AWS CLI - Installation Guide

AWS Services Required

  • Amazon S3 (Storage)
  • Amazon EC2 (Docker host for API ingestion)
  • Amazon EMR (Spark Processing)
  • Amazon Athena (SQL Queries)
  • AWS Glue (Crawler and Data Catalog)

📁 Project Structure

financial-data-pipeline/
├── Code/
│   └── pysparkapi/
│       ├── main.py                    # Main API ingestion script
│       ├── backfill.py                # Historical data backfill script
│       ├── source_processing.py       # CSV to Parquet conversion
│       ├── Data Manipulation.py       # PySpark transformation job
│       ├── Athena SQL Queries.sql     # Analytics queries
│       ├── Dockerfile                 # Docker configuration
│       ├── requirements.txt           # Python dependencies
│       ├── aws.txt                    # AWS credentials template
│       ├── Connect to EMR.txt         # EMR connection guide
│       └── crontab.txt                # Cron schedule configuration
├── Data/
│   └── banktxn.csv                    # Sample bank transaction data
└── README.md

🚀 Setup Instructions

1. Clone the Repository

git clone https://github.com/yourusername/financial-data-pipeline.git
cd financial-data-pipeline

2. Configure AWS CLI

# Install AWS CLI
# macOS
brew install awscli

# Configure credentials
aws configure
# Enter your AWS Access Key ID, Secret Access Key, Region

3. Create the S3 Bucket

# Create the bucket for the pipeline
aws s3 mb s3://your-bucket-name

# Create folder placeholders (S3 has no real directories; these are empty prefix keys)
aws s3api put-object --bucket your-bucket-name --key api_response/
aws s3api put-object --bucket your-bucket-name --key banktxn/csv/
aws s3api put-object --bucket your-bucket-name --key banktxn/parquet/
aws s3api put-object --bucket your-bucket-name --key banktxn/processed/

4. Upload Bank Transaction Data

aws s3 cp Data/banktxn.csv s3://your-bucket-name/banktxn/csv/

5. Set Up Python Environment

cd Code/pysparkapi
pip install -r requirements.txt

6. Docker Setup (Optional)

# Build Docker image
docker build -t financial-pipeline . -f Dockerfile

# Run container
docker run -dit financial-pipeline

# Copy AWS credentials to container (docker cp needs the target directory to exist)
docker exec <container-id> mkdir -p /root/.aws
docker cp ~/.aws/credentials <container-id>:/root/.aws/credentials

📖 Usage

Running the Exchange Rate Ingestion

# Fetch exchange rates for a specific date
python main.py --run_ts "2024-01-15" --config '{
  "app_id": "YOUR_OPENEXCHANGE_API_KEY",
  "s3_out_location": "s3://your-bucket-name/api_response/",
  "s3_error_out_location": "s3://your-bucket-name/error_temp_dir/"
}'

# Fetch today's exchange rates (omit --run_ts)
python main.py --config '{
  "app_id": "YOUR_OPENEXCHANGE_API_KEY",
  "s3_out_location": "s3://your-bucket-name/api_response/",
  "s3_error_out_location": "s3://your-bucket-name/error_temp_dir/"
}'
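
For readers who want to see how a command line like this could be handled, here is a hedged sketch of argument parsing with `argparse`. It is not the repository's `main.py`. The flag names and config keys come from the commands above, while the function name and validation behavior are assumptions.

```python
import argparse
import json
from datetime import date

REQUIRED_KEYS = {"app_id", "s3_out_location", "s3_error_out_location"}


def parse_args(argv=None):
    """Parse --run_ts and --config; return (run_date, config dict)."""
    parser = argparse.ArgumentParser(description="Exchange rate ingestion (sketch)")
    parser.add_argument("--run_ts", default=None,
                        help="Run date as YYYY-MM-DD; defaults to today")
    parser.add_argument("--config", required=True, type=json.loads,
                        help="JSON string with API key and S3 locations")
    args = parser.parse_args(argv)

    # Fall back to today's date when --run_ts is omitted.
    run_date = date.fromisoformat(args.run_ts) if args.run_ts else date.today()

    missing = REQUIRED_KEYS - set(args.config)
    if missing:
        parser.error(f"config is missing keys: {sorted(missing)}")
    return run_date, args.config


run_date, config = parse_args([
    "--run_ts", "2024-01-15",
    "--config",
    '{"app_id": "KEY", "s3_out_location": "s3://b/api_response/", '
    '"s3_error_out_location": "s3://b/error_temp_dir/"}',
])
```

Passing an explicit `argv` list, as in the last statement, makes the parser easy to exercise without a shell.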

Historical Backfill

To load historical exchange rates:

python backfill.py
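
The README does not show how `backfill.py` selects its dates. One plausible shape, offered only as a sketch with hypothetical names, is to iterate over an inclusive date range and run the single-date ingestion once per day:

```python
from datetime import date, timedelta


def backfill_dates(start: date, end: date):
    """Yield every calendar date from `start` to `end`, inclusive."""
    current = start
    while current <= end:
        yield current
        current += timedelta(days=1)


# Each ISO string could be passed as --run_ts to the ingestion script.
run_dates = [d.isoformat() for d in backfill_dates(date(2024, 1, 30), date(2024, 2, 2))]
```

A generator keeps memory use flat for long ranges and makes it simple to stop or resume a backfill partway through.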

Running PySpark Jobs on EMR

  1. SSH into EMR Cluster:
ssh -i your-key.pem hadoop@your-emr-master-dns
  2. Convert CSV to Parquet:
spark-submit source_processing.py
  3. Run Data Transformation:
spark-submit "Data Manipulation.py"

📊 Analytics Queries

Run these queries in Amazon Athena after setting up the Glue Data Catalog:

Account Summary

SELECT 
    account_no, 
    SUM(COALESCE(withdrawal_sgd_amt, 0)) AS total_withdrawals_sgd, 
    SUM(COALESCE(deposit_sgd_amt, 0)) AS total_deposits_sgd
FROM "pyspark-database"."processed" 
GROUP BY account_no;

Credit Worthiness Analysis

-- NULLIF guards against division by zero. Accounts with no withdrawals
-- get a NULL credit_ratio and therefore fall through to the ELSE branch.
SELECT 
    account_no,
    SUM(deposit_sgd_amt) AS total_deposits_sgd,
    SUM(withdrawal_sgd_amt) AS total_withdrawals_sgd,
    SUM(deposit_sgd_amt) / NULLIF(SUM(withdrawal_sgd_amt), 0) AS credit_ratio,
    CASE 
        WHEN SUM(deposit_sgd_amt) / NULLIF(SUM(withdrawal_sgd_amt), 0) > 1.5 
            THEN 'High Creditworthiness'
        WHEN SUM(deposit_sgd_amt) / NULLIF(SUM(withdrawal_sgd_amt), 0) BETWEEN 1.0 AND 1.5 
            THEN 'Moderate Creditworthiness'
        ELSE 'Low Creditworthiness'
    END AS creditworthiness
FROM "pyspark-database"."processed"
GROUP BY account_no;
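
The thresholds in this query are easier to check in isolation. The following Python sketch mirrors the `CASE` expression, including the behavior when an account has no withdrawals (the ratio is undefined and the label falls to the last branch). The function name is hypothetical.

```python
def classify(total_deposits, total_withdrawals):
    """Return (credit_ratio, label) using the same thresholds as the SQL CASE."""
    if total_deposits is None or not total_withdrawals:
        # Mirrors SQL: a NULL operand or NULLIF(..., 0) yields a NULL ratio,
        # which matches neither WHEN branch and lands in ELSE.
        return None, "Low Creditworthiness"
    ratio = total_deposits / total_withdrawals
    if ratio > 1.5:
        return ratio, "High Creditworthiness"
    if 1.0 <= ratio <= 1.5:
        return ratio, "Moderate Creditworthiness"
    return ratio, "Low Creditworthiness"


examples = [classify(300, 100), classify(120, 100), classify(50, 100), classify(100, 0)]
```

Note that an account with deposits and zero withdrawals is labeled "Low Creditworthiness" under this logic, which may or may not be the intended business rule.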

Deposits by Currency

SELECT
    deposit_currency,
    SUM(deposit_sgd_amt) AS total_deposit
FROM "pyspark-database"."processed" 
GROUP BY deposit_currency;

⏰ Scheduling with Cron

Set up automated daily ingestion:

# Edit crontab
crontab -e

# Add this line to run daily at 8 AM
0 8 * * * /usr/bin/python3 /path/to/main.py --config '{"app_id":"YOUR_KEY","s3_out_location":"s3://bucket/api_response/","s3_error_out_location":"s3://bucket/error/"}'

🔧 Configuration

Environment Variables

| Variable | Description |
| --- | --- |
| AWS_ACCESS_KEY_ID | AWS access key |
| AWS_SECRET_ACCESS_KEY | AWS secret key |
| AWS_DEFAULT_REGION | AWS region (e.g., us-east-1) |
| OPENEXCHANGE_APP_ID | OpenExchangeRates API key |
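
One way these variables could feed the ingestion script is to assemble the `--config` JSON from the environment instead of embedding the API key in a crontab line. This is a sketch under that assumption. The helper name and the bucket argument are hypothetical, and the README does not state whether `main.py` reads `OPENEXCHANGE_APP_ID` directly.

```python
import json
import os


def config_from_env(bucket: str) -> str:
    """Build the --config JSON string, taking the API key from the environment."""
    config = {
        # Raises KeyError if the variable is not set, which fails fast.
        "app_id": os.environ["OPENEXCHANGE_APP_ID"],
        "s3_out_location": f"s3://{bucket}/api_response/",
        "s3_error_out_location": f"s3://{bucket}/error_temp_dir/",
    }
    return json.dumps(config)


# Demo value only; in practice the variable is set outside the program.
os.environ.setdefault("OPENEXCHANGE_APP_ID", "demo-key")
config_json = config_from_env("your-bucket-name")
```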

S3 Bucket Structure

s3://your-bucket/
├── api_response/
│   └── response/
│       └── YYYY-MM-DD/
│           └── currency_details_sgd.parquet
├── banktxn/
│   ├── csv/
│   │   └── banktxn.csv
│   ├── parquet/
│   │   └── *.parquet
│   └── processed/
│       └── *.parquet
└── error_temp_dir/
    └── error_error/
        └── YYYY-MM-DD/
            └── currency_details_sgd_error.parquet
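
The date-stamped layout above can be expressed as a small key builder. The paths are copied from the tree. The function name and the `ok` flag are hypothetical and only illustrate how a success or error object key might be derived from a run date.

```python
def rates_key(run_date: str, ok: bool = True) -> str:
    """Return the S3 object key for a run date (YYYY-MM-DD), success or error."""
    if ok:
        return f"api_response/response/{run_date}/currency_details_sgd.parquet"
    return f"error_temp_dir/error_error/{run_date}/currency_details_sgd_error.parquet"


success_key = rates_key("2024-01-15")
error_key = rates_key("2024-01-15", ok=False)
```

Keeping one prefix per date means a rerun for the same day overwrites a single, predictable location.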

🤝 Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/AmazingFeature)
  3. Commit your changes (git commit -m 'Add some AmazingFeature')
  4. Push to the branch (git push origin feature/AmazingFeature)
  5. Open a Pull Request

🎓 Key Takeaways

By completing this project, you will gain:

  • ✅ Understanding of real-world FinTech use cases and datasets
  • ✅ Hands-on experience with AWS services (S3, EC2, EMR, Glue, Athena)
  • ✅ Ability to visualize and build complete data pipelines
  • ✅ SSH connection to AWS EC2 instances
  • ✅ Docker installation and container management on EC2
  • ✅ Building Docker images and running containers
  • ✅ Processing API data within containerized environments
  • ✅ Creating and managing AWS EMR clusters
  • ✅ PySpark data processing and transformations
  • ✅ Working with Glue Crawlers and Data Catalog
  • ✅ Writing analytical SQL queries in Athena

📄 License

This project is licensed under the MIT License - see the LICENSE file for details.

⭐ If you found this project helpful, please consider giving it a star!
