Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ Modern async finance backend with secure auth, role-based access control, analyt
![Docker](https://img.shields.io/badge/Docker-Compose-2496ED?logo=docker&logoColor=white)
![Tests](https://img.shields.io/badge/Tests-47%20Passing-success)

## 🔥 Start Here: App Workflow

If you want to understand how this backend actually behaves from login to role-based usage, read:

**[docs/BASIC_FLOW.md](docs/BASIC_FLOW.md)**

It explains the real journey for `viewer`, `analyst`, and `admin`, including auth lifecycle, record lifecycle, dashboard usage order, and websocket presence flow.

## 🌟 What You Get

- Secure JWT auth with refresh rotation and revocation.
Expand Down Expand Up @@ -43,4 +51,4 @@ I thought to build this in Spring Boot because it is widely used in enterprise f

## 🙏 Acknowledgement

"I am genuinely thankful for this problem statement. Building it end-to-end gave me practical experience with RBAC. While I have previously explored caching, rate-limiting, and other system design concepts in my past projects, I had never worked on an RBAC project before, and this gave me great experience truly thankful.
I am genuinely thankful for this problem statement. Building it end-to-end gave me practical experience with RBAC. While I have previously explored caching, rate-limiting, and other system design concepts in my past projects, I had never worked on an RBAC project before, and this gave me great experience truly thankful.
167 changes: 165 additions & 2 deletions app/routes/financial_records.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from datetime import date, datetime, timedelta, timezone

from fastapi import APIRouter, Depends, HTTPException, Query, status
from fastapi import APIRouter, Depends, HTTPException, Query, Request, Response, status
from redis.asyncio import Redis
from sqlalchemy import delete, func, select
from sqlalchemy import delete, func, or_, select
from sqlalchemy.ext.asyncio import AsyncSession

from app.config import get_settings
Expand All @@ -12,12 +12,19 @@
from app.schemas import (
DeletedFinancialRecordListResponse,
DeletedFinancialRecordOut,
CSVImportResponse,
FinancialRecordCreate,
FinancialRecordListResponse,
FinancialRecordOut,
FinancialRecordUpdate,
MessageResponse,
)
from app.services.csv_service import (
CSVParseError,
parse_financial_records_csv,
serialize_financial_records_to_csv,
validate_financial_record_csv_rows,
)
from app.services.dashboard_service import invalidate_dashboard_cache

router = APIRouter(prefix="/financial-records", tags=["Financial Records"])
Expand Down Expand Up @@ -84,6 +91,7 @@ async def list_financial_records(
start_date: date | None = Query(default=None),
end_date: date | None = Query(default=None),
category: str | None = Query(default=None, min_length=2, max_length=100),
search: str | None = Query(default=None, min_length=1, max_length=100),
record_type: RecordType | None = Query(default=None),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
Expand All @@ -101,6 +109,14 @@ async def list_financial_records(
filters.append(FinancialRecord.entry_date <= end_date)
if category:
filters.append(FinancialRecord.category == category)
if search:
search_pattern = f"%{search}%"
filters.append(
or_(
FinancialRecord.category.ilike(search_pattern),
FinancialRecord.notes.ilike(search_pattern),
)
)
if record_type:
filters.append(FinancialRecord.record_type == RecordType(record_type.value))

Expand All @@ -124,6 +140,153 @@ async def list_financial_records(
)


@router.get("/export")
async def export_financial_records_csv(
start_date: date | None = Query(default=None),
end_date: date | None = Query(default=None),
category: str | None = Query(default=None, min_length=2, max_length=100),
search: str | None = Query(default=None, min_length=1, max_length=100),
record_type: RecordType | None = Query(default=None),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user),
) -> Response:
_assert_can_read_records(current_user)
await _purge_expired_deleted_records(db)

if start_date and end_date and start_date > end_date:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="start_date cannot be after end_date")

filters = [FinancialRecord.is_deleted.is_(False)]
if start_date:
filters.append(FinancialRecord.entry_date >= start_date)
if end_date:
filters.append(FinancialRecord.entry_date <= end_date)
if category:
filters.append(FinancialRecord.category == category)
if search:
search_pattern = f"%{search}%"
filters.append(
or_(
FinancialRecord.category.ilike(search_pattern),
FinancialRecord.notes.ilike(search_pattern),
)
)
if record_type:
filters.append(FinancialRecord.record_type == RecordType(record_type.value))

stmt = (
select(FinancialRecord)
.where(*filters)
.order_by(FinancialRecord.entry_date.desc(), FinancialRecord.id.desc())
)
rows = (await db.execute(stmt)).scalars().all()

csv_content = serialize_financial_records_to_csv(rows)
return Response(
content=csv_content,
media_type="text/csv",
headers={"Content-Disposition": 'attachment; filename="financial_records_export.csv"'},
)


@router.post("/import", response_model=CSVImportResponse)
async def import_financial_records_csv(
request: Request,
response: Response,
db: AsyncSession = Depends(get_db),
redis: Redis = Depends(get_redis),
current_user: User = Depends(get_current_user),
_: None = Depends(sensitive_route_limiter),
) -> CSVImportResponse:
_assert_admin(current_user)
await _purge_expired_deleted_records(db)

content_type = request.headers.get("content-type", "").split(";", 1)[0].strip().lower()
if content_type != "text/csv":
raise HTTPException(status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE, detail="Content-Type must be text/csv")

raw_body = await request.body()
if not raw_body:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="CSV body is empty")

try:
csv_text = raw_body.decode("utf-8")
except UnicodeDecodeError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="CSV must be UTF-8 encoded") from exc

try:
parsed_rows = parse_financial_records_csv(csv_text)
except CSVParseError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc

valid_rows, errors = validate_financial_record_csv_rows(parsed_rows)

user_ids = {payload.user_id for _, payload, _ in valid_rows if payload.user_id is not None}
if user_ids:
existing_user_ids = set((await db.execute(select(User.id).where(User.id.in_(user_ids)))).scalars().all())
missing_user_ids = user_ids - existing_user_ids
if missing_user_ids:
retained_rows: list[tuple[int, FinancialRecordCreate, dict[str, str]]] = []
for row_index, payload, row_data in valid_rows:
if payload.user_id in missing_user_ids:
errors.append(
{
"row_index": row_index,
"row_data": row_data,
"errors": ["Target user not found"],
}
)
else:
retained_rows.append((row_index, payload, row_data))
valid_rows = retained_rows

imported_count = 0
if valid_rows:
created_rows = [
FinancialRecord(
amount=payload.amount,
record_type=RecordType(payload.record_type.value),
category=payload.category,
entry_date=payload.entry_date,
notes=payload.notes,
user_id=payload.user_id or current_user.id,
)
for _, payload, _ in valid_rows
]
db.add_all(created_rows)
await db.commit()
imported_count = len(created_rows)
await invalidate_dashboard_cache(redis)

failed_count = len(errors)
if failed_count > 0 and imported_count > 0:
response.status_code = status.HTTP_207_MULTI_STATUS
status_value = "partial_success"
elif failed_count > 0:
response.status_code = status.HTTP_400_BAD_REQUEST
status_value = "failed"
else:
response.status_code = status.HTTP_201_CREATED
status_value = "success"

normalized_errors = [
{
"row_index": int(error["row_index"]),
"row_data": dict(error["row_data"]),
"errors": [str(item) for item in error["errors"]],
}
for error in errors
]

return CSVImportResponse(
status=status_value,
total_rows=len(parsed_rows),
imported_count=imported_count,
failed_count=failed_count,
errors=normalized_errors,
)


@router.get("/{record_id}", response_model=FinancialRecordOut)
async def get_financial_record(
record_id: int,
Expand Down
14 changes: 14 additions & 0 deletions app/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,20 @@ class DeletedFinancialRecordListResponse(BaseModel):
items: list[DeletedFinancialRecordOut]


class CSVImportRowError(BaseModel):
row_index: int
row_data: dict[str, str]
errors: list[str]


class CSVImportResponse(BaseModel):
status: str
total_rows: int
imported_count: int
failed_count: int
errors: list[CSVImportRowError]


class PaginationParams(BaseModel):
offset: int = Field(default=0, ge=0)
limit: int = Field(default=20, ge=1, le=100)
Expand Down
122 changes: 122 additions & 0 deletions app/services/csv_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import csv
import io
from decimal import Decimal, InvalidOperation

from pydantic import ValidationError

from app.models import FinancialRecord, RecordType
from app.schemas import FinancialRecordCreate

CSV_EXPORT_HEADERS = [
"id",
"amount",
"record_type",
"category",
"entry_date",
"notes",
"created_at",
"updated_at",
"user_id",
]


class CSVParseError(ValueError):
"""Raised when CSV payload cannot be parsed safely."""


def serialize_financial_records_to_csv(records: list[FinancialRecord]) -> str:
output = io.StringIO(newline="")
writer = csv.writer(output)
writer.writerow(CSV_EXPORT_HEADERS)

for row in records:
writer.writerow(
[
row.id,
str(row.amount),
row.record_type.value,
row.category,
row.entry_date.isoformat(),
row.notes or "",
row.created_at.isoformat(),
row.updated_at.isoformat(),
row.user_id,
]
)

return output.getvalue()


def parse_financial_records_csv(text: str) -> list[dict[str, str]]:
normalized = text.lstrip("\ufeff").strip()
if not normalized:
raise CSVParseError("CSV body is empty")

reader = csv.DictReader(io.StringIO(normalized))
if reader.fieldnames is None:
raise CSVParseError("CSV header row is required")

headers = {name.strip() for name in reader.fieldnames if name is not None}
required_headers = {"amount", "record_type", "category", "entry_date"}
allowed_headers = required_headers | {"notes", "user_id"}

missing_headers = required_headers - headers
if missing_headers:
raise CSVParseError(f"Missing required CSV headers: {', '.join(sorted(missing_headers))}")

unknown_headers = headers - allowed_headers
if unknown_headers:
raise CSVParseError(f"Unsupported CSV headers: {', '.join(sorted(unknown_headers))}")

rows: list[dict[str, str]] = []
for raw_row in reader:
row = {
key.strip(): (value.strip() if isinstance(value, str) else "")
for key, value in raw_row.items()
if key is not None
}
if not any(row.values()):
continue
rows.append(row)

if not rows:
raise CSVParseError("CSV contains no data rows")

return rows


def validate_financial_record_csv_rows(
rows: list[dict[str, str]],
) -> tuple[list[tuple[int, FinancialRecordCreate, dict[str, str]]], list[dict[str, object]]]:
valid_rows: list[tuple[int, FinancialRecordCreate, dict[str, str]]] = []
errors: list[dict[str, object]] = []

for row_index, row in enumerate(rows, start=2):
row_errors: list[str] = []

user_id: int | None = None
raw_user_id = row.get("user_id")
if raw_user_id:
try:
user_id = int(raw_user_id)
except ValueError:
row_errors.append("user_id must be an integer when provided")

try:
payload = FinancialRecordCreate(
amount=Decimal(row.get("amount", "")),
record_type=RecordType(row.get("record_type", "")),
category=row.get("category", ""),
entry_date=row.get("entry_date", ""),
notes=row.get("notes") or None,
user_id=user_id,
)
if row_errors:
errors.append({"row_index": row_index, "row_data": row, "errors": row_errors})
continue
valid_rows.append((row_index, payload, row))
except (InvalidOperation, ValidationError, ValueError) as exc:
row_errors.append(str(exc))
errors.append({"row_index": row_index, "row_data": row, "errors": row_errors})

return valid_rows, errors
Loading
Loading