Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions nextcloud_mcp_server/vector/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from nextcloud_mcp_server.vector.sharing_state import (
claim_existing_index,
existing_principals,
file_title_from_path,
release_document_for_user,
)

Expand Down Expand Up @@ -531,7 +532,11 @@ async def _index_document(
# in the tenant, claim it for this user (observed-access ACL) and skip
# the expensive fetch/parse/embed entirely.
if doc_task.etag and await claim_existing_index(
doc_task.doc_id, "file", doc_task.etag, doc_task.user_id
doc_task.doc_id,
"file",
doc_task.etag,
doc_task.user_id,
current_path=doc_task.file_path,
):
await delete_placeholder_point(
doc_id=doc_task.doc_id,
Expand Down Expand Up @@ -615,7 +620,10 @@ async def _index_document(

content = result.text
file_metadata = result.metadata
title = file_metadata.get("title") or file_path.split("/")[-1]
# Favour the Nextcloud filename over any embedded document title
# (e.g. a PDF's /Title), which often disagrees with how the user
# named the file and is confusing in the UI.
title = file_title_from_path(file_path)
# etag comes from the scanner's tag REPORT (threaded via the
# DocumentTask); read_file itself returns no etag. It is the
# tenant-wide content-dedup key, so it must be persisted.
Expand Down
37 changes: 35 additions & 2 deletions nextcloud_mcp_server/vector/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@
)
from nextcloud_mcp_server.vector.qdrant_client import get_qdrant_client
from nextcloud_mcp_server.vector.queue.ports import TaskProducer
from nextcloud_mcp_server.vector.sharing_state import claim_existing_index
from nextcloud_mcp_server.vector.sharing_state import (
claim_existing_index,
reconcile_document_path,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -466,7 +469,9 @@ async def scan_user_documents(
# it. Eliminates the per-user reprocessing ping-pong that arises
# because chunk point IDs are user-agnostic (note 386945 #5).
etag = str(file_info.get("etag") or "")
if etag and await claim_existing_index(file_id, "file", etag, user_id):
if etag and await claim_existing_index(
file_id, "file", etag, user_id, current_path=file_path
):
_potentially_deleted.pop((user_id, file_id), None)
logger.debug(
"Dedup: file %s (ID: %s) already indexed in tenant; "
Expand Down Expand Up @@ -582,6 +587,34 @@ async def scan_user_documents(
)
)
file_queued += 1
elif existing_metadata is not None and not existing_metadata.get(
"is_placeholder", False
):
# Reached only on the rename-with-stable-mtime path: a
# fresh modified_at would have set needs_indexing, and an
# etag dedup hit would have continued above -- so here the
# content wasn't re-queued (modified_at stable) yet the
# stored path may be stale from a rename/move (the fileid
# is unchanged). Refresh path/title without re-embedding;
# reconcile_document_path no-ops when the path matches.
# Skip placeholders: reconcile only touches real chunks, so
# a not-yet-indexed file would just incur a 0-point
# set_payload (the real index writes the current path).
try:
await reconcile_document_path(
file_id,
"file",
existing_metadata.get("file_path"),
file_path,
)
except Exception as exc: # noqa: BLE001 — non-fatal
logger.warning(
"Path reconcile failed for file %s (ID: %s) (%s); "
"next scan retries",
file_path,
file_id,
exc,
)

logger.info(
"[SCAN-%s] Found %s tagged PDFs for %s", scan_id, file_count, user_id
Expand Down
73 changes: 73 additions & 0 deletions nextcloud_mcp_server/vector/sharing_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@ def user_principal(user_id: str) -> str:
return f"user:{user_id}"


def file_title_from_path(file_path: str) -> str:
    """Return the display title for an indexed file: the last path component.

    The filename is used on purpose instead of any title embedded in the
    document itself (for example a PDF's ``/Title`` metadata), because the
    embedded title frequently disagrees with the name the user gave the file
    in Nextcloud and is confusing in the search/viz UI.

    Trailing slashes are ignored. When nothing is left after stripping them
    (e.g. ``"/"`` or ``""``) the input is returned unchanged.
    """
    trimmed = file_path.rstrip("/")
    # rpartition yields ("", "", trimmed) when there is no separator, so the
    # last element is always the basename.
    basename = trimmed.rpartition("/")[2]
    if basename:
        return basename
    return file_path


def _document_filter(doc_id: str, doc_type: str, *, real_only: bool) -> Filter:
"""Match every chunk of one document; optionally exclude placeholder points."""
must: list = [
Expand Down Expand Up @@ -163,11 +173,56 @@ async def add_principal(
return True


async def reconcile_document_path(
doc_id: str,
doc_type: str,
stored_path: str | None,
current_path: str,
) -> bool:
"""Refresh ``file_path``/``title`` on a renamed/moved file's existing points.

A rename in Nextcloud keeps the ``fileid`` (our ``doc_id``) but changes the
path while leaving content — hence ``etag`` and ``mtime`` — untouched, so
both the dedup claim and the scanner's freshness gate skip re-embedding and
the stored payload keeps the OLD path and OLD filename-derived title. This
rewrites ``file_path`` and the derived ``title`` on every real chunk via a
single metadata-only ``set_payload`` (no re-fetch, no re-embed).

Returns False (no write attempted) only when the path is unchanged or empty.
When the path differs it returns True after issuing the ``set_payload``; that
write is itself a Qdrant-side no-op if no real chunks exist yet (e.g. only a
placeholder), which the callers tolerate. A legacy point with no stored
``file_path`` is treated as changed, backfilling both fields.
"""
if not current_path or stored_path == current_path:
return False
qdrant_client = await get_qdrant_client()
settings = get_settings()
await qdrant_client.set_payload(
collection_name=settings.get_collection_name(),
payload={
"file_path": current_path,
"title": file_title_from_path(current_path),
},
points=_document_filter(doc_id, doc_type, real_only=True),
wait=True,
)
logger.info(
"Reconciled path for %s_%s after rename/move: %r -> %r",
doc_type,
doc_id,
stored_path,
current_path,
)
return True


async def claim_existing_index(
doc_id: str,
doc_type: str,
etag: str,
user_id: str,
current_path: str | None = None,
) -> bool:
"""Tenant-wide dedup claim: skip reprocessing if content is already indexed.

Expand All @@ -177,6 +232,12 @@ async def claim_existing_index(
searchable for them) and the caller should skip fetch/parse/embed. Returns
False when nothing reusable exists and the document must be processed.

When ``current_path`` is given (files), a dedup hit also reconciles a stale
``file_path``/``title`` on the existing points: identical content (etag) at a
new path means the file was renamed/moved, which the dedup would otherwise
silently skip. Reuses the payload already fetched here, so it adds no extra
Qdrant round-trip in the steady (unchanged-path) state.

Fail-safe: a Qdrant error during the lookup degrades to False (process the
document normally) rather than aborting the scan — the dedup is an
optimisation, never a correctness gate. A failure to record the principal
Expand All @@ -198,6 +259,18 @@ async def claim_existing_index(
return False
if existing is None:
return False
if current_path:
try:
await reconcile_document_path(
doc_id, doc_type, existing.get("file_path"), current_path
)
except Exception as exc: # noqa: BLE001 — non-fatal; retried next scan
logger.warning(
"Path reconcile failed for %s_%s (%s); next scan retries",
doc_type,
doc_id,
exc,
)
try:
await add_principal(doc_id, doc_type, user_id, existing.get(ACL_PRINCIPALS_KEY))
except Exception as exc: # noqa: BLE001 — non-fatal; recovered on next scan
Expand Down
115 changes: 115 additions & 0 deletions tests/unit/vector/test_sharing_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,54 @@ async def test_handles_none_current_principals(self, client) -> None:
assert kwargs["payload"][ss.ACL_PRINCIPALS_KEY] == ["user:alice"]


class TestFileTitleFromPath:
    """Checks for the filename-derived title helper ``ss.file_title_from_path``."""

    def test_uses_basename(self) -> None:
        """A nested path yields only its final component."""
        title = ss.file_title_from_path("/Documents/report.pdf")
        assert title == "report.pdf"

    def test_no_directory(self) -> None:
        """A bare filename is returned as-is."""
        title = ss.file_title_from_path("report.pdf")
        assert title == "report.pdf"

    def test_trailing_slash_ignored(self) -> None:
        """Trailing separators do not produce an empty title."""
        title = ss.file_title_from_path("/a/b/c/")
        assert title == "c"

    def test_root_only_falls_back_to_input(self) -> None:
        """Degenerate path with no basename: the input comes back, not ""."""
        root = "/"
        assert ss.file_title_from_path(root) == "/"


class TestReconcileDocumentPath:
    """Checks for ``ss.reconcile_document_path`` against the ``client`` fixture."""

    async def test_noop_when_path_unchanged(self, client) -> None:
        """Identical stored and current paths issue no write."""
        same_path = "/a/old.pdf"
        outcome = await ss.reconcile_document_path("42", "file", same_path, same_path)
        assert outcome is False
        client.set_payload.assert_not_called()

    async def test_noop_when_current_path_empty(self, client) -> None:
        """An empty current path issues no write."""
        outcome = await ss.reconcile_document_path("42", "file", "/a/old.pdf", "")
        assert outcome is False
        client.set_payload.assert_not_called()

    async def test_rewrites_path_and_title_on_rename(self, client) -> None:
        """A differing path triggers one write with the new path and title."""
        outcome = await ss.reconcile_document_path(
            "42", "file", "/a/old.pdf", "/a/new-name.pdf"
        )
        assert outcome is True
        client.set_payload.assert_awaited_once()
        call_kwargs = client.set_payload.await_args.kwargs
        written = call_kwargs["payload"]
        assert written["file_path"] == "/a/new-name.pdf"
        assert written["title"] == "new-name.pdf"
        # Only real (non-placeholder) chunks of this document are updated.
        filter_keys = _must_keys(call_kwargs["points"])
        assert filter_keys == ["doc_id", "doc_type", "is_placeholder"]

    async def test_backfills_when_no_stored_path(self, client) -> None:
        """A legacy point with no stored file_path counts as changed (backfill)."""
        outcome = await ss.reconcile_document_path("42", "file", None, "/a/new.pdf")
        assert outcome is True
        written = client.set_payload.await_args.kwargs["payload"]
        assert written["title"] == "new.pdf"


class TestClaimExistingIndex:
async def test_true_and_grants_principal_on_hit(self, client) -> None:
client.scroll.return_value = (
Expand All @@ -137,6 +185,73 @@ async def test_false_when_not_indexed(self, client) -> None:
assert await ss.claim_existing_index("42", "file", "abc", "bob") is False
client.set_payload.assert_not_called()

async def test_dedup_hit_reconciles_stale_path(self, client) -> None:
    """A dedup hit at a new path refreshes the stored path and title.

    Same content (etag) at a new path is a rename the dedup would otherwise
    skip. The user is already a principal, so the only write is the path
    reconcile (one set_payload with the refreshed file_path + title).
    """
    stored_payload = {
        payload_keys.EMBEDDING_IDENTITY: _MODEL,
        ss.ACL_PRINCIPALS_KEY: ["user:bob"],
        "file_path": "/a/old.pdf",
    }
    client.scroll.return_value = ([_point(stored_payload)], None)

    claimed = await ss.claim_existing_index(
        "42", "file", "abc", "bob", current_path="/a/new.pdf"
    )

    assert claimed is True
    client.set_payload.assert_awaited_once()
    written = client.set_payload.await_args.kwargs["payload"]
    assert written["file_path"] == "/a/new.pdf"
    assert written["title"] == "new.pdf"

async def test_dedup_hit_reconciles_and_grants_new_principal(self, client) -> None:
    """A renamed file claimed by a new user triggers both writes.

    Stale path AND a user not yet in the ACL: one set_payload refreshes
    file_path/title, another unions the new principal into acl_principals.
    """
    stored_payload = {
        payload_keys.EMBEDDING_IDENTITY: _MODEL,
        ss.ACL_PRINCIPALS_KEY: ["user:alice"],
        "file_path": "/a/old.pdf",
    }
    client.scroll.return_value = ([_point(stored_payload)], None)

    claimed = await ss.claim_existing_index(
        "42", "file", "abc", "bob", current_path="/a/new.pdf"
    )

    assert claimed is True
    assert client.set_payload.await_count == 2
    written_payloads = [
        call.kwargs["payload"] for call in client.set_payload.await_args_list
    ]
    # Order of the two writes is not asserted, only that both happened.
    path_refresh = {"file_path": "/a/new.pdf", "title": "new.pdf"}
    acl_union = {ss.ACL_PRINCIPALS_KEY: ["user:alice", "user:bob"]}
    assert path_refresh in written_payloads
    assert acl_union in written_payloads

async def test_dedup_hit_without_current_path_skips_reconcile(self, client) -> None:
    """Without current_path (non-file callers) file_path/title are never touched."""
    stored_payload = {
        payload_keys.EMBEDDING_IDENTITY: _MODEL,
        ss.ACL_PRINCIPALS_KEY: ["user:bob"],
        "file_path": "/a/old.pdf",
    }
    client.scroll.return_value = ([_point(stored_payload)], None)

    claimed = await ss.claim_existing_index("42", "file", "abc", "bob")

    assert claimed is True
    # bob is already a principal and no path was supplied, so nothing is written.
    client.set_payload.assert_not_called()

async def test_hit_for_already_listed_user_writes_nothing(self, client) -> None:
client.scroll.return_value = (
[
Expand Down
Loading