Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions hindsight-api-slim/hindsight_api/api/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,17 @@ class RecallRequest(BaseModel):
default=None,
description="List of fact types to recall: 'world', 'experience', 'observation'. Defaults to world and experience if not specified.",
)
prefer_observations: bool = Field(
default=True,
description=(
"When recalling raw facts ('world'/'experience') together with 'observation', drop any raw "
"fact that an observation in the results was consolidated from, so the observation supersedes "
"it and you don't get duplicate content. The freed slots are backfilled with the next results, "
"keeping the result count at the requested budget. Enabled by default; set to false to return "
"raw facts even when an observation already covers them. No effect unless 'observation' and at "
"least one raw type are both requested."
),
)
budget: Budget = Budget.MID
max_tokens: int = 4096
trace: bool = False
Expand Down Expand Up @@ -3826,6 +3837,7 @@ async def api_recall(
max_tokens=request.max_tokens,
enable_trace=request.trace,
fact_type=fact_types,
prefer_observations=request.prefer_observations,
question_date=question_date,
include_entities=include_entities,
max_entity_tokens=max_entity_tokens,
Expand Down
53 changes: 53 additions & 0 deletions hindsight-api-slim/hindsight_api/engine/memory_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -3843,6 +3843,11 @@ async def recall_async(
max_tokens: int = 4096,
enable_trace: bool = False,
fact_type: list[str] | None = None,
# Defaults False at the engine layer on purpose: the user-facing default is True
# (set on the HTTP RecallRequest / MCP recall tool), but internal callers that
# recall raw facts on purpose — notably consolidation, which needs the raw facts
# it folds into observations — must NOT have them silently deduped away.
prefer_observations: bool = False,
question_date: datetime | None = None,
include_entities: bool = False,
max_entity_tokens: int = 500,
Expand Down Expand Up @@ -3875,6 +3880,10 @@ async def recall_async(
bank_id: bank ID to recall for
query: Recall query
fact_type: List of fact types to recall (e.g., ['world', 'experience'])
prefer_observations: When True and both 'observation' and a raw type ('world'/'experience')
are requested, drop raw facts that a returned observation was consolidated from
(deduplication by provenance). Freed slots backfill, keeping the result count at
the budget. No-op unless both observation and raw types are requested.
budget: Budget level for graph traversal (low=100, mid=300, high=600 units)
max_tokens: Maximum tokens to return (counts only 'text' field, default 4096)
Results are returned until token budget is reached, stopping before
Expand Down Expand Up @@ -4012,6 +4021,7 @@ async def recall_async(
max_chunk_tokens,
request_context,
semaphore_wait=semaphore_wait,
prefer_observations=prefer_observations,
tags=tags,
tags_match=tags_match,
tag_groups=tag_groups,
Expand Down Expand Up @@ -4148,6 +4158,7 @@ async def _search_with_retries(
max_chunk_tokens: int = 8192,
request_context: "RequestContext" = None,
semaphore_wait: float = 0.0,
prefer_observations: bool = False,
tags: list[str] | None = None,
tags_match: TagsMatch = "any",
tag_groups: list[TagGroup] | None = None,
Expand Down Expand Up @@ -4660,6 +4671,48 @@ def to_tuple_format(results):
if request_context is not None:
request_context.raise_if_cancelled()

# Step 4.8: prefer-observations dedup. When the caller asked for observations
# alongside raw facts, an observation supersedes the raw facts it was
# consolidated from: drop those raw facts so the same content isn't returned
# twice. Runs BEFORE the Step 5 truncation so the freed slots backfill with
# the next-best results, keeping the result count at the budget. No-op unless
# 'observation' and at least one raw type were both requested.
raw_types_requested = {"world", "experience"} & set(fact_type)
if prefer_observations and "observation" in fact_type and raw_types_requested:
# "The observation list" = observations within the window we would return.
# Only those can supersede a raw fact; a far-down observation should not
# suppress a top raw fact it merely happens to reference.
observation_ids = [
uuid.UUID(sr.id)
for sr in scored_results[: thinking_budget * 2]
if sr.retrieval.fact_type == "observation"
]
if observation_ids:
superseded_ids: set[str] = set()
async with acquire_with_retry(backend) as dedup_conn:
obs_rows = await dedup_conn.fetch(
f"""
SELECT source_memory_ids
FROM {fq_table("memory_units")}
WHERE id = ANY($1::uuid[]) AND fact_type = 'observation'
""",
observation_ids,
)
for obs_row in obs_rows:
for sid in obs_row["source_memory_ids"] or []:
superseded_ids.add(str(sid))
if superseded_ids:
before_count = len(scored_results)
scored_results = [
sr
for sr in scored_results
if not (sr.retrieval.fact_type in ("world", "experience") and sr.id in superseded_ids)
]
log_buffer.append(
f" [4.8] prefer_observations: dropped {before_count - len(scored_results)} "
f"raw fact(s) superseded by {len(observation_ids)} observation(s)"
)

# Step 5: Truncate to thinking_budget * 2 for token filtering
rerank_limit = thinking_budget * 2
top_scored = scored_results[:rerank_limit]
Expand Down
12 changes: 12 additions & 0 deletions hindsight-api-slim/hindsight_api/mcp_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,7 @@ async def recall(
max_tokens: int = 4096,
budget: str = "high",
types: list[str] | None = None,
prefer_observations: bool = True,
tags: list[str] | None = None,
tags_match: str = "any",
tag_groups: list[dict] | None = None,
Expand All @@ -845,6 +846,10 @@ async def recall(
max_tokens: Maximum tokens to return in results (default: 4096)
budget: Search budget - 'low', 'mid', or 'high' (default: 'high'). Higher budgets search more thoroughly.
types: Fact types to include (e.g., ['world', 'experience']). Default: all types.
prefer_observations: When recalling raw facts together with 'observation', drop any raw fact
that a returned observation was consolidated from, so the observation supersedes it (no
duplicate content). Enabled by default; set false to keep raw facts an observation already
covers. No effect unless 'observation' and a raw type are both in types. Default: True.
tags: Optional tags to filter results by (e.g., ['project:alpha']). Mutually exclusive with tag_groups.
tags_match: How to match tags - 'any' (match any tag) or 'all' (match all tags). Default: 'any'
tag_groups: Compound tag filter using boolean groups (AND-ed together). Each group is a leaf
Expand Down Expand Up @@ -873,6 +878,7 @@ async def recall(
"bank_id": target_bank,
"query": query,
"fact_type": fact_types,
"prefer_observations": prefer_observations,
"budget": budget_enum,
"max_tokens": max_tokens,
"request_context": _get_request_context(config),
Expand Down Expand Up @@ -905,6 +911,7 @@ async def recall(
max_tokens: int = 4096,
budget: str = "high",
types: list[str] | None = None,
prefer_observations: bool = True,
tags: list[str] | None = None,
tags_match: str = "any",
tag_groups: list[dict] | None = None,
Expand All @@ -916,6 +923,10 @@ async def recall(
max_tokens: Maximum tokens to return in results (default: 4096)
budget: Search budget - 'low', 'mid', or 'high' (default: 'high'). Higher budgets search more thoroughly.
types: Fact types to include (e.g., ['world', 'experience']). Default: all types.
prefer_observations: When recalling raw facts together with 'observation', drop any raw fact
that a returned observation was consolidated from, so the observation supersedes it (no
duplicate content). Enabled by default; set false to keep raw facts an observation already
covers. No effect unless 'observation' and a raw type are both in types. Default: True.
tags: Optional tags to filter results by (e.g., ['project:alpha']). Mutually exclusive with tag_groups.
tags_match: How to match tags - 'any' (match any tag) or 'all' (match all tags). Default: 'any'
tag_groups: Compound tag filter using boolean groups (AND-ed together). Each group is a leaf
Expand Down Expand Up @@ -943,6 +954,7 @@ async def recall(
"bank_id": target_bank,
"query": query,
"fact_type": fact_types,
"prefer_observations": prefer_observations,
"budget": budget_enum,
"max_tokens": max_tokens,
"request_context": _get_request_context(config),
Expand Down
185 changes: 185 additions & 0 deletions hindsight-api-slim/tests/test_recall_prefer_observations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
"""Tests for the recall `prefer_observations` deduplication flag.

When the caller recalls raw facts ('world'/'experience') together with
'observation' and sets prefer_observations=True, any raw fact that a returned
observation was consolidated from (tracked via memory_units.source_memory_ids)
is dropped so the observation supersedes it — no duplicate content.

Dedup is provenance-based, not semantic: a raw fact that is semantically
similar to an observation but NOT listed in its source_memory_ids must survive.

No LLM required — inserts memory_units directly via SQL with real embeddings.
"""

import uuid

import pytest
import pytest_asyncio

from hindsight_api import MemoryEngine, RequestContext
from hindsight_api.engine.retain import embedding_utils

# Request context passed as ``request_context`` to every engine call in this module.
RC = RequestContext(tenant_id="default")

# Query string used for every recall in this module; it shares its topic
# (Alice, mountains, hiking) with all four seeded texts below.
QUERY = "Alice mountain hiking"

# Two raw facts the observation is consolidated from (must be dropped when the
# flag is on), one raw fact that is semantically similar but NOT a source (must
# survive), and the observation itself.
SRC1_TEXT = "Alice loves hiking in the mountains"
SRC2_TEXT = "Alice hikes the Alps every summer"
NON_SRC_TEXT = "Alice enjoys exploring mountain hiking trails"
OBS_TEXT = "Alice is an avid mountain hiker"


async def _insert_unit(
conn,
*,
unit_id: str,
text: str,
bank_id: str,
embedding_str: str,
fact_type: str = "world",
source_memory_ids: list[uuid.UUID] | None = None,
) -> None:
await conn.execute(
"""
INSERT INTO memory_units (id, bank_id, text, fact_type, embedding, source_memory_ids)
VALUES ($1, $2, $3, $4, $5::vector, $6::uuid[])
""",
unit_id,
bank_id,
text,
fact_type,
embedding_str,
source_memory_ids,
)


def _to_str(emb: list[float]) -> str:
    """Render an embedding as a bracketed, comma-separated string with no spaces."""
    joined = ",".join(map(str, emb))
    return f"[{joined}]"


def _result_ids(result) -> set[str]:
    """Collect the stringified ``id`` of every entry in ``result.results``."""
    collected: set[str] = set()
    for entry in result.results:
        collected.add(str(entry.id))
    return collected


@pytest_asyncio.fixture
async def seeded_obs_memory(memory_no_llm_verify: MemoryEngine):
    """Seed two source facts, one non-source fact, and an observation over the two sources.

    Yields:
        ``(engine, bank_id, ids)`` where ``ids`` maps ``"src1"``, ``"src2"``,
        ``"non_src"`` and ``"obs"`` to the string UUIDs of the inserted rows.
        The observation row lists the two source ids in ``source_memory_ids``;
        the non-source fact is not referenced by it.

    The bank is deleted on teardown, including when seeding itself fails.
    """
    engine = memory_no_llm_verify
    bank_id = f"test-prefer-obs-{uuid.uuid4().hex[:8]}"
    # Presumably creates the bank on first access -- confirm against the engine.
    await engine.get_bank_profile(bank_id, request_context=RC)

    # pytest skips the code after ``yield`` when a fixture raises before
    # yielding, so a plain trailing teardown would leak the bank if embedding
    # generation or an INSERT failed. ``try/finally`` covers both paths.
    try:
        src1_id = str(uuid.uuid4())
        src2_id = str(uuid.uuid4())
        non_src_id = str(uuid.uuid4())
        obs_id = str(uuid.uuid4())

        embeddings = await embedding_utils.generate_embeddings_batch(
            engine.embeddings,
            [SRC1_TEXT, SRC2_TEXT, NON_SRC_TEXT, OBS_TEXT],
        )

        # Raw facts use the default fact_type ("world"); embeddings[i] lines up
        # with the text order passed to generate_embeddings_batch above.
        raw_facts = (
            (src1_id, SRC1_TEXT, embeddings[0]),
            (src2_id, SRC2_TEXT, embeddings[1]),
            (non_src_id, NON_SRC_TEXT, embeddings[2]),
        )

        pool = await engine._get_pool()
        async with pool.acquire() as conn:
            for unit_id, unit_text, unit_embedding in raw_facts:
                await _insert_unit(
                    conn,
                    unit_id=unit_id,
                    text=unit_text,
                    bank_id=bank_id,
                    embedding_str=_to_str(unit_embedding),
                )
            # The observation records only src1/src2 as provenance.
            await _insert_unit(
                conn,
                unit_id=obs_id,
                text=OBS_TEXT,
                bank_id=bank_id,
                embedding_str=_to_str(embeddings[3]),
                fact_type="observation",
                source_memory_ids=[uuid.UUID(src1_id), uuid.UUID(src2_id)],
            )

        ids = {"src1": src1_id, "src2": src2_id, "non_src": non_src_id, "obs": obs_id}
        yield engine, bank_id, ids
    finally:
        await engine.delete_bank(bank_id, request_context=RC)


class TestPreferObservations:
    """Engine-level checks of ``prefer_observations`` on ``recall_async``.

    Every test recalls the shared ``QUERY`` against the bank seeded by the
    ``seeded_obs_memory`` fixture and inspects which seeded ids come back.
    """

    @staticmethod
    async def _recall_ids(engine, bank_id, *, fact_type, prefer_observations) -> set[str]:
        """Run one recall with the shared query and return the ids of its results."""
        outcome = await engine.recall_async(
            bank_id=bank_id,
            query=QUERY,
            request_context=RC,
            fact_type=fact_type,
            prefer_observations=prefer_observations,
            max_tokens=10000,
        )
        return _result_ids(outcome)

    async def test_disabled_returns_sources_and_observation(self, seeded_obs_memory):
        """Flag off: both source facts and the observation are returned."""
        engine, bank_id, ids = seeded_obs_memory
        found = await self._recall_ids(
            engine,
            bank_id,
            fact_type=["world", "experience", "observation"],
            prefer_observations=False,
        )
        for key in ("src1", "src2", "obs"):
            assert ids[key] in found

    async def test_enabled_drops_source_facts_keeps_observation(self, seeded_obs_memory):
        """Flag on: the observation stays and both of its source facts are dropped."""
        engine, bank_id, ids = seeded_obs_memory
        found = await self._recall_ids(
            engine,
            bank_id,
            fact_type=["world", "experience", "observation"],
            prefer_observations=True,
        )
        assert ids["obs"] in found, "the observation must remain"
        assert ids["src1"] not in found, "source fact 1 is superseded by the observation"
        assert ids["src2"] not in found, "source fact 2 is superseded by the observation"

    async def test_enabled_keeps_non_source_fact(self, seeded_obs_memory):
        """Flag on: a similar fact absent from ``source_memory_ids`` is still returned."""
        engine, bank_id, ids = seeded_obs_memory
        found = await self._recall_ids(
            engine,
            bank_id,
            fact_type=["world", "experience", "observation"],
            prefer_observations=True,
        )
        assert ids["non_src"] in found, "a non-source fact must not be dropped, even if semantically similar"

    async def test_noop_without_observation_type(self, seeded_obs_memory):
        """Flag on but 'observation' not requested: the source facts are returned."""
        engine, bank_id, ids = seeded_obs_memory
        found = await self._recall_ids(
            engine,
            bank_id,
            fact_type=["world", "experience"],
            prefer_observations=True,
        )
        for key in ("src1", "src2"):
            assert ids[key] in found


def test_default_split_user_facing_on_engine_off():
    """Pin the two different defaults of ``prefer_observations``.

    ``RecallRequest`` built with only a query must report
    ``prefer_observations`` as True, while the ``prefer_observations``
    parameter of ``MemoryEngine.recall_async`` must default to False. The
    split is intentional: user-facing callers get dedup automatically, whereas
    internal callers such as consolidation, which need the raw facts, are
    never silently deduped.
    """
    import inspect

    from hindsight_api.api.http import RecallRequest
    from hindsight_api.engine.memory_engine import MemoryEngine

    # User-facing layer: dedup is on unless the caller opts out.
    request = RecallRequest(query="anything")
    assert request.prefer_observations is True

    # Engine layer: read the default straight off the signature, no call needed.
    recall_params = inspect.signature(MemoryEngine.recall_async).parameters
    engine_default = recall_params["prefer_observations"].default
    assert engine_default is False
12 changes: 12 additions & 0 deletions hindsight-clients/go/api/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7113,6 +7113,18 @@ components:
type: string
nullable: true
type: array
prefer_observations:
default: true
description: "When recalling raw facts ('world'/'experience') together with\
\ 'observation', drop any raw fact that an observation in the results\
\ was consolidated from, so the observation supersedes it and you don't\
\ get duplicate content. The freed slots are backfilled with the next\
\ results, keeping the result count at the requested budget. Enabled by\
\ default; set to false to return raw facts even when an observation already\
\ covers them. No effect unless 'observation' and at least one raw type\
\ are both requested."
title: Prefer Observations
type: boolean
budget:
$ref: '#/components/schemas/Budget'
max_tokens:
Expand Down
Loading
Loading