-
Notifications
You must be signed in to change notification settings - Fork 7
feat: Add Couchbase key-value store support #298
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Add CouchbaseStore implementation using the acouchbase async SDK for async key-value operations with Couchbase Server. Features: - Full AsyncKeyValue protocol implementation - TTL support via Couchbase document expiration - Collection sanitization strategy for safe collection names - Connection string or pre-configured cluster support - Proper resource management with async context manager Install with: pip install py-key-value-aio[couchbase] Closes #292 Co-authored-by: Bill Easton <[email protected]>
📝 WalkthroughWalkthroughThis PR introduces comprehensive Couchbase support to the async key-value store library. It adds a new async Couchbase-backed store implementation with document serialization, collection name sanitization, full lifecycle management, CRUD operations, and Docker-based integration tests. Changes
Sequence DiagramsequenceDiagram
actor Client
participant CouchbaseStore
participant AsyncCluster
participant Couchbase as Couchbase Server
Client->>CouchbaseStore: __init__(connection_string, credentials)
CouchbaseStore->>CouchbaseStore: Initialize configuration
Client->>CouchbaseStore: _setup()
CouchbaseStore->>AsyncCluster: Connect via connection_string
AsyncCluster->>Couchbase: Establish connection
Couchbase-->>AsyncCluster: Connection established
AsyncCluster-->>CouchbaseStore: Cluster ready
CouchbaseStore->>CouchbaseStore: Get bucket reference
Client->>CouchbaseStore: _setup_collection(collection="users")
CouchbaseStore->>CouchbaseStore: Sanitize collection name
CouchbaseStore->>CouchbaseStore: Store collection reference
Client->>CouchbaseStore: _put_managed_entry(key, entry, TTL)
CouchbaseStore->>CouchbaseStore: Serialize entry to JSON dict
CouchbaseStore->>AsyncCluster: Upsert document with TTL
AsyncCluster->>Couchbase: Store document
Couchbase-->>AsyncCluster: Document stored
AsyncCluster-->>CouchbaseStore: Upsert complete
Client->>CouchbaseStore: _get_managed_entry(key)
CouchbaseStore->>AsyncCluster: Fetch document
AsyncCluster->>Couchbase: Get document
Couchbase-->>AsyncCluster: Return document
AsyncCluster-->>CouchbaseStore: Document retrieved
CouchbaseStore->>CouchbaseStore: Deserialize JSON dict to entry
CouchbaseStore-->>Client: Return ManagedEntry
Client->>CouchbaseStore: _delete_managed_entry(key)
CouchbaseStore->>AsyncCluster: Delete document
AsyncCluster->>Couchbase: Remove document
Couchbase-->>AsyncCluster: Deletion confirmed
AsyncCluster-->>CouchbaseStore: Delete complete
Estimated Code Review Effort🎯 4 (Complex) | ⏱️ ~45 minutes Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Comment |
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pyproject.toml (1)
54-66:⚠️ Potential issue | 🟡 MinorRelax Couchbase SDK minimum version to 4.0.0.
The store uses only APIs available in Couchbase Python SDK 4.0.0+:
acouchbase.cluster.AsyncCluster,couchbase.options.GetOptions,couchbase.options.UpsertOptions, andcouchbase.auth.PasswordAuthenticatorwere all introduced in the 4.0 release line. The currentcouchbase>=4.3.0constraint is unnecessarily restrictive and should be relaxed tocouchbase>=4.0.0to allow more users to use compatible SDK versions.
🤖 Fix all issues with AI agents
In `@src/key_value/aio/stores/couchbase/store.py`:
- Around line 51-81: The prepare_load method currently uses data.pop("value")
which can raise KeyError; wrap the access in a guarded check or try/except and,
if missing, raise a DeserializationError with a clear message instead of letting
KeyError propagate so callers like _get_managed_entry receive a
DeserializationError; specifically, replace the direct pop in prepare_load with
a safe retrieval (e.g., check "value" in data or try: ... except KeyError) and
raise DeserializationError(message="Value field not found in Couchbase
document") when absent while preserving the existing logic that extracts
value["object"] when present.
- Around line 381-392: _delete_collection currently only removes the in-memory
reference (_collections_by_name.pop) and returns True without deleting
documents; either implement a proper deletion (issue a Couchbase N1QL DELETE or
KV delete-by-prefix using the cluster/collection client to remove docs whose
keys start with the collection prefix) inside _delete_collection and only return
True on success (handle and log errors and return False/raise on failure), or
explicitly mark the operation unsupported by raising NotImplementedError (or
returning False) instead of returning True; update references to
_collections_by_name accordingly so re-initializing a collection cannot leave
stale data.
- Around line 157-210: The __init__ currently allows both cluster and
connection_string to be passed and silently ignores connection_string; add an
explicit guard at the start of Couchbase store __init__ to detect and raise a
ValueError when both cluster and connection_string are provided (message like
"Provide either cluster or connection_string, not both"), leaving the existing
checks for neither provided and for username/password unchanged; ensure you
still set cluster_provided = True when cluster is passed so
client_provided_by_user remains correct.
- Around line 212-222: The _setup method currently registers self._cluster.close
via self._exit_stack.callback which won’t await AsyncCluster.close and can leak
connections; change the cleanup registration in _setup to use
self._exit_stack.push_async_callback(self._cluster.close) (or detect coroutine
with inspect.iscoroutinefunction and wrap accordingly) so that when not
self._client_provided_by_user the async close is awaited properly; keep the
existing bucket = self._cluster.bucket(...) and await bucket.on_connect()
behavior unchanged.
In `@tests/stores/couchbase/test_couchbase.py`:
- Around line 125-139: The test function signatures for
test_long_collection_name and test_special_characters_in_collection_name exceed
140 characters; reformat each async def signature to multiple lines by placing
parameters on separate lines (e.g., one parameter per line) while preserving the
type annotations (CouchbaseStore) and the inline comment "# pyright:
ignore[reportIncompatibleMethodOverride]" and keeping the decorator "@override"
attached to the second test; ensure indentation and trailing colon remain
correct so the function bodies are unchanged.
| @override | ||
| def prepare_load(self, data: dict[str, Any]) -> dict[str, Any]: | ||
| value = data.pop("value") | ||
|
|
||
| if "object" in value: | ||
| data["value"] = value["object"] | ||
| else: | ||
| msg = "Value field not found in Couchbase document" | ||
| raise DeserializationError(message=msg) | ||
|
|
||
| if date_created := data.get("created_at"): | ||
| if isinstance(date_created, str): | ||
| # Parse ISO format datetime string | ||
| data["created_at"] = datetime.fromisoformat(date_created.replace("Z", "+00:00")) | ||
| elif not isinstance(date_created, datetime): | ||
| msg = "Expected `created_at` field to be a datetime or ISO string" | ||
| raise DeserializationError(message=msg) | ||
| else: | ||
| data["created_at"] = date_created.replace(tzinfo=timezone.utc) | ||
|
|
||
| if date_expires := data.get("expires_at"): | ||
| if isinstance(date_expires, str): | ||
| # Parse ISO format datetime string | ||
| data["expires_at"] = datetime.fromisoformat(date_expires.replace("Z", "+00:00")) | ||
| elif not isinstance(date_expires, datetime): | ||
| msg = "Expected `expires_at` field to be a datetime or ISO string" | ||
| raise DeserializationError(message=msg) | ||
| else: | ||
| data["expires_at"] = date_expires.replace(tzinfo=timezone.utc) | ||
|
|
||
| return data |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handle missing value key to avoid KeyError leaks.
data.pop("value") can raise KeyError, which won’t be caught as a DeserializationError in _get_managed_entry. Prefer a guarded pop and raise a DeserializationError instead.
🛠️ Proposed fix
- value = data.pop("value")
+ value = data.pop("value", None)
+ if value is None:
+ msg = "Value field not found in Couchbase document"
+ raise DeserializationError(message=msg)🤖 Prompt for AI Agents
In `@src/key_value/aio/stores/couchbase/store.py` around lines 51 - 81, The
prepare_load method currently uses data.pop("value") which can raise KeyError;
wrap the access in a guarded check or try/except and, if missing, raise a
DeserializationError with a clear message instead of letting KeyError propagate
so callers like _get_managed_entry receive a DeserializationError; specifically,
replace the direct pop in prepare_load with a safe retrieval (e.g., check
"value" in data or try: ... except KeyError) and raise
DeserializationError(message="Value field not found in Couchbase document") when
absent while preserving the existing logic that extracts value["object"] when
present.
| def __init__( | ||
| self, | ||
| *, | ||
| cluster: AsyncCluster | None = None, | ||
| connection_string: str | None = None, | ||
| username: str | None = None, | ||
| password: str | None = None, | ||
| bucket_name: str | None = None, | ||
| scope_name: str | None = None, | ||
| default_collection: str | None = None, | ||
| collection_sanitization_strategy: SanitizationStrategy | None = None, | ||
| ) -> None: | ||
| """Initialize the Couchbase store. | ||
|
|
||
| Values are stored as native JSON documents for better query support and performance. | ||
|
|
||
| Args: | ||
| cluster: The async Couchbase cluster to use (mutually exclusive with connection_string). | ||
| If provided, the store will not manage the cluster's lifecycle. | ||
| The caller is responsible for managing the cluster's lifecycle. | ||
| connection_string: The Couchbase connection string (mutually exclusive with cluster). | ||
| username: The username for authentication (required if connection_string is provided). | ||
| password: The password for authentication (required if connection_string is provided). | ||
| bucket_name: The name of the Couchbase bucket. | ||
| scope_name: The name of the Couchbase scope. | ||
| default_collection: The default collection to use if no collection is provided. | ||
| collection_sanitization_strategy: The sanitization strategy to use for collections. | ||
| """ | ||
|
|
||
| cluster_provided = cluster is not None | ||
|
|
||
| if cluster: | ||
| self._cluster = cluster | ||
| else: | ||
| if not connection_string: | ||
| msg = "Either cluster or connection_string must be provided" | ||
| raise ValueError(msg) | ||
| if not username or not password: | ||
| msg = "username and password are required when using connection_string" | ||
| raise ValueError(msg) | ||
|
|
||
| auth = PasswordAuthenticator(username, password) | ||
| self._cluster = AsyncCluster(connection_string, ClusterOptions(auth)) | ||
|
|
||
| self._bucket_name = bucket_name or DEFAULT_BUCKET | ||
| self._scope_name = scope_name or DEFAULT_SCOPE | ||
| self._collections_by_name = {} | ||
| self._adapter = CouchbaseSerializationAdapter() | ||
|
|
||
| super().__init__( | ||
| default_collection=default_collection, | ||
| collection_sanitization_strategy=collection_sanitization_strategy, | ||
| client_provided_by_user=cluster_provided, | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Enforce mutual exclusivity of cluster vs connection_string.
Docs say they’re mutually exclusive, but both can currently be provided (and connection_string would be ignored). Add a guard to prevent ambiguous configuration.
🛠️ Proposed fix
cluster_provided = cluster is not None
+
+ if cluster and connection_string:
+ msg = "Provide either cluster or connection_string, not both"
+ raise ValueError(msg)🤖 Prompt for AI Agents
In `@src/key_value/aio/stores/couchbase/store.py` around lines 157 - 210, The
__init__ currently allows both cluster and connection_string to be passed and
silently ignores connection_string; add an explicit guard at the start of
Couchbase store __init__ to detect and raise a ValueError when both cluster and
connection_string are provided (message like "Provide either cluster or
connection_string, not both"), leaving the existing checks for neither provided
and for username/password unchanged; ensure you still set cluster_provided =
True when cluster is passed so client_provided_by_user remains correct.
| @override | ||
| async def _setup(self) -> None: | ||
| """Initialize the cluster connection and register cleanup.""" | ||
| # Wait for cluster connection | ||
| bucket = self._cluster.bucket(self._bucket_name) # pyright: ignore[reportUnknownMemberType] | ||
| await bucket.on_connect() # pyright: ignore[reportUnknownMemberType] | ||
|
|
||
| # Register cleanup if we own the cluster | ||
| if not self._client_provided_by_user: | ||
| self._exit_stack.callback(self._cluster.close) | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
# First, let's look at the file structure and understand the context
cd src/key_value/aio/stores/couchbase && head -250 store.py | tail -100Repository: strawgate/py-key-value
Length of output: 4699
🏁 Script executed:
# Check imports and AsyncCluster usage
head -50 src/key_value/aio/stores/couchbase/store.pyRepository: strawgate/py-key-value
Length of output: 1785
🏁 Script executed:
# Search for _cluster and close usage patterns
rg "AsyncCluster|_cluster.close" src/key_value/aio/stores/couchbase/Repository: strawgate/py-key-value
Length of output: 632
🏁 Script executed:
# Check if there are any type hints or stubs for couchbase
fd -e pyi . | rg -i couchbaseRepository: strawgate/py-key-value
Length of output: 48
🌐 Web query:
couchbase-sdk python AsyncCluster.close() async method signature
💡 Result:
In the Couchbase Python SDK (acouchbase), the asyncio cluster close method is:
async def close(self) -> NoneIt’s documented as async close() → None on acouchbase.cluster.AsyncCluster. [1]
Sources: [1] (docs.couchbase.com)
Citations:
Use push_async_callback() to properly close the async cluster.
The Couchbase SDK's AsyncCluster.close() is an async method and must be awaited. Registering it via ExitStack.callback() won't await it, causing connection leaks. Use push_async_callback() instead or a runtime check with inspect.iscoroutinefunction() for compatibility:
Fix
+import inspect
...
if not self._client_provided_by_user:
- self._exit_stack.callback(self._cluster.close)
+ close = self._cluster.close
+ if inspect.iscoroutinefunction(close):
+ self._exit_stack.push_async_callback(close)
+ else:
+ self._exit_stack.callback(close)🤖 Prompt for AI Agents
In `@src/key_value/aio/stores/couchbase/store.py` around lines 212 - 222, The
_setup method currently registers self._cluster.close via
self._exit_stack.callback which won’t await AsyncCluster.close and can leak
connections; change the cleanup registration in _setup to use
self._exit_stack.push_async_callback(self._cluster.close) (or detect coroutine
with inspect.iscoroutinefunction and wrap accordingly) so that when not
self._client_provided_by_user the async close is awaited properly; keep the
existing bucket = self._cluster.bucket(...) and await bucket.on_connect()
behavior unchanged.
| @override | ||
| async def _delete_collection(self, *, collection: str) -> bool: | ||
| """Delete all documents in the collection. | ||
|
|
||
| Note: This doesn't delete the Couchbase collection itself, just removes all | ||
| documents with keys matching the collection prefix. | ||
| """ | ||
| # Since we prefix keys with collection name, we can't easily delete all | ||
| # documents without a query. For now, just clear the reference. | ||
| self._collections_by_name.pop(collection, None) | ||
|
|
||
| return True |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
_delete_collection reports success without deleting data.
This method only removes the in-memory reference and returns True, but leaves all documents intact. That violates destroy semantics and can cause stale data if the collection is re-initialized. Consider implementing a query-based delete or explicitly signaling “unsupported” rather than returning success.
🛠️ Safer interim behavior
- # Since we prefix keys with collection name, we can't easily delete all
- # documents without a query. For now, just clear the reference.
- self._collections_by_name.pop(collection, None)
-
- return True
+ # Since we prefix keys with collection name, we can't easily delete all
+ # documents without a query. Signal that deletion isn't implemented.
+ msg = "CouchbaseStore.delete_collection does not delete documents yet"
+ raise NotImplementedError(msg)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| @override | |
| async def _delete_collection(self, *, collection: str) -> bool: | |
| """Delete all documents in the collection. | |
| Note: This doesn't delete the Couchbase collection itself, just removes all | |
| documents with keys matching the collection prefix. | |
| """ | |
| # Since we prefix keys with collection name, we can't easily delete all | |
| # documents without a query. For now, just clear the reference. | |
| self._collections_by_name.pop(collection, None) | |
| return True | |
| `@override` | |
| async def _delete_collection(self, *, collection: str) -> bool: | |
| """Delete all documents in the collection. | |
| Note: This doesn't delete the Couchbase collection itself, just removes all | |
| documents with keys matching the collection prefix. | |
| """ | |
| # Since we prefix keys with collection name, we can't easily delete all | |
| # documents without a query. Signal that deletion isn't implemented. | |
| msg = "CouchbaseStore.delete_collection does not delete documents yet" | |
| raise NotImplementedError(msg) |
🤖 Prompt for AI Agents
In `@src/key_value/aio/stores/couchbase/store.py` around lines 381 - 392,
_delete_collection currently only removes the in-memory reference
(_collections_by_name.pop) and returns True without deleting documents; either
implement a proper deletion (issue a Couchbase N1QL DELETE or KV
delete-by-prefix using the cluster/collection client to remove docs whose keys
start with the collection prefix) inside _delete_collection and only return True
on success (handle and log errors and return False/raise on failure), or
explicitly mark the operation unsupported by raising NotImplementedError (or
returning False) instead of returning True; update references to
_collections_by_name accordingly so re-initializing a collection cannot leave
stale data.
| async def test_long_collection_name(self, store: CouchbaseStore, sanitizing_store: CouchbaseStore): # pyright: ignore[reportIncompatibleMethodOverride] | ||
| with pytest.raises(Exception): # noqa: B017, PT011 | ||
| await store.put(collection="test_collection" * 100, key="test_key", value={"test": "test"}) | ||
|
|
||
| await sanitizing_store.put(collection="test_collection" * 100, key="test_key", value={"test": "test"}) | ||
| assert await sanitizing_store.get(collection="test_collection" * 100, key="test_key") == {"test": "test"} | ||
|
|
||
| @override | ||
| async def test_special_characters_in_collection_name(self, store: CouchbaseStore, sanitizing_store: CouchbaseStore): # pyright: ignore[reportIncompatibleMethodOverride] | ||
| """Tests that special characters in the collection name will not raise an error.""" | ||
| with pytest.raises(Exception): # noqa: B017, PT011 | ||
| await store.put(collection="test_collection!@#$%^&*()", key="test_key", value={"test": "test"}) | ||
|
|
||
| await sanitizing_store.put(collection="test_collection!@#$%^&*()", key="test_key", value={"test": "test"}) | ||
| assert await sanitizing_store.get(collection="test_collection!@#$%^&*()", key="test_key") == {"test": "test"} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
# First, let's locate and read the test file
fd "test_couchbase.py" tests/Repository: strawgate/py-key-value
Length of output: 106
🏁 Script executed:
# Read the specific lines from the test file
sed -n '125,139p' tests/stores/couchbase/test_couchbase.py | cat -nRepository: strawgate/py-key-value
Length of output: 1400
🏁 Script executed:
# Let's also check the exact character length of the problematic lines
python3 << 'EOF'
import sys
# Read the file
with open('tests/stores/couchbase/test_couchbase.py', 'r') as f:
lines = f.readlines()
# Check lines 125 and 132 (converting from 1-indexed to 0-indexed)
for line_num in [124, 131]: # 125 and 132 in 1-indexed
if line_num < len(lines):
line = lines[line_num].rstrip('\n')
print(f"Line {line_num + 1}: Length={len(line)}")
print(f"Content: {line}")
print()
EOFRepository: strawgate/py-key-value
Length of output: 297
🏁 Script executed:
# Let's also examine the broader context and check if the test class structure uses ContextManagerStoreTestMixin
head -n 150 tests/stores/couchbase/test_couchbase.py | tail -n 50Repository: strawgate/py-key-value
Length of output: 2911
🏁 Script executed:
# Let's verify the exact length of line 133 (test_special_characters_in_collection_name)
python3 << 'EOF'
with open('tests/stores/couchbase/test_couchbase.py', 'r') as f:
lines = f.readlines()
# Check line 133 (0-indexed = 132)
if len(lines) > 132:
line = lines[132].rstrip('\n')
print(f"Line 133: Length={len(line)}")
print(f"Content: {line}")
EOFRepository: strawgate/py-key-value
Length of output: 269
Wrap method signatures to keep lines ≤ 140 characters.
Lines 125 and 133 exceed the 140-character limit (156 and 173 characters respectively). Split the parameters across multiple lines while keeping the type annotations and pyright ignore comment intact.
🛠️ Proposed refactor
- async def test_long_collection_name(self, store: CouchbaseStore, sanitizing_store: CouchbaseStore): # pyright: ignore[reportIncompatibleMethodOverride]
+ async def test_long_collection_name(
+ self,
+ store: CouchbaseStore,
+ sanitizing_store: CouchbaseStore,
+ ): # pyright: ignore[reportIncompatibleMethodOverride]
with pytest.raises(Exception): # noqa: B017, PT011
await store.put(collection="test_collection" * 100, key="test_key", value={"test": "test"})
await sanitizing_store.put(collection="test_collection" * 100, key="test_key", value={"test": "test"})
assert await sanitizing_store.get(collection="test_collection" * 100, key="test_key") == {"test": "test"}
`@override`
- async def test_special_characters_in_collection_name(self, store: CouchbaseStore, sanitizing_store: CouchbaseStore): # pyright: ignore[reportIncompatibleMethodOverride]
+ async def test_special_characters_in_collection_name(
+ self,
+ store: CouchbaseStore,
+ sanitizing_store: CouchbaseStore,
+ ): # pyright: ignore[reportIncompatibleMethodOverride]
"""Tests that special characters in the collection name will not raise an error."""
with pytest.raises(Exception): # noqa: B017, PT011
await store.put(collection="test_collection!@#$%^&*()", key="test_key", value={"test": "test"})🤖 Prompt for AI Agents
In `@tests/stores/couchbase/test_couchbase.py` around lines 125 - 139, The test
function signatures for test_long_collection_name and
test_special_characters_in_collection_name exceed 140 characters; reformat each
async def signature to multiple lines by placing parameters on separate lines
(e.g., one parameter per line) while preserving the type annotations
(CouchbaseStore) and the inline comment "# pyright:
ignore[reportIncompatibleMethodOverride]" and keeping the decorator "@override"
attached to the second test; ensure indentation and trailing colon remain
correct so the function bodies are unchanged.
Add CouchbaseStore implementation using the acouchbase async SDK for async key-value operations with Couchbase Server.
Features:
Install with:
pip install py-key-value-aio[couchbase]Closes #292
Generated with Claude Code
Summary by CodeRabbit
Release Notes
New Features
Tests