MemoryBridge Architecture

English | 中文

1. System Overview

MemoryBridge sits between your Agent and your LLM provider as a transparent proxy:

Agent ──(OpenAI API + Token)──→ MemoryBridge ──(HTTP)──→ LLM Provider
                                 │   ↑
                                 │   ├── Retrieve memories (read)
                                 │   └── Store memories (write)
                                 │
                             ┌───┴────┐
                             │  Qdrant  │
                             └───┬────┘
                                 │
                             Mem0 (memory engine)

It does not talk to users directly. It intercepts requests your agent sends to the LLM, injects memory-related context before forwarding, and stores new memories after the LLM responds. Your agent sees a standard /v1/chat/completions endpoint — nothing more, nothing less.
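From the agent's side, a call to MemoryBridge looks like any other chat-completions call. The sketch below builds such a request without sending it. It assumes the default localhost:8000 address from .env and assumes the Service Token is sent as an OpenAI-style Bearer header; build_chat_request and the model name are illustrative, not part of the project.

```python
import json
import urllib.request

# Assumptions: MemoryBridge listens on localhost:8000 (the .env default) and
# the Service Token travels as an OpenAI-style "Authorization: Bearer" header.
BRIDGE_URL = "http://localhost:8000/v1/chat/completions"


def build_chat_request(service_token: str, user_text: str, model: str) -> urllib.request.Request:
    """Build (but do not send) a standard chat-completions request."""
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": user_text}],
    }
    return urllib.request.Request(
        BRIDGE_URL,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {service_token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


# Sending is one call once the service is running:
#   with urllib.request.urlopen(build_chat_request(token, "hi", "some-model")) as resp:
#       print(json.load(resp))
```

Nothing in the request mentions memory; injection and storage happen inside the proxy.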


2. Process Model

Host Manager (Python)
  ├── Spawns Qdrant binary subprocess  (localhost:6333)
  └── Spawns MemoryBridge FastAPI subprocess (localhost:8000)
  • One service, one process. Local TCP-only communication.
  • Graceful ordered shutdown on SIGINT/SIGTERM.
  • Infrastructure parameters (ports, paths) come from .env. No API keys in .env.
  • Qdrant is launched with concurrency tuning to prevent deadlocks:
    • QDRANT__STORAGE__OPTIMIZERS__MAX_OPTIMIZATION_THREADS=1 — single-threaded segment optimization eliminates lock contention with concurrent searches
    • QDRANT__STORAGE__PERFORMANCE__MAX_SEARCH_THREADS=4 — caps search thread pool size
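A minimal sketch of how a host manager could apply the Qdrant tuning and stop its children in a fixed order. The two environment variable names come from the list above; qdrant_env, stop_in_order, and the 10-second grace period are hypothetical, and the actual shutdown order used by host_manager.py is not stated here.

```python
import subprocess
from typing import Mapping


def qdrant_env(base: Mapping[str, str]) -> dict[str, str]:
    """Return a copy of `base` with the two Qdrant tuning variables set."""
    env = dict(base)
    env["QDRANT__STORAGE__OPTIMIZERS__MAX_OPTIMIZATION_THREADS"] = "1"
    env["QDRANT__STORAGE__PERFORMANCE__MAX_SEARCH_THREADS"] = "4"
    return env


def stop_in_order(children: list[subprocess.Popen], timeout: float = 10.0) -> None:
    """Terminate children one at a time, in the order given."""
    for child in children:
        if child.poll() is None:  # still running
            child.terminate()  # polite request first
            try:
                child.wait(timeout=timeout)
            except subprocess.TimeoutExpired:
                child.kill()  # escalate if the grace period expires
                child.wait()
```

A caller would pass qdrant_env(os.environ) as the env argument of subprocess.Popen when launching the Qdrant binary, and call stop_in_order from its SIGINT/SIGTERM handler.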

3. Design Principles

3.1 Hard Failure, No Fallback

Every error on the request path uses strict, non-recoverable semantics:

| Scenario | Behavior |
| --- | --- |
| Token missing | 401; no default |
| Token config field null/empty/zero | TokenConfigError; token invalidated, returns 401 |
| JSON parse failure (memory_llm_body / embed_body) | TokenConfigError; token invalidated |
| Template file missing | TemplateNotFoundError; process exits at startup |
| Memory retrieval fails | 502; distinct from an upstream 500 that is passed through |
| Message validation fails | 422 |
| LLM provider unreachable | 502; no retry, no fallback |

There is no degradation, no guessing, no fallback path.

3.2 Per-Request Instance Isolation

  • A new ChatRequestHandler instance is created for every HTTP request.
  • Handler state is fully immutable after construction.
  • MemoryManager is passed as a parameter, never stored as a mutable attribute.
  • TokenConfig is a frozen dataclass.
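The effect of a frozen dataclass can be seen in a few lines. TokenConfigSketch is a hypothetical two-field stand-in (the real TokenConfig carries many more fields); try_mutate exists only to demonstrate the failure.

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class TokenConfigSketch:
    """Hypothetical two-field stand-in for the real TokenConfig."""

    memory_limit: int
    session_window_size: int


def try_mutate(config: TokenConfigSketch) -> bool:
    """Return True if assignment was rejected, as expected for a frozen dataclass."""
    try:
        config.memory_limit = 99  # type: ignore[misc]
    except FrozenInstanceError:
        return True
    return False
```

Because every handler receives a config it cannot modify, two concurrent requests for the same token cannot interfere through shared configuration state.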

3.3 Transparent Proxy

  • Only the messages field is intercepted for memory and session injection.
  • memory_llm_body / embed_body are transparent JSON — the system never parses their internals.
  • embed_provider is set per token at registration; supports any embedding vendor.
  • LLM responses are passed through untouched (status code + headers + body).
  • All non-messages fields in the request body are forwarded as-is.

3.4 Full Type Coverage

  • mypy strict mode enforced.
  • All public functions have explicit type annotations.
  • dict[str, object] is limited to Mem0 boundary files only.

4. Token-Driven Multi-Tenancy

Each Service Token is an independent isolation boundary:

  • Token = config + memories + sessions. A token carries its full LLM/Embedding configuration, memory prompt, and behavioral parameters.
  • Admin Token is used to register/manage Service Tokens (created at --init).
  • Configuration is externalized: LLM keys and endpoints are stored in SQLite via the Admin API, never in .env.

Token Config Hard Failure

All token config fields are validated at load time:

| Field type | Hard failure condition |
| --- | --- |
| String (required URLs/keys) | null or empty string → TokenConfigError |
| Integer (embed_dims, memory_limit, etc.) | null or 0 → TokenConfigError |
| JSON body (memory_llm_body, embed_body) | not valid JSON or not an object → TokenConfigError |

For a corrupted token, get_config() and get_detail() return None, so the token is treated as invalid and requests that use it receive 401.
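The three validation rules can be sketched as small helpers. The function names are hypothetical, and the local TokenConfigError class only stands in for the one in exceptions.py; the conditions themselves follow the rules above.

```python
import json


class TokenConfigError(Exception):
    """Raised when a stored token config field is unusable."""


def require_str(name: str, value: object) -> str:
    if not isinstance(value, str) or value == "":
        raise TokenConfigError(f"{name} must be a non-empty string")
    return value


def require_int(name: str, value: object) -> int:
    # bool is a subclass of int, so reject it explicitly.
    if isinstance(value, bool) or not isinstance(value, int) or value == 0:
        raise TokenConfigError(f"{name} must be a non-zero integer")
    return value


def require_json_object(name: str, raw: object) -> dict[str, object]:
    if not isinstance(raw, str):
        raise TokenConfigError(f"{name} must be a JSON string")
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise TokenConfigError(f"{name} is not valid JSON") from exc
    if not isinstance(parsed, dict):
        raise TokenConfigError(f"{name} must be a JSON object")
    return parsed
```

Note that require_json_object only checks that the value is a JSON object; it never inspects the keys, which matches the transparent treatment of memory_llm_body and embed_body.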


5. Request Lifecycle

Incoming request
  │
  ▼
TokenAuthMiddleware
  ├── Generate request-id (uuid4.hex[:8]), inject into LogRecord
  ├── /health → pass through (no auth)
  ├── /v1/admin/* → require is_admin=1
  └── /v1/chat/completions → load token config + read raw JSON body
       └── Attach to request.state: token_config, raw_body, request_id
  │
  ▼
router.py — parse messages from raw_body["messages"] → list[Message]
  │
  ▼
ChatRequestHandlerFactory.create_handler(body, messages, background_tasks, token_config)
  │
  ▼
ChatRequestHandler.execute()
  │
  ├── normalize_messages() → validate messages (tool_call_id required when role=tool, etc.)
  │
  ├── SessionStore.get(token_id, window_size)
  │     └── Read latest N messages from SQLite messages table
  │
  ├── MemoryManagerPool.get(token_id, token_config, settings)
  │     ├── Lazily instantiate Mem0 per token; first access creates, config hash change rebuilds
  │     └── MemoryManager.search(query, user_id=token, limit)
  │         └── Mem0 → Qdrant vector search
  │
  ├── ContextBuilder.build(messages, memories)
  │     └── Inject memories into system prompt (using external template file)
  │
  ├── _inject_history(enriched_messages, session_history)
  │     └── Insert conversation history after the system message
  │
  ├── build_forward_payload(original_body, enriched_messages)
  │     └── Replace messages in original body with enriched version; keep all other fields
  │
  ├── response_sender.send_non_stream(...)  or  response_sender.send_stream(...)
  │     ├── Non-streaming: response status/headers/body passed through to client
  │     └── Streaming: raw SSE bytes passed through; chunks parsed internally for content + tool_calls collection
  │
  └── _store_memory() (BackgroundTask / after stream ends)
        ├── SessionStore.append(token_id, messages)
        └── MemoryManager.add(messages, user_id=token)

Pass-through principle: The system only intercepts and processes the messages field (for memory/session injection). All other request fields (model, temperature, tools, response_format, etc.) are forwarded to the LLM provider unchanged. Responses are likewise passed through unchanged (status code + headers + body).
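The pass-through principle reduces to a shallow copy with one key replaced. This is a sketch that uses plain dicts; the real build_forward_payload works with Message objects and may differ in detail.

```python
from typing import Any


def build_forward_payload(
    original_body: dict[str, Any],
    enriched_messages: list[dict[str, Any]],
) -> dict[str, Any]:
    """Copy the request body, swapping in the enriched messages only."""
    payload = dict(original_body)  # shallow copy keeps every other field as sent
    payload["messages"] = enriched_messages
    return payload
```

Fields such as model, temperature, or tools are never read, so unknown or vendor-specific fields survive the trip untouched.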


6. Memory Subsystem

Mem0 Configuration

Each Service Token carries an independent Mem0 configuration:

  • memory_llm_base_url / memory_llm_api_key / memory_llm_provider: memory extraction LLM
  • memory_llm_body: transparent JSON (model, temperature, etc.) — system does not parse internals
  • embed_base_url / embed_api_key / embed_dims / embed_provider: vectorization service
  • embed_body: transparent JSON for embedder config
  • memory_prompt: custom memory extraction prompt
  • memory_limit: number of memories per retrieval

Timeouts:

  • memory_search_timeout (default 30s): if Qdrant or the embedding HTTP client blocks during search, the operation fails fast rather than hanging indefinitely.
  • memory_store_timeout (default 120s): protects background memory store operations. Unlike search, store involves an LLM call for fact extraction, so a longer timeout is warranted. It prevents a hung Mem0 background task from blocking a worker thread indefinitely.
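One common way to put a deadline on a blocking synchronous call is to run it in a worker thread and stop waiting after the timeout. The sketch below shows that pattern; it is not necessarily how MemoryManager implements its timeouts, and MemoryTimeoutError and call_with_timeout are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout
from typing import Callable, TypeVar

T = TypeVar("T")


class MemoryTimeoutError(Exception):
    """Hypothetical error type for a memory operation that ran too long."""


def call_with_timeout(fn: Callable[[], T], timeout_s: float) -> T:
    """Run fn in a worker thread and give up waiting after timeout_s seconds."""
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(fn)
    try:
        return future.result(timeout=timeout_s)
    except FutureTimeout as exc:
        raise MemoryTimeoutError(f"operation exceeded {timeout_s}s") from exc
    finally:
        # Do not wait for a stuck call; the thread is abandoned, not killed.
        executor.shutdown(wait=False)
```

The caller is released at the deadline, but Python cannot forcibly stop the abandoned thread, so this pattern bounds request latency rather than resource usage.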

Memory Isolation

Memories are isolated by user_id = token (the token hex string). Each token owns its own memory namespace.

MemoryManagerPool

Caches MemoryManager instances keyed by token_id + config hash:

  • First access creates a new instance (Memory.from_config())
  • Config hash change triggers automatic rebuild
  • invalidate(token_id) forces rebuild on next access (used after token update)
  • close_all() is called during application shutdown
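The caching behavior can be sketched with a dictionary keyed by token_id that stores the config hash next to the instance. ManagerPool, config_hash, and the factory argument are illustrative; the real pool builds instances with Memory.from_config() and presumably releases resources in close_all().

```python
import hashlib
import json
from typing import Callable, Generic, TypeVar

M = TypeVar("M")


def config_hash(config: dict[str, object]) -> str:
    """Stable digest of a config dict (key order does not matter)."""
    canonical = json.dumps(config, sort_keys=True, default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class ManagerPool(Generic[M]):
    """Caches one manager per token_id; rebuilds when the config hash changes."""

    def __init__(self, factory: Callable[[dict[str, object]], M]) -> None:
        self._factory = factory
        self._entries: dict[int, tuple[str, M]] = {}

    def get(self, token_id: int, config: dict[str, object]) -> M:
        digest = config_hash(config)
        cached = self._entries.get(token_id)
        if cached is not None and cached[0] == digest:
            return cached[1]
        manager = self._factory(config)  # first access or changed config
        self._entries[token_id] = (digest, manager)
        return manager

    def invalidate(self, token_id: int) -> None:
        """Drop the cached entry so the next get() rebuilds it."""
        self._entries.pop(token_id, None)

    def close_all(self) -> None:
        """Forget every entry (a real pool would also release resources)."""
        self._entries.clear()
```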

7. Session Management

SessionStore

SQLite-backed message persistence, keyed by token_id:

CREATE TABLE messages (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    token_id   INTEGER NOT NULL,
    role       TEXT NOT NULL,
    content    TEXT NOT NULL,
    metadata   TEXT,
    created_at TEXT DEFAULT (datetime('now')),
    hash       TEXT NOT NULL DEFAULT ''
);
CREATE INDEX idx_messages_token ON messages(token_id, id);
CREATE INDEX idx_messages_hash ON messages(token_id, hash);

  • One token shares one conversation history stream.
  • Read returns the latest N messages based on session_window_size from token config.
  • System messages are not stored (ContextBuilder dynamically builds them per request).
  • Only user/assistant/tool roles are persisted.
  • LLM response metadata preserved: tool_calls from LLM non-streaming and streaming responses are parsed and stored in the message metadata column, enabling correct restoration of multi-turn tool-call chains from session history.
  • reasoning_content excluded from storage: the system strips thinking content from LLM responses before session persistence; it is only forwarded transparently to the client.
  • Idempotent dedup: each message is hashed via MD5 (role + content + metadata). Before writing, the latest 200 messages for the same token are checked — duplicates are skipped.
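The dedup rule can be sketched against the messages table shown above. The exact way role, content, and metadata are joined before hashing is not specified here, so the unit-separator join is an assumption, as are the names message_hash and append_if_new.

```python
import hashlib
import sqlite3
from typing import Optional


def message_hash(role: str, content: str, metadata: Optional[str]) -> str:
    """MD5 over role + content + metadata, used only as a dedup key."""
    joined = "\x1f".join([role, content, metadata or ""])  # assumed separator
    return hashlib.md5(joined.encode("utf-8"), usedforsecurity=False).hexdigest()


def append_if_new(
    conn: sqlite3.Connection,
    token_id: int,
    role: str,
    content: str,
    metadata: Optional[str] = None,
    lookback: int = 200,
) -> bool:
    """Insert the message unless its hash is among the latest `lookback` rows."""
    digest = message_hash(role, content, metadata)
    recent = conn.execute(
        "SELECT hash FROM messages WHERE token_id = ? ORDER BY id DESC LIMIT ?",
        (token_id, lookback),
    ).fetchall()
    if any(row[0] == digest for row in recent):
        return False  # duplicate within the window: skip the write
    conn.execute(
        "INSERT INTO messages (token_id, role, content, metadata, hash) "
        "VALUES (?, ?, ?, ?, ?)",
        (token_id, role, content, metadata, digest),
    )
    conn.commit()
    return True
```

The window is bounded, so an identical message that last appeared more than 200 rows ago for the same token is stored again.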

8. Context Injection

Unified Message Model

The entire system uses the Message Pydantic model as the sole message representation:

Message: role, content, reasoning_content?, tool_calls?, tool_call_id?, name?

Boundary conversions to/from dict happen only in two places:

  • msg_to_dict() in api/forward.py (for building the forward payload)
  • MemoryManager.add() in core/memory.py (for Mem0 storage)

Memory Template (External File)

Memory injection uses an external template file:

  • templates/memory_template.md — default template (English), auto-created by --init
  • templates/memory_template_zh.md — Chinese template
  • Loaded at startup from the MEMORY_TEMPLATE_PATH config value
  • File missing → TemplateNotFoundError, process exits (hard failure)

The template must contain a {memories} placeholder, replaced at runtime with the retrieved memory list.
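A minimal sketch of loading and rendering the template. The local TemplateNotFoundError stands in for the project's exception; load_template, render_memories, and the one-bullet-per-memory layout are assumptions.

```python
from pathlib import Path


class TemplateNotFoundError(Exception):
    """Raised when the memory template file does not exist."""


def load_template(path: Path) -> str:
    """Read the template, failing hard if the file or placeholder is missing."""
    if not path.is_file():
        raise TemplateNotFoundError(str(path))
    text = path.read_text(encoding="utf-8")
    if "{memories}" not in text:
        raise ValueError("template must contain a {memories} placeholder")
    return text


def render_memories(template: str, memories: list[str]) -> str:
    """Replace the placeholder with one bullet per memory."""
    block = "\n".join(f"- {m}" for m in memories)
    # str.replace avoids str.format, which would fail on other braces in the file.
    return template.replace("{memories}", block)
```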

Assembly Order

[system: original system prompt + memory block]
[user/assistant: conversation history (messages 1...N)]
[user/assistant: current request messages ...]
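The assembly order can be expressed as one small function. This sketch uses plain dicts rather than the Message model, and it assumes that a request without a system message gets a system message containing only the memory block.

```python
Message = dict[str, str]


def assemble(current: list[Message], history: list[Message], memory_block: str) -> list[Message]:
    """Return [system + memory block] + history + current non-system messages."""
    system_parts = [m["content"] for m in current if m["role"] == "system"]
    system_parts.append(memory_block)
    system = {"role": "system", "content": "\n\n".join(p for p in system_parts if p)}
    rest = [m for m in current if m["role"] != "system"]
    return [system, *history, *rest]
```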

9. Forwarding Layer

providers/http_client.py — generic synchronous HTTP client:

  • chat(base_url, api_key, payload, timeout) → non-streaming, returns httpx.Response
  • chat_stream_request(base_url, api_key, payload, timeout) → streaming, returns (httpx.Client, httpx.Response)

Pass-through design:

  • Request: build_forward_payload() replaces only messages; all other fields preserved.
  • Response: upstream status code, headers, and body are passed through unchanged.
  • Errors: upstream HTTP errors (400, 429, 500, etc.) are passed through. The forwarding layer generates its own 502 in only two cases: network connection failures and memory retrieval failures.
  • Streaming: upstream SSE bytes are passed through; chunks are parsed internally to collect the full response text and tool call metadata for session storage after the stream ends.

Zero vendor-specific code. base_url is the API base URL from token config; /v1/chat/completions is automatically appended by http_client.chat().
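Collecting the response text from a stream while still forwarding the raw bytes can be sketched as follows. The function assumes OpenAI-style SSE chunks (data: lines carrying choices[].delta.content and a final data: [DONE]); tool_calls accumulation is omitted for brevity, and collect_stream_text is a hypothetical name.

```python
import json
from typing import Iterable


def collect_stream_text(sse_lines: Iterable[str]) -> str:
    """Concatenate delta.content from OpenAI-style SSE lines."""
    parts: list[str] = []
    for line in sse_lines:
        if not line.startswith("data:"):
            continue  # comments, blank separators, other SSE fields
        data = line[len("data:"):].strip()
        if data == "[DONE]":
            break  # end-of-stream marker
        chunk = json.loads(data)
        for choice in chunk.get("choices", []):
            content = choice.get("delta", {}).get("content")
            if content:
                parts.append(content)
    return "".join(parts)
```

The client still receives the upstream bytes as they arrive; the parsed text is used only for session and memory storage after the stream ends.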


10. Error Handling

| Scenario | Status | Strategy |
| --- | --- | --- |
| Token missing / invalid | 401 | Reject |
| Token config field null/zero | 401 | Reject (TokenConfigError → token invalidated) |
| Token config JSON parse failure | 401 | Reject (hard failure, no fallback to empty dict) |
| Service token used on admin endpoint | 403 | Reject |
| Admin token targeting non-existent token | 404 | Reject |
| messages empty or missing | 422 | Reject |
| Message validation failure | 422 | Reject |
| Template file missing | Startup fail | TemplateNotFoundError → process exit |
| Memory retrieval failure | 502 | Reject |
| LLM provider unreachable | 502 | Reject |
| LLM provider HTTP error (400/429/500, etc.) | Pass through | Original status + body |
| Memory store failure | Non-blocking | Log only |

11. Logging

Log Levels

| Level | Purpose |
| --- | --- |
| DEBUG | Detailed operations: DB I/O, HTTP request/response, memory operations |
| INFO | Key events: request received, token registered, forward complete |
| WARN | Unexpected but non-breaking |
| ERROR | Critical errors requiring immediate attention |

Request-ID Tracking

  • Every request gets a request-id (uuid4.hex[:8]) generated in TokenAuthMiddleware.
  • RequestIdFilter injects the request-id into every LogRecord, regardless of log format.
  • Log format: [time] [level] [rid=xxx] [module] message
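The request-id mechanism can be sketched with the standard logging and contextvars modules. The attribute name rid, the "-" default, and build_handler are assumptions; the format string mirrors the log format above.

```python
import contextvars
import logging
import uuid

# Holds the id of the request being handled in the current context.
request_id_var: contextvars.ContextVar[str] = contextvars.ContextVar(
    "request_id", default="-"
)


class RequestIdFilter(logging.Filter):
    """Copies the current request id onto every record as `rid`."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.rid = request_id_var.get()
        return True  # annotate only; never drop a record


def new_request_id() -> str:
    """Eight hex characters, as in uuid4.hex[:8]."""
    return uuid.uuid4().hex[:8]


def build_handler() -> logging.Handler:
    handler = logging.StreamHandler()
    handler.addFilter(RequestIdFilter())
    handler.setFormatter(logging.Formatter(
        "[%(asctime)s] [%(levelname)s] [rid=%(rid)s] [%(name)s] %(message)s"
    ))
    return handler
```

A middleware would call request_id_var.set(new_request_id()) at the start of each request; attaching the filter to the handler rather than to individual loggers is what lets every module's log calls pick up the id.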

12. Configuration

.env (Infrastructure Only, Committed to Git)

The .env template (auto-generated by --init) contains only infrastructure parameters:

LOG_LEVEL=INFO
MEMORY_BRIDGE_HOST=localhost
MEMORY_BRIDGE_PORT=8000
QDRANT_HOST=localhost
QDRANT_PORT=6333
MEM0_COLLECTION_NAME=memory_bridge
MEM0_HISTORY_DB_PATH=./data/mem0_history.db
SESSION_DB_PATH=data/sessions.db
TOKEN_DB_PATH=data/tokens.db
HTTP_TIMEOUT=120
MEMORY_SEARCH_TIMEOUT=30
MEMORY_STORE_TIMEOUT=120
MEMORY_TEMPLATE_PATH=templates/memory_template.md

No API keys. LLM/Embedding configuration is registered via the Admin API and stored in SQLite.

The .env file itself serves as the template — it contains no secrets and is safe to commit. No separate .env.example is needed.


13. Directory Structure

src/memory_bridge/
├── main.py                  # FastAPI app factory + lifespan
├── host_manager.py          # Host process manager
├── host_process.py          # Subprocess lifecycle
├── host_init.py             # First-time initialization
├── config.py                # Infrastructure config (pydantic-settings)
├── exceptions.py            # Custom exceptions
│
├── api/
│   ├── router.py            # /health, /v1/chat/completions
│   ├── admin_router.py      # /v1/admin/tokens CRUD
│   ├── middleware.py        # TokenAuthMiddleware (request-id + auth)
│   ├── chat_handler.py      # ChatRequestHandler (per-request orchestrator)
│   ├── handler_factory.py   # ChatRequestHandlerFactory
│   ├── response_sender.py   # send_non_stream / send_stream (LLM dispatch)
│   └── forward.py           # Message conversion + payload building + response helpers
│
├── core/
│   ├── memory.py            # MemoryManager + MemoryManagerPool
│   ├── session.py           # SessionStore
│   ├── session_database.py  # Session SQLite I/O
│   ├── context.py           # ContextBuilder (memory injection + templates)
│   ├── tokens.py            # TokenStore (CRUD)
│   ├── token_database.py    # Token SQLite I/O
│   ├── token_config.py      # TokenConfig dataclass (frozen)
│   ├── normalizer.py        # Message normalization
│   └── logging.py           # Logging config + request-id filter
│
├── providers/
│   └── http_client.py       # Generic HTTP client (transparent pass-through)
│
└── models/
    ├── admin_models.py      # TokenRegisterRequest / TokenUpdateRequest / TokenDetail
    └── request.py           # Message (sole message model)

templates/
    ├── memory_template.md       # Memory injection template (English)
    └── memory_template_zh.md    # Memory injection template (Chinese)

14. Key Design Decisions

Token Auth Hardening

Token authentication is mandatory, with no grace period. The service does not check for token existence at startup, but every API endpoint except /health enforces authentication.

Embedder Provider Externalized

embed_provider is set per token at registration, stored in a dedicated database column, and used directly by build_mem0_config(). No hardcoded "openai" provider — supports any embedding vendor without code changes.

Template File Required

The memory template is loaded at startup from MEMORY_TEMPLATE_PATH. If the file is missing, TemplateNotFoundError is raised and the process exits. No fallback to a built-in default.

Vendor Independence

No vendor-specific logic exists in the codebase. All LLM interactions go through a generic HTTP client; base_url, api_key, and model all come from token config.

Test Coverage Gate

--cov-fail-under=95 enforces a minimum coverage threshold.

Request/Response Pass-Through

MemoryBridge intercepts only the messages field for memory and session injection. All other request fields and the entire LLM response are passed through unchanged.

Upstream Error Code Pass-Through

Any HTTP status code returned by the LLM provider (4xx/5xx) is passed through to the client unchanged. Apart from authentication and validation rejections (401/403/404/422), MemoryBridge generates its own error response only for network connectivity failures (502) and memory retrieval failures (502).

Qdrant Deadlock Prevention

Qdrant's internal segment optimization can deadlock with concurrent search operations when using on_disk: True storage mode. To prevent this:

  • QDRANT__STORAGE__OPTIMIZERS__MAX_OPTIMIZATION_THREADS=1 — eliminates lock contention by serializing segment merges
  • QDRANT__STORAGE__PERFORMANCE__MAX_SEARCH_THREADS=4 — caps concurrent search threads
  • memory_search_timeout (30s) — defense-in-depth: if Qdrant hangs despite tuning, memory search fails fast instead of blocking the request indefinitely
  • memory_store_timeout (120s) — defense-in-depth for background memory store: if Mem0's internal LLM call hangs during fact extraction, the background task fails rather than blocking a worker thread indefinitely

SessionStore Idempotent Writes

Each message is hashed via MD5 (role + content + metadata). Before writing, the latest 200 messages for the same token are checked — duplicates are skipped.

ChatRequestHandler Immutable State

ChatRequestHandler does not hold MemoryManager as a mutable instance attribute. MemoryManager is obtained in _prepare_context() and used only to create a store callback; that callback, which closes over the MemoryManager, is what gets passed to the response_sender functions. The handler's instance state is fully immutable after construction.

Request-ID Global Injection

RequestIdFilter (a logging.Filter) automatically injects request_id from a ContextVar into every LogRecord. All log calls from any module automatically include [rid=xxx] without explicit structured-logging calls.

Init Interruption Sentinel

During --init, the sentinel file .qdrant-initializing is created before the database initialization step and renamed to .qdrant-initialized on success. If the process is interrupted mid-init, the next python memorybridge.pyz start detects .qdrant-initializing, reports that initialization was interrupted, and requires --init to be re-run. This prevents silent corruption from partial initialization.
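The sentinel protocol can be sketched with pathlib. The two file names come from the paragraph above; the directory that holds them, the function names, and the three status strings are assumptions.

```python
from pathlib import Path
from typing import Callable

INITIALIZING = ".qdrant-initializing"
INITIALIZED = ".qdrant-initialized"


def run_init(data_dir: Path, initialize: Callable[[], None]) -> None:
    """Create the in-progress marker, run the init step, then rename on success."""
    marker = data_dir / INITIALIZING
    marker.touch()
    initialize()  # if this raises or the process dies, the marker stays behind
    marker.replace(data_dir / INITIALIZED)


def check_before_start(data_dir: Path) -> str:
    """Classify the directory as 'interrupted', 'ready', or 'uninitialized'."""
    if (data_dir / INITIALIZING).exists():
        return "interrupted"
    if (data_dir / INITIALIZED).exists():
        return "ready"
    return "uninitialized"
```

The rename is a single filesystem operation, so a crash leaves either the in-progress marker or the completed one, never an ambiguous state.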