Skip to content
This repository was archived by the owner on Jun 19, 2026. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ See `backend/app/db/schema.sql`. Key tables:
OpenAPI: `backend/app/openapi.yaml`
- Auth: `/auth/register`, `/auth/login`, `/auth/refresh`
- Expenses: CRUD `/expenses`
- Bank sync: `/expenses/bank-connectors`, `/expenses/bank-connections`,
`/expenses/bank-connections/{id}/import`, `/expenses/bank-connections/{id}/refresh`
- Bills: CRUD `/bills`, pay/mark `/bills/{id}/pay`
- Reminders: CRUD `/reminders`, trigger `/reminders/run`
- Insights: `/insights/monthly`, `/insights/budget-suggestion`
Expand Down Expand Up @@ -176,6 +178,20 @@ finmind/
- Logs are emitted as JSON with `request_id` and shipped to Loki via Promtail.
- Pre-provisioned Grafana dashboard: `FinMind Operations and KPI`.

## Bank Sync Connector Architecture
- `packages/backend/app/services/bank_connectors.py` defines the connector
interface, registry, normalized transaction DTO, import flow, and refresh flow.
- The built-in `mock` connector provides deterministic transactions for tests and
local demos without storing real banking credentials.
- The `account_aggregator` connector is the AA/API-provider extension point for
Indian bank integrations. It stores provider/account/consent references and
imports normalized partner-portal transaction payloads while keeping live
provider credentials in env-backed adapters or partner portals.
- Future providers can be added by implementing `BankConnector` and registering
the connector key in `CONNECTORS`.
- Imported bank transactions reuse the existing expense duplicate guard on
`(user_id, spent_at, amount, notes)` and invalidate dashboard/insight caches.

## Contribution Policy
- See `CONTRIBUTING.md` for fork-first contribution flow and PR requirements.

Expand Down
66 changes: 65 additions & 1 deletion packages/backend/app/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from flask import Flask, jsonify
from .config import Settings
from .extensions import db, jwt
from .extensions import configure_redis, db, jwt
from .routes import register_routes
from .observability import (
Observability,
Expand Down Expand Up @@ -44,6 +44,7 @@ def create_app(settings: Settings | None = None) -> Flask:
# Extensions
db.init_app(app)
jwt.init_app(app)
configure_redis(cfg.redis_url)
app.extensions["observability"] = Observability()
# CORS for local dev frontend
CORS(app, resources={r"*": {"origins": "*"}}, supports_credentials=True)
Expand Down Expand Up @@ -110,6 +111,69 @@ def _ensure_schema_compatibility(app: Flask) -> None:
NOT NULL DEFAULT 'INR'
"""
)
cur.execute(
"""
CREATE TABLE IF NOT EXISTS bank_connections (
id SERIAL PRIMARY KEY,
user_id INT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
connector_key VARCHAR(80) NOT NULL,
display_name VARCHAR(200) NOT NULL,
status VARCHAR(40) NOT NULL DEFAULT 'connected',
settings_json JSONB NOT NULL DEFAULT '{}'::jsonb,
last_synced_at TIMESTAMP,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
)
"""
)
cur.execute(
"""
CREATE INDEX IF NOT EXISTS idx_bank_connections_user
ON bank_connections(user_id, created_at DESC)
"""
)
cur.execute(
"""
CREATE TABLE IF NOT EXISTS bank_sync_runs (
id SERIAL PRIMARY KEY,
user_id INT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
connection_id INT NOT NULL
REFERENCES bank_connections(id) ON DELETE CASCADE,
status VARCHAR(40) NOT NULL DEFAULT 'running',
imported_count INT NOT NULL DEFAULT 0,
duplicate_count INT NOT NULL DEFAULT 0,
started_at TIMESTAMP NOT NULL DEFAULT NOW(),
completed_at TIMESTAMP,
error VARCHAR(500)
)
"""
)
cur.execute(
"""
CREATE INDEX IF NOT EXISTS idx_bank_sync_runs_connection
ON bank_sync_runs(connection_id, started_at DESC)
"""
)
cur.execute(
"""
CREATE TABLE IF NOT EXISTS bank_imported_transactions (
id SERIAL PRIMARY KEY,
user_id INT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
connection_id INT NOT NULL
REFERENCES bank_connections(id) ON DELETE CASCADE,
expense_id INT NOT NULL REFERENCES expenses(id) ON DELETE CASCADE,
external_id VARCHAR(255) NOT NULL,
imported_at TIMESTAMP NOT NULL DEFAULT NOW(),
CONSTRAINT uq_bank_imported_transactions_connection_external
UNIQUE (connection_id, external_id)
)
"""
)
cur.execute(
"""
CREATE INDEX IF NOT EXISTS idx_bank_imported_transactions_user
ON bank_imported_transactions(user_id, imported_at DESC)
"""
)
conn.commit()
except Exception:
app.logger.exception(
Expand Down
36 changes: 36 additions & 0 deletions packages/backend/app/db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,42 @@ CREATE INDEX IF NOT EXISTS idx_recurring_expenses_user_start ON recurring_expens
ALTER TABLE expenses
ADD COLUMN IF NOT EXISTS source_recurring_id INT REFERENCES recurring_expenses(id) ON DELETE SET NULL;

CREATE TABLE IF NOT EXISTS bank_connections (
id SERIAL PRIMARY KEY,
user_id INT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
connector_key VARCHAR(80) NOT NULL,
display_name VARCHAR(200) NOT NULL,
status VARCHAR(40) NOT NULL DEFAULT 'connected',
settings_json JSONB NOT NULL DEFAULT '{}'::jsonb,
last_synced_at TIMESTAMP,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_bank_connections_user ON bank_connections(user_id, created_at DESC);

CREATE TABLE IF NOT EXISTS bank_sync_runs (
id SERIAL PRIMARY KEY,
user_id INT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
connection_id INT NOT NULL REFERENCES bank_connections(id) ON DELETE CASCADE,
status VARCHAR(40) NOT NULL DEFAULT 'running',
imported_count INT NOT NULL DEFAULT 0,
duplicate_count INT NOT NULL DEFAULT 0,
started_at TIMESTAMP NOT NULL DEFAULT NOW(),
completed_at TIMESTAMP,
error VARCHAR(500)
);
CREATE INDEX IF NOT EXISTS idx_bank_sync_runs_connection ON bank_sync_runs(connection_id, started_at DESC);

CREATE TABLE IF NOT EXISTS bank_imported_transactions (
id SERIAL PRIMARY KEY,
user_id INT NOT NULL REFERENCES users(id) ON DELETE CASCADE,
connection_id INT NOT NULL REFERENCES bank_connections(id) ON DELETE CASCADE,
expense_id INT NOT NULL REFERENCES expenses(id) ON DELETE CASCADE,
external_id VARCHAR(255) NOT NULL,
imported_at TIMESTAMP NOT NULL DEFAULT NOW(),
CONSTRAINT uq_bank_imported_transactions_connection_external UNIQUE (connection_id, external_id)
);
CREATE INDEX IF NOT EXISTS idx_bank_imported_transactions_user ON bank_imported_transactions(user_id, imported_at DESC);

DO $$ BEGIN
CREATE TYPE bill_cadence AS ENUM ('MONTHLY','WEEKLY','YEARLY','ONCE');
EXCEPTION
Expand Down
8 changes: 8 additions & 0 deletions packages/backend/app/extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,11 @@

_settings = Settings()
redis_client = redis.Redis.from_url(_settings.redis_url, decode_responses=True)


def configure_redis(redis_url: str) -> None:
redis_client.connection_pool.disconnect()
redis_client.connection_pool = redis.ConnectionPool.from_url(
redis_url,
decode_responses=True,
)
47 changes: 47 additions & 0 deletions packages/backend/app/models.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import datetime, date
from enum import Enum
from sqlalchemy import Enum as SAEnum
from sqlalchemy import UniqueConstraint
from .extensions import db


Expand Down Expand Up @@ -43,6 +44,52 @@ class Expense(db.Model):
created_at = db.Column(db.DateTime, default=datetime.now, nullable=False)


class BankConnection(db.Model):
__tablename__ = "bank_connections"
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False)
connector_key = db.Column(db.String(80), nullable=False)
display_name = db.Column(db.String(200), nullable=False)
status = db.Column(db.String(40), default="connected", nullable=False)
settings_json = db.Column(db.JSON, default=dict, nullable=False)
last_synced_at = db.Column(db.DateTime, nullable=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)


class BankSyncRun(db.Model):
__tablename__ = "bank_sync_runs"
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False)
connection_id = db.Column(
db.Integer, db.ForeignKey("bank_connections.id"), nullable=False
)
status = db.Column(db.String(40), default="running", nullable=False)
imported_count = db.Column(db.Integer, default=0, nullable=False)
duplicate_count = db.Column(db.Integer, default=0, nullable=False)
started_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
completed_at = db.Column(db.DateTime, nullable=True)
error = db.Column(db.String(500), nullable=True)


class BankImportedTransaction(db.Model):
__tablename__ = "bank_imported_transactions"
__table_args__ = (
UniqueConstraint(
"connection_id",
"external_id",
name="uq_bank_imported_transactions_connection_external",
),
)
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False)
connection_id = db.Column(
db.Integer, db.ForeignKey("bank_connections.id"), nullable=False
)
expense_id = db.Column(db.Integer, db.ForeignKey("expenses.id"), nullable=False)
external_id = db.Column(db.String(255), nullable=False)
imported_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)


class RecurringCadence(str, Enum):
DAILY = "DAILY"
WEEKLY = "WEEKLY"
Expand Down
Loading