diff --git a/README.md b/README.md index 49592bffc..59a8517e4 100644 --- a/README.md +++ b/README.md @@ -63,6 +63,8 @@ See `backend/app/db/schema.sql`. Key tables: OpenAPI: `backend/app/openapi.yaml` - Auth: `/auth/register`, `/auth/login`, `/auth/refresh` - Expenses: CRUD `/expenses` +- Bank sync: `/expenses/bank-connectors`, `/expenses/bank-connections`, + `/expenses/bank-connections/{id}/import`, `/expenses/bank-connections/{id}/refresh` - Bills: CRUD `/bills`, pay/mark `/bills/{id}/pay` - Reminders: CRUD `/reminders`, trigger `/reminders/run` - Insights: `/insights/monthly`, `/insights/budget-suggestion` @@ -176,6 +178,20 @@ finmind/ - Logs are emitted as JSON with `request_id` and shipped to Loki via Promtail. - Pre-provisioned Grafana dashboard: `FinMind Operations and KPI`. +## Bank Sync Connector Architecture +- `packages/backend/app/services/bank_connectors.py` defines the connector + interface, registry, normalized transaction DTO, import flow, and refresh flow. +- The built-in `mock` connector provides deterministic transactions for tests and + local demos without storing real banking credentials. +- The `account_aggregator` connector is the AA/API-provider extension point for + Indian bank integrations. It stores provider/account/consent references and + imports normalized partner-portal transaction payloads while keeping live + provider credentials in env-backed adapters or partner portals. +- Future providers can be added by implementing `BankConnector` and registering + the connector key in `CONNECTORS`. +- Imported bank transactions reuse the existing expense duplicate guard on + `(user_id, spent_at, amount, notes)` and invalidate dashboard/insight caches. + ## Contribution Policy - See `CONTRIBUTING.md` for fork-first contribution flow and PR requirements. diff --git a/packages/backend/app/__init__.py b/packages/backend/app/__init__.py index cdf76b45f..db24eac1b 100644 --- a/packages/backend/app/__init__.py +++ b/packages/backend/app/__init__.py @@ -1,6 +1,6 @@ from flask import Flask, jsonify from .config import Settings -from .extensions import db, jwt +from .extensions import configure_redis, db, jwt from .routes import register_routes from .observability import ( Observability, @@ -44,6 +44,7 @@ def create_app(settings: Settings | None = None) -> Flask: # Extensions db.init_app(app) jwt.init_app(app) + configure_redis(cfg.redis_url) app.extensions["observability"] = Observability() # CORS for local dev frontend CORS(app, resources={r"*": {"origins": "*"}}, supports_credentials=True) @@ -110,6 +111,69 @@ def _ensure_schema_compatibility(app: Flask) -> None: NOT NULL DEFAULT 'INR' """ ) + cur.execute( + """ + CREATE TABLE IF NOT EXISTS bank_connections ( + id SERIAL PRIMARY KEY, + user_id INT NOT NULL REFERENCES users(id) ON DELETE CASCADE, + connector_key VARCHAR(80) NOT NULL, + display_name VARCHAR(200) NOT NULL, + status VARCHAR(40) NOT NULL DEFAULT 'connected', + settings_json JSONB NOT NULL DEFAULT '{}'::jsonb, + last_synced_at TIMESTAMP, + created_at TIMESTAMP NOT NULL DEFAULT NOW() + ) + """ + ) + cur.execute( + """ + CREATE INDEX IF NOT EXISTS idx_bank_connections_user + ON bank_connections(user_id, created_at DESC) + """ + ) + cur.execute( + """ + CREATE TABLE IF NOT EXISTS bank_sync_runs ( + id SERIAL PRIMARY KEY, + user_id INT NOT NULL REFERENCES users(id) ON DELETE CASCADE, + connection_id INT NOT NULL + REFERENCES bank_connections(id) ON DELETE CASCADE, + status VARCHAR(40) NOT NULL DEFAULT 'running', + imported_count INT NOT NULL DEFAULT 0, + duplicate_count INT NOT NULL DEFAULT 0, + started_at TIMESTAMP NOT NULL DEFAULT NOW(), + completed_at TIMESTAMP, + error VARCHAR(500) + ) + """ + ) + cur.execute( + """ + CREATE INDEX IF NOT EXISTS idx_bank_sync_runs_connection + ON bank_sync_runs(connection_id, started_at DESC) + """ + ) + cur.execute( + """ + CREATE TABLE IF NOT EXISTS bank_imported_transactions ( + id SERIAL PRIMARY KEY, + user_id INT NOT NULL REFERENCES users(id) ON DELETE CASCADE, + connection_id INT NOT NULL + REFERENCES bank_connections(id) ON DELETE CASCADE, + expense_id INT NOT NULL REFERENCES expenses(id) ON DELETE CASCADE, + external_id VARCHAR(255) NOT NULL, + imported_at TIMESTAMP NOT NULL DEFAULT NOW(), + CONSTRAINT uq_bank_imported_transactions_connection_external + UNIQUE (connection_id, external_id) + ) + """ + ) + cur.execute( + """ + CREATE INDEX IF NOT EXISTS idx_bank_imported_transactions_user + ON bank_imported_transactions(user_id, imported_at DESC) + """ + ) conn.commit() except Exception: app.logger.exception( diff --git a/packages/backend/app/db/schema.sql b/packages/backend/app/db/schema.sql index 410189def..7020b8a00 100644 --- a/packages/backend/app/db/schema.sql +++ b/packages/backend/app/db/schema.sql @@ -59,6 +59,42 @@ CREATE INDEX IF NOT EXISTS idx_recurring_expenses_user_start ON recurring_expens ALTER TABLE expenses ADD COLUMN IF NOT EXISTS source_recurring_id INT REFERENCES recurring_expenses(id) ON DELETE SET NULL; +CREATE TABLE IF NOT EXISTS bank_connections ( + id SERIAL PRIMARY KEY, + user_id INT NOT NULL REFERENCES users(id) ON DELETE CASCADE, + connector_key VARCHAR(80) NOT NULL, + display_name VARCHAR(200) NOT NULL, + status VARCHAR(40) NOT NULL DEFAULT 'connected', + settings_json JSONB NOT NULL DEFAULT '{}'::jsonb, + last_synced_at TIMESTAMP, + created_at TIMESTAMP NOT NULL DEFAULT NOW() +); +CREATE INDEX IF NOT EXISTS idx_bank_connections_user ON bank_connections(user_id, created_at DESC); + +CREATE TABLE IF NOT EXISTS bank_sync_runs ( + id SERIAL PRIMARY KEY, + user_id INT NOT NULL REFERENCES users(id) ON DELETE CASCADE, + connection_id INT NOT NULL REFERENCES bank_connections(id) ON DELETE CASCADE, + status VARCHAR(40) NOT NULL DEFAULT 'running', + imported_count INT NOT NULL DEFAULT 0, + duplicate_count INT NOT NULL DEFAULT 0, + started_at TIMESTAMP NOT NULL DEFAULT NOW(), + completed_at TIMESTAMP, + error VARCHAR(500) +); +CREATE INDEX IF NOT EXISTS idx_bank_sync_runs_connection ON bank_sync_runs(connection_id, started_at DESC); + +CREATE TABLE IF NOT EXISTS bank_imported_transactions ( + id SERIAL PRIMARY KEY, + user_id INT NOT NULL REFERENCES users(id) ON DELETE CASCADE, + connection_id INT NOT NULL REFERENCES bank_connections(id) ON DELETE CASCADE, + expense_id INT NOT NULL REFERENCES expenses(id) ON DELETE CASCADE, + external_id VARCHAR(255) NOT NULL, + imported_at TIMESTAMP NOT NULL DEFAULT NOW(), + CONSTRAINT uq_bank_imported_transactions_connection_external UNIQUE (connection_id, external_id) +); +CREATE INDEX IF NOT EXISTS idx_bank_imported_transactions_user ON bank_imported_transactions(user_id, imported_at DESC); + DO $$ BEGIN CREATE TYPE bill_cadence AS ENUM ('MONTHLY','WEEKLY','YEARLY','ONCE'); EXCEPTION diff --git a/packages/backend/app/extensions.py b/packages/backend/app/extensions.py index bad98fae7..1458208ee 100644 --- a/packages/backend/app/extensions.py +++ b/packages/backend/app/extensions.py @@ -9,3 +9,11 @@ _settings = Settings() redis_client = redis.Redis.from_url(_settings.redis_url, decode_responses=True) + + +def configure_redis(redis_url: str) -> None: + redis_client.connection_pool.disconnect() + redis_client.connection_pool = redis.ConnectionPool.from_url( + redis_url, + decode_responses=True, + ) diff --git a/packages/backend/app/models.py b/packages/backend/app/models.py index 64d448104..7a2125670 100644 --- a/packages/backend/app/models.py +++ b/packages/backend/app/models.py @@ -1,6 +1,7 @@ from datetime import datetime, date from enum import Enum from sqlalchemy import Enum as SAEnum +from sqlalchemy import UniqueConstraint from .extensions import db @@ -43,6 +44,52 @@ class Expense(db.Model): created_at = db.Column(db.DateTime, default=datetime.now, nullable=False) +class BankConnection(db.Model): + __tablename__ = "bank_connections" + id = db.Column(db.Integer, primary_key=True) + user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False) + connector_key = db.Column(db.String(80), nullable=False) + display_name = db.Column(db.String(200), nullable=False) + status = db.Column(db.String(40), default="connected", nullable=False) + settings_json = db.Column(db.JSON, default=dict, nullable=False) + last_synced_at = db.Column(db.DateTime, nullable=True) + created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) + + +class BankSyncRun(db.Model): + __tablename__ = "bank_sync_runs" + id = db.Column(db.Integer, primary_key=True) + user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False) + connection_id = db.Column( + db.Integer, db.ForeignKey("bank_connections.id"), nullable=False + ) + status = db.Column(db.String(40), default="running", nullable=False) + imported_count = db.Column(db.Integer, default=0, nullable=False) + duplicate_count = db.Column(db.Integer, default=0, nullable=False) + started_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) + completed_at = db.Column(db.DateTime, nullable=True) + error = db.Column(db.String(500), nullable=True) + + +class BankImportedTransaction(db.Model): + __tablename__ = "bank_imported_transactions" + __table_args__ = ( + UniqueConstraint( + "connection_id", + "external_id", + name="uq_bank_imported_transactions_connection_external", + ), + ) + id = db.Column(db.Integer, primary_key=True) + user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False) + connection_id = db.Column( + db.Integer, db.ForeignKey("bank_connections.id"), nullable=False + ) + expense_id = db.Column(db.Integer, db.ForeignKey("expenses.id"), nullable=False) + external_id = db.Column(db.String(255), nullable=False) + imported_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False) + + class RecurringCadence(str, Enum): DAILY = "DAILY" WEEKLY = "WEEKLY" diff --git a/packages/backend/app/openapi.yaml b/packages/backend/app/openapi.yaml index 3f8ec3f0f..518086934 100644 --- a/packages/backend/app/openapi.yaml +++ b/packages/backend/app/openapi.yaml @@ -251,6 +251,131 @@ paths: properties: inserted: { type: integer } + /expenses/bank-connectors: + get: + summary: List available bank sync connectors + tags: [Expenses] + security: [{ bearerAuth: [] }] + responses: + '200': + description: Available connectors + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/BankConnector' + + /expenses/bank-connections: + get: + summary: List user bank connections + tags: [Expenses] + security: [{ bearerAuth: [] }] + responses: + '200': + description: Bank connections + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/BankConnection' + post: + summary: Create a bank connection + tags: [Expenses] + security: [{ bearerAuth: [] }] + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/NewBankConnection' + example: + connector_key: account_aggregator + config: + provider_name: Setu AA + account_ref: masked:XXXX1234 + display_name: HDFC Savings + currency: INR + consent_handle: consent-123 + transactions: + - external_id: aa-1 + posted_at: '2026-04-01' + amount: '-299.00' + description: UPI Grocery Store + currency: INR + responses: + '201': + description: Created connection + content: + application/json: + schema: + $ref: '#/components/schemas/BankConnection' + '400': + description: Unsupported connector or invalid config + content: + application/json: + schema: { $ref: '#/components/schemas/Error' } + + /expenses/bank-connections/{connectionId}/import: + post: + summary: Import transactions from a bank connection + tags: [Expenses] + security: [{ bearerAuth: [] }] + parameters: + - in: path + name: connectionId + required: true + schema: { type: integer } + requestBody: + required: false + content: + application/json: + schema: + type: object + properties: + since: { type: string, format: date, nullable: true } + responses: + '201': + description: Import result + content: + application/json: + schema: + $ref: '#/components/schemas/BankImportResult' + '400': + description: Invalid date or connector payload + content: + application/json: + schema: { $ref: '#/components/schemas/Error' } + '404': + description: Connection not found + content: + application/json: + schema: { $ref: '#/components/schemas/Error' } + + /expenses/bank-connections/{connectionId}/refresh: + post: + summary: Refresh transactions from the last sync window + tags: [Expenses] + security: [{ bearerAuth: [] }] + parameters: + - in: path + name: connectionId + required: true + schema: { type: integer } + responses: + '200': + description: Refresh result + content: + application/json: + schema: + $ref: '#/components/schemas/BankImportResult' + '404': + description: Connection not found + content: + application/json: + schema: { $ref: '#/components/schemas/Error' } + /bills: get: summary: List bills @@ -548,6 +673,38 @@ components: cadence: { type: string, enum: [DAILY, WEEKLY, MONTHLY, YEARLY] } start_date: { type: string, format: date } end_date: { type: string, format: date, nullable: true } + BankConnector: + type: object + properties: + key: { type: string } + name: { type: string } + description: { type: string } + supports_refresh: { type: boolean } + BankConnection: + type: object + properties: + id: { type: integer } + connector_key: { type: string } + display_name: { type: string } + status: { type: string } + last_synced_at: { type: string, format: date-time, nullable: true } + created_at: { type: string, format: date-time } + NewBankConnection: + type: object + required: [connector_key] + properties: + connector_key: { type: string } + config: + type: object + additionalProperties: true + BankImportResult: + type: object + properties: + run_id: { type: integer } + connection_id: { type: integer } + inserted: { type: integer } + duplicates: { type: integer } + status: { type: string } Bill: type: object properties: diff --git a/packages/backend/app/routes/auth.py b/packages/backend/app/routes/auth.py index 05a39377d..0c4f6fb41 100644 --- a/packages/backend/app/routes/auth.py +++ b/packages/backend/app/routes/auth.py @@ -15,6 +15,7 @@ bp = Blueprint("auth", __name__) logger = logging.getLogger("finmind.auth") +_refresh_fallback_sessions: dict[str, tuple[str, int]] = {} SUPPORTED_CURRENCIES = { "USD", "INR", @@ -106,7 +107,12 @@ def update_me(): def refresh(): claims = get_jwt() jti = claims.get("jti") - if not jti or not redis_client.get(_refresh_key(jti)): + try: + known_refresh = redis_client.get(_refresh_key(jti)) if jti else None + except Exception: + logger.warning("Refresh falling back to in-process token store") + known_refresh = _get_fallback_refresh_session(jti) + if not known_refresh: logger.warning("Refresh rejected: revoked/unknown token jti=%s", jti) return jsonify(error="refresh token revoked"), 401 uid = get_jwt_identity() @@ -121,7 +127,11 @@ def logout(): claims = get_jwt() jti = claims.get("jti") if jti: - redis_client.delete(_refresh_key(jti)) + try: + redis_client.delete(_refresh_key(jti)) + except Exception: + logger.warning("Logout falling back to in-process token store") + _refresh_fallback_sessions.pop(jti, None) return jsonify(message="logged out"), 200 @@ -136,4 +146,21 @@ def _store_refresh_session(refresh_token: str, uid: str): if not jti or not exp: return ttl = max(int(exp - time.time()), 1) - redis_client.setex(_refresh_key(jti), ttl, uid) + try: + redis_client.setex(_refresh_key(jti), ttl, uid) + except Exception: + logger.warning("Storing refresh token session in process-local fallback") + _refresh_fallback_sessions[jti] = (uid, int(exp)) + + +def _get_fallback_refresh_session(jti: str | None) -> str | None: + if not jti: + return None + session = _refresh_fallback_sessions.get(jti) + if not session: + return None + uid, exp = session + if exp <= int(time.time()): + _refresh_fallback_sessions.pop(jti, None) + return None + return uid diff --git a/packages/backend/app/routes/expenses.py b/packages/backend/app/routes/expenses.py index 1376d46f5..a88425555 100644 --- a/packages/backend/app/routes/expenses.py +++ b/packages/backend/app/routes/expenses.py @@ -7,7 +7,7 @@ from ..extensions import db from ..models import Expense, RecurringCadence, RecurringExpense, User from ..services.cache import cache_delete_patterns, monthly_summary_key -from ..services import expense_import +from ..services import bank_connectors, expense_import import logging bp = Blueprint("expenses", __name__) @@ -202,6 +202,75 @@ def generate_recurring_expenses(recurring_id: int): return jsonify(inserted=inserted), 200 +@bp.get("/bank-connectors") +@jwt_required() +def list_bank_connectors(): + return jsonify(bank_connectors.list_connectors()) + + +@bp.get("/bank-connections") +@jwt_required() +def list_bank_connections(): + uid = int(get_jwt_identity()) + connections = bank_connectors.list_connections(user_id=uid) + return jsonify([bank_connectors.connection_to_dict(item) for item in connections]) + + +@bp.post("/bank-connections") +@jwt_required() +def create_bank_connection(): + uid = int(get_jwt_identity()) + data = request.get_json() or {} + connector_key = data.get("connector_key") + config = data.get("config") or {} + if not isinstance(config, dict): + return jsonify(error="config must be an object"), 400 + try: + connection = bank_connectors.create_connection( + user_id=uid, connector_key=connector_key, config=config + ) + except ValueError as exc: + return jsonify(error=str(exc)), 400 + return jsonify(bank_connectors.connection_to_dict(connection)), 201 + + +@bp.post("/bank-connections//import") +@jwt_required() +def import_bank_connection(connection_id: int): + uid = int(get_jwt_identity()) + data = request.get_json() or {} + since = None + if data.get("since"): + try: + since = date.fromisoformat(data["since"]) + except ValueError: + return jsonify(error="invalid since"), 400 + try: + result = bank_connectors.import_connection_transactions( + user_id=uid, connection_id=connection_id, since=since + ) + except LookupError: + return jsonify(error="not found"), 404 + except ValueError as exc: + return jsonify(error=str(exc)), 400 + return jsonify(result), 201 + + +@bp.post("/bank-connections//refresh") +@jwt_required() +def refresh_bank_connection(connection_id: int): + uid = int(get_jwt_identity()) + try: + result = bank_connectors.refresh_connection( + user_id=uid, connection_id=connection_id + ) + except LookupError: + return jsonify(error="not found"), 404 + except ValueError as exc: + return jsonify(error=str(exc)), 400 + return jsonify(result), 200 + + @bp.patch("/") @jwt_required() def update_expense(expense_id: int): diff --git a/packages/backend/app/services/bank_connectors.py b/packages/backend/app/services/bank_connectors.py new file mode 100644 index 000000000..8a446e11d --- /dev/null +++ b/packages/backend/app/services/bank_connectors.py @@ -0,0 +1,388 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from dataclasses import dataclass +from datetime import date, datetime, timedelta +from decimal import Decimal, InvalidOperation +from typing import Any + +from ..extensions import db +from ..models import BankConnection, BankImportedTransaction, BankSyncRun, Expense, User +from .cache import cache_delete_patterns, monthly_summary_key + + +@dataclass(frozen=True) +class BankTransaction: + external_id: str + posted_at: date + amount: Decimal + description: str + currency: str = "USD" + expense_type: str = "EXPENSE" + category_id: int | None = None + + +@dataclass(frozen=True) +class ConnectorDefinition: + key: str + name: str + description: str + supports_refresh: bool + + +class BankConnector(ABC): + key: str + name: str + description: str + supports_refresh: bool = True + + @abstractmethod + def create_connection( + self, *, user_id: int, config: dict[str, Any] + ) -> BankConnection: + """Create a sanitized connection record for this connector.""" + + @abstractmethod + def fetch_transactions( + self, *, connection: BankConnection, since: date | None + ) -> list[BankTransaction]: + """Return transactions ready to import into FinMind.""" + + def definition(self) -> ConnectorDefinition: + return ConnectorDefinition( + key=self.key, + name=self.name, + description=self.description, + supports_refresh=self.supports_refresh, + ) + + +class MockBankConnector(BankConnector): + key = "mock" + name = "Mock Bank" + description = "Deterministic sample connector for development and tests." + + def create_connection( + self, *, user_id: int, config: dict[str, Any] + ) -> BankConnection: + account_name = str(config.get("account_name") or "Mock Checking").strip() + if not account_name: + raise ValueError("account_name required") + return BankConnection( + user_id=user_id, + connector_key=self.key, + display_name=account_name[:200], + settings_json={ + "account_name": account_name[:200], + "currency": str(config.get("currency") or "USD")[:10], + "transactions": config.get("transactions") or [], + }, + ) + + def fetch_transactions( + self, *, connection: BankConnection, since: date | None + ) -> list[BankTransaction]: + settings = connection.settings_json or {} + rows = settings.get("transactions") or _default_mock_transactions() + transactions = [_transaction_from_row(row, settings) for row in rows] + if since: + transactions = [tx for tx in transactions if tx.posted_at >= since] + return transactions + + +class AccountAggregatorConnector(BankConnector): + key = "account_aggregator" + name = "Account Aggregator / API Provider" + description = ( + "Generic connector for Indian AA or bank API providers using partner " + "portal credentials and normalized transaction payloads." + ) + + def create_connection( + self, *, user_id: int, config: dict[str, Any] + ) -> BankConnection: + provider_name = _required_string(config, "provider_name") + account_ref = _required_string(config, "account_ref") + display_name = str( + config.get("display_name") or f"{provider_name} {account_ref}" + ).strip() + return BankConnection( + user_id=user_id, + connector_key=self.key, + display_name=display_name[:200], + settings_json={ + "provider_name": provider_name[:120], + "account_ref": account_ref[:160], + "consent_handle": str(config.get("consent_handle") or "")[:255], + "sync_cursor": str(config.get("sync_cursor") or "")[:255], + "currency": str(config.get("currency") or "INR")[:10], + # Partner credentials belong in provider portals/env-backed adapters. + # Tests and local demos can pass normalized payloads here. + "transactions": config.get("transactions") or [], + }, + ) + + def fetch_transactions( + self, *, connection: BankConnection, since: date | None + ) -> list[BankTransaction]: + settings = connection.settings_json or {} + rows = settings.get("transactions") or [] + transactions = [_transaction_from_row(row, settings) for row in rows] + if since: + transactions = [tx for tx in transactions if tx.posted_at >= since] + return transactions + + +CONNECTORS: dict[str, BankConnector] = { + AccountAggregatorConnector.key: AccountAggregatorConnector(), + MockBankConnector.key: MockBankConnector(), +} + + +def list_connectors() -> list[dict[str, Any]]: + return [ + { + "key": definition.key, + "name": definition.name, + "description": definition.description, + "supports_refresh": definition.supports_refresh, + } + for definition in (connector.definition() for connector in CONNECTORS.values()) + ] + + +def create_connection( + *, user_id: int, connector_key: str, config: dict[str, Any] +) -> BankConnection: + connector = _get_connector(connector_key) + connection = connector.create_connection(user_id=user_id, config=config) + db.session.add(connection) + db.session.commit() + return connection + + +def list_connections(*, user_id: int) -> list[BankConnection]: + return ( + db.session.query(BankConnection) + .filter_by(user_id=user_id) + .order_by(BankConnection.created_at.desc()) + .all() + ) + + +def import_connection_transactions( + *, user_id: int, connection_id: int, since: date | None = None +) -> dict[str, Any]: + connection = db.session.get(BankConnection, connection_id) + if not connection or connection.user_id != user_id: + raise LookupError("connection not found") + connector = _get_connector(connection.connector_key) + run = BankSyncRun(user_id=user_id, connection_id=connection.id) + db.session.add(run) + db.session.flush() + + try: + rows = connector.fetch_transactions(connection=connection, since=since) + inserted = 0 + duplicates = 0 + touched_months: set[str] = set() + user = db.session.get(User, user_id) + for row in rows: + if _is_duplicate(user_id, connection.id, row): + duplicates += 1 + continue + expense = Expense( + user_id=user_id, + category_id=row.category_id, + amount=abs(row.amount), + currency=row.currency or (user.preferred_currency if user else "INR"), + expense_type=row.expense_type, + notes=row.description[:500], + spent_at=row.posted_at, + ) + db.session.add(expense) + db.session.flush() + db.session.add( + BankImportedTransaction( + user_id=user_id, + connection_id=connection.id, + expense_id=expense.id, + external_id=row.external_id[:255], + ) + ) + inserted += 1 + touched_months.add(row.posted_at.strftime("%Y-%m")) + + now = datetime.utcnow() + run.status = "completed" + run.imported_count = inserted + run.duplicate_count = duplicates + run.completed_at = now + connection.last_synced_at = now + db.session.commit() + for ym in touched_months: + _invalidate_expense_cache(user_id, ym) + return { + "run_id": run.id, + "connection_id": connection.id, + "inserted": inserted, + "duplicates": duplicates, + "status": run.status, + } + except Exception as exc: + run.status = "failed" + run.error = str(exc)[:500] + run.completed_at = datetime.utcnow() + db.session.commit() + raise + + +def refresh_connection(*, user_id: int, connection_id: int) -> dict[str, Any]: + connection = db.session.get(BankConnection, connection_id) + if not connection or connection.user_id != user_id: + raise LookupError("connection not found") + since = None + if connection.last_synced_at: + since = connection.last_synced_at.date() - timedelta(days=7) + return import_connection_transactions( + user_id=user_id, + connection_id=connection_id, + since=since, + ) + + +def connection_to_dict(connection: BankConnection) -> dict[str, Any]: + return { + "id": connection.id, + "connector_key": connection.connector_key, + "display_name": connection.display_name, + "status": connection.status, + "last_synced_at": ( + connection.last_synced_at.isoformat() if connection.last_synced_at else None + ), + "created_at": connection.created_at.isoformat(), + } + + +def _get_connector(connector_key: str) -> BankConnector: + connector = CONNECTORS.get(str(connector_key or "").strip().lower()) + if connector is None: + raise ValueError("unsupported connector") + return connector + + +def _transaction_from_row( + row: dict[str, Any], settings: dict[str, Any] +) -> BankTransaction: + posted_at = _parse_date(row.get("date") or row.get("posted_at")) + amount = _parse_amount(row.get("amount")) + description = str(row.get("description") or row.get("notes") or "").strip() + if not posted_at or amount is None or not description: + raise ValueError("mock transactions require date, amount, and description") + expense_type = str(row.get("expense_type") or "").upper() + if expense_type not in {"EXPENSE", "INCOME"}: + expense_type = ( + "INCOME" if amount > 0 and _looks_like_income(description) else "EXPENSE" + ) + category_id = row.get("category_id") + return BankTransaction( + external_id=str( + row.get("external_id") or f"{posted_at}:{amount}:{description}" + ), + posted_at=posted_at, + amount=amount, + description=description, + currency=str(row.get("currency") or settings.get("currency") or "USD")[:10], + expense_type=expense_type, + category_id=int(category_id) if category_id not in (None, "", "null") else None, + ) + + +def _required_string(config: dict[str, Any], key: str) -> str: + value = str(config.get(key) or "").strip() + if not value: + raise ValueError(f"{key} required") + return value + + +def _default_mock_transactions() -> list[dict[str, Any]]: + return [ + { + "external_id": "mock-001", + "date": "2026-02-10", + "amount": "-12.50", + "description": "Mock Coffee Shop", + "currency": "USD", + }, + { + "external_id": "mock-002", + "date": "2026-02-11", + "amount": "-42.00", + "description": "Mock Grocery Market", + "currency": "USD", + }, + { + "external_id": "mock-003", + "date": "2026-02-15", + "amount": "2500.00", + "description": "Mock Payroll", + "currency": "USD", + "expense_type": "INCOME", + }, + ] + + +def _parse_date(raw: Any) -> date | None: + if raw in (None, ""): + return None + try: + return date.fromisoformat(str(raw)) + except ValueError: + return None + + +def _parse_amount(raw: Any) -> Decimal | None: + try: + return Decimal(str(raw)).quantize(Decimal("0.01")) + except (InvalidOperation, TypeError, ValueError): + return None + + +def _looks_like_income(description: str) -> bool: + upper = description.upper() + return any(token in upper for token in ("PAYROLL", "SALARY", "REFUND", "INTEREST")) + + +def _is_duplicate(user_id: int, connection_id: int, row: BankTransaction) -> bool: + imported = ( + db.session.query(BankImportedTransaction) + .filter_by( + user_id=user_id, + connection_id=connection_id, + external_id=row.external_id[:255], + ) + .first() + ) + if imported: + return True + return ( + db.session.query(Expense) + .filter_by( + user_id=user_id, + spent_at=row.posted_at, + amount=abs(row.amount), + notes=row.description[:500], + ) + .first() + is not None + ) + + +def _invalidate_expense_cache(user_id: int, ym: str) -> None: + cache_delete_patterns( + [ + monthly_summary_key(user_id, ym), + f"insights:{user_id}:*", + f"user:{user_id}:dashboard_summary:*", + ] + ) diff --git a/packages/backend/app/services/cache.py b/packages/backend/app/services/cache.py index cc5eb9a17..c87951c54 100644 --- a/packages/backend/app/services/cache.py +++ b/packages/backend/app/services/cache.py @@ -1,7 +1,10 @@ import json +import logging from typing import Iterable from ..extensions import redis_client +logger = logging.getLogger("finmind.cache") + def monthly_summary_key(user_id: int, ym: str) -> str: return f"user:{user_id}:monthly_summary:{ym}" @@ -25,23 +28,35 @@ def dashboard_summary_key(user_id: int, ym: str) -> str: def cache_set(key: str, value, ttl_seconds: int | None = None): payload = json.dumps(value) - if ttl_seconds: - redis_client.setex(key, ttl_seconds, payload) - else: - redis_client.set(key, payload) + try: + if ttl_seconds: + redis_client.setex(key, ttl_seconds, payload) + else: + redis_client.set(key, payload) + except Exception: + logger.exception("Cache set skipped because Redis is unavailable") def cache_get(key: str): - raw = redis_client.get(key) + try: + raw = redis_client.get(key) + except Exception: + logger.exception("Cache get skipped because Redis is unavailable") + return None return json.loads(raw) if raw else None def cache_delete_patterns(patterns: Iterable[str]): - for pattern in patterns: - cursor = 0 - while True: - cursor, keys = redis_client.scan(cursor=cursor, match=pattern, count=100) - if keys: - redis_client.delete(*keys) - if cursor == 0: - break + try: + for pattern in patterns: + cursor = 0 + while True: + cursor, keys = redis_client.scan( + cursor=cursor, match=pattern, count=100 + ) + if keys: + redis_client.delete(*keys) + if cursor == 0: + break + except Exception: + logger.exception("Cache invalidation skipped because Redis is unavailable") diff --git a/packages/backend/tests/test_bank_connectors.py b/packages/backend/tests/test_bank_connectors.py new file mode 100644 index 000000000..fef24162a --- /dev/null +++ b/packages/backend/tests/test_bank_connectors.py @@ -0,0 +1,226 @@ +def test_bank_connector_registry_lists_mock_connector(client, auth_header): + r = client.get("/expenses/bank-connectors", headers=auth_header) + assert r.status_code == 200 + connectors = r.get_json() + by_key = {connector["key"]: connector for connector in connectors} + assert by_key["mock"]["supports_refresh"] is True + assert by_key["account_aggregator"]["supports_refresh"] is True + + +def test_account_aggregator_connector_imports_normalized_provider_payload( + client, auth_header +): + payload = { + "connector_key": "account_aggregator", + "config": { + "provider_name": "Setu AA", + "account_ref": "masked:XXXX1234", + "display_name": "HDFC Savings", + "currency": "INR", + "consent_handle": "consent-123", + "transactions": [ + { + "external_id": "aa-1", + "posted_at": "2026-04-01", + "amount": "-299.00", + "description": "UPI Grocery Store", + "currency": "INR", + } + ], + }, + } + + r = client.post("/expenses/bank-connections", json=payload, headers=auth_header) + assert r.status_code == 201 + connection = r.get_json() + assert connection["connector_key"] == "account_aggregator" + assert connection["display_name"] == "HDFC Savings" + + r = client.post( + f"/expenses/bank-connections/{connection['id']}/import", + json={}, + headers=auth_header, + ) + assert r.status_code == 201 + assert r.get_json()["inserted"] == 1 + + r = client.get("/expenses?search=UPI%20Grocery", headers=auth_header) + assert r.status_code == 200 + expenses = r.get_json() + assert expenses[0]["currency"] == "INR" + assert expenses[0]["description"] == "UPI Grocery Store" + + +def test_account_aggregator_requires_provider_identity(client, auth_header): + r = client.post( + "/expenses/bank-connections", + json={ + "connector_key": "account_aggregator", + "config": {"account_ref": "masked:XXXX1234"}, + }, + headers=auth_header, + ) + assert r.status_code == 400 + assert r.get_json()["error"] == "provider_name required" + + +def test_mock_bank_connection_import_and_refresh_prevent_duplicates( + client, auth_header +): + payload = { + "connector_key": "mock", + "config": { + "account_name": "Test Checking", + "currency": "USD", + "transactions": [ + { + "external_id": "tx-1", + "date": "2026-03-01", + "amount": "-11.25", + "description": "Card Coffee", + }, + { + "external_id": "tx-2", + "date": "2026-03-02", + "amount": "1500.00", + "description": "Payroll", + "expense_type": "INCOME", + }, + ], + }, + } + r = client.post("/expenses/bank-connections", json=payload, headers=auth_header) + assert r.status_code == 201 + connection = r.get_json() + assert connection["connector_key"] == "mock" + assert connection["display_name"] == "Test Checking" + connection_id = connection["id"] + + r = client.get("/expenses/bank-connections", headers=auth_header) + assert r.status_code == 200 + assert r.get_json()[0]["id"] == connection_id + + r = client.post( + f"/expenses/bank-connections/{connection_id}/import", + json={}, + headers=auth_header, + ) + assert r.status_code == 201 + imported = r.get_json() + assert imported["inserted"] == 2 + assert imported["duplicates"] == 0 + assert imported["status"] == "completed" + + r = client.post( + f"/expenses/bank-connections/{connection_id}/import", + json={}, + headers=auth_header, + ) + assert r.status_code == 201 + second_import = r.get_json() + assert second_import["inserted"] == 0 + assert second_import["duplicates"] == 2 + + r = client.post( + f"/expenses/bank-connections/{connection_id}/refresh", + headers=auth_header, + ) + assert r.status_code == 200 + refreshed = r.get_json() + assert refreshed["inserted"] == 0 + assert refreshed["duplicates"] == 0 + + r = client.get("/expenses?search=Card%20Coffee", headers=auth_header) + assert r.status_code == 200 + expenses = r.get_json() + assert len(expenses) == 1 + assert expenses[0]["description"] == "Card Coffee" + assert expenses[0]["amount"] == 11.25 + + +def test_bank_connection_rejects_unknown_connector(client, auth_header): + r = client.post( + "/expenses/bank-connections", + json={"connector_key": "plaid", "config": {}}, + headers=auth_header, + ) + assert r.status_code == 400 + assert r.get_json()["error"] == "unsupported connector" + + +def test_bank_connection_import_since_filter_and_invalid_since(client, auth_header): + r = client.post( + "/expenses/bank-connections", + json={ + "connector_key": "mock", + "config": { + "transactions": [ + { + "external_id": "old", + "date": "2026-01-01", + "amount": "-10.00", + "description": "Old card charge", + }, + { + "external_id": "new", + "date": "2026-02-01", + "amount": "-20.00", + "description": "New card charge", + }, + ] + }, + }, + headers=auth_header, + ) + assert r.status_code == 201 + connection_id = r.get_json()["id"] + + r = client.post( + f"/expenses/bank-connections/{connection_id}/import", + json={"since": "not-a-date"}, + headers=auth_header, + ) + assert r.status_code == 400 + assert r.get_json()["error"] == "invalid since" + + r = client.post( + f"/expenses/bank-connections/{connection_id}/import", + json={"since": "2026-02-01"}, + headers=auth_header, + ) + assert r.status_code == 201 + assert r.get_json()["inserted"] == 1 + + r = client.get("/expenses", headers=auth_header) + assert r.status_code == 200 + descriptions = [item["description"] for item in r.get_json()] + assert descriptions == ["New card charge"] + + +def test_bank_connection_cannot_import_another_users_connection(client, auth_header): + r = client.post( + "/expenses/bank-connections", + json={"connector_key": "mock", "config": {"account_name": "Private Bank"}}, + headers=auth_header, + ) + assert r.status_code == 201 + connection_id = r.get_json()["id"] + + client.post( + "/auth/register", + json={"email": "other@example.com", "password": "password123"}, + ) + r = client.post( + "/auth/login", + json={"email": "other@example.com", "password": "password123"}, + ) + assert r.status_code == 200 + other_auth = {"Authorization": f"Bearer {r.get_json()['access_token']}"} + + r = client.post( + f"/expenses/bank-connections/{connection_id}/import", + json={}, + headers=other_auth, + ) + assert r.status_code == 404 + assert r.get_json()["error"] == "not found"