Bulk Upsert (note that the first part was accidentally already merged in main before) #187

NumericalAdvantage wants to merge 11 commits into main
Conversation
Summary of Changes

Hello @NumericalAdvantage, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request enhances the resilience of the bulk report upsert mechanism. It introduces comprehensive deduplication logic to process incoming payloads, ensuring that duplicate entries at various levels (the main reports, associated metadata, modalities, and groups) are identified and handled without causing the entire operation to fail. Instead, duplicates are dropped and a warning is logged, allowing the upsert to proceed successfully with valid, unique data.

Highlights
📝 Walkthrough

Implements in-memory deduplication for bulk-upsert payloads, enforces replace semantics, wraps DB writes in transactions with on_commit hooks to refresh search indices (sync or async), adds pgsearch bulk-index utilities/tasks, tightens serializer group querysets, adds a client timeout option, and tests.

Changes
Sequence Diagram(s)

```mermaid
Client->>API: POST /api/reports/bulk-upsert (payload, replace)
```

Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

Suggested reviewers
🚥 Pre-merge checks | ✅ 1 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
Pull Request Review: Prevent Duplicate Entries in Bulk Upsert

Thank you for addressing this issue! This PR adds important deduplication logic to prevent database constraint violations during bulk upserts. Here's my detailed review:

✅ Strengths
🔍 Code Quality & Best Practices

1. Nested Function Organization

The three deduplication functions are currently nested; moving them to module level would look like:

```python
# At module level (before _bulk_upsert_reports)
def _dedupe_by_key(items: list[dict[str, Any]], key_name: str) -> tuple[list[dict[str, Any]], int]:
    """Deduplicate items by a specified key, keeping first occurrence."""
    # ... implementation
```

Rationale: Module-level functions can be unit-tested independently, improving test coverage and maintainability.

2. Inconsistent Deduplication Strategy
Issue: This inconsistency could confuse users and lead to unexpected behavior. The warning at line 42 says "keeping last occurrence" but modalities/groups keep the first.

Recommendation: Use a consistent strategy (preferably "last wins" to match the top-level behavior) or document why different strategies are used.

3. Transaction Boundary Issue

The report upsert happens inside a transaction (line 173), but the many-to-many relationship updates happen outside that transaction (lines 192-246). This means:
Current code:

```python
with transaction.atomic():  # Lines 173-189
    # Report creation/update
    ...
# Transaction ends here

if report_ids:  # Lines 192+
    # Metadata, modalities, groups - NOT in transaction
    Metadata.objects.filter(report_id__in=report_ids).delete()
    # ...
```

Recommendation: Wrap the entire operation (reports + relationships) in a single transaction, or at minimum wrap each relationship section in its own transaction.

🐛 Potential Bugs

1. Type Annotation Inconsistency

```python
def _dedupe_groups(items: list[Any]) -> tuple[list[Any], int]:
    ...
    group_id = getattr(group, "id", group)  # Line 83
```

The function accepts both Group objects and raw ids under a single list[Any] annotation.

Recommendation: Add type hints based on actual usage (appears to be list[Group | int]).

2. Missing Validation for Metadata Keys

Lines 199-204 assume all items have a "key" field, but there's no explicit validation. If validation fails elsewhere, this could raise a KeyError.

Impact: Low (validation should catch this earlier), but defensive programming would be safer.

⚡ Performance Considerations

1. Multiple Iterations Over Validated Reports

The code iterates over validated_reports multiple times, once each for metadata, modalities, and groups.
Impact: For large payloads (1000+ reports), this could be noticeable.

Recommendation: Consider combining some iterations where possible, though the current approach prioritizes clarity.

2. Deduplication Complexity
Performance is reasonable for typical use cases.

🔒 Security Concerns

No significant security issues identified. The deduplication logic doesn't introduce injection risks or authentication bypasses.

🧪 Test Coverage

Critical Issue: This PR adds significant new logic but no tests for the bulk upsert endpoint exist in the repository.

Missing Test Cases:
Recommendation: Add comprehensive test coverage for the bulk upsert endpoint.

Example test structure:

```python
def test_bulk_upsert_with_duplicate_document_ids():
    """Should keep last occurrence when document_id appears multiple times"""
    payload = [
        {"document_id": "DOC-001", "body": "First version", ...},
        {"document_id": "DOC-001", "body": "Second version", ...},
    ]
    response = client.post('/api/reports/bulk-upsert/', payload)
    assert response.json()["created"] == 1
    report = Report.objects.get(document_id="DOC-001")
    assert report.body == "Second version"
```

📋 Summary

Priority Issues:
Overall Assessment: This is a solid improvement that prevents crashes from duplicate data. With the suggested fixes (especially tests and transaction handling), it will be production-ready.

📝 Suggested Changes

Example: Consistent "last wins" deduplication

```python
def _dedupe_by_key(
    items: list[dict[str, Any]], key_name: str
) -> tuple[list[dict[str, Any]], int]:
    """Deduplicate items by key, keeping LAST occurrence."""
    if not items:
        return [], 0
    by_key: dict[str, dict[str, Any]] = {}
    initial_count = len(items)
    for item in items:
        by_key[item[key_name]] = item  # Last one wins
    duplicates = initial_count - len(by_key)
    return list(by_key.values()), duplicates
```

Example: Wrap all operations in transaction

```python
with transaction.atomic():
    if new_reports:
        Report.objects.bulk_create(new_reports, batch_size=BULK_DB_BATCH_SIZE)
    if updated_reports:
        Report.objects.bulk_update(...)
    # Get report IDs
    report_id_by_document_id = {...}
    # All relationship updates inside same transaction
    if report_ids:
        # Metadata
        Metadata.objects.filter(report_id__in=report_ids).delete()
        # ... rest of metadata logic
        # Modalities
        # ... modality logic
        # Groups
        # ... group logic
```

Let me know if you'd like help implementing any of these suggestions!
Code Review
This pull request effectively prevents crashes from duplicate entries in the bulk upsert payload by adding de-duplication logic. The implementation is sound. My review includes a few suggestions to improve consistency and efficiency:
- The de-duplication strategy for different entities is inconsistent (some keep the first duplicate, some keep the last). I've suggested making this consistent for predictable behavior.
- There are three separate loops to process metadata, modalities, and groups for reports. I've suggested combining them into a single loop for better performance.
radis/reports/api/viewsets.py
```python
def _dedupe_by_key(
    items: list[dict[str, Any]], key_name: str
) -> tuple[list[dict[str, Any]], int]:
    if not items:
        return [], 0
    seen: set[str] = set()
    deduped: list[dict[str, Any]] = []
    duplicates = 0
    for item in items:
        key = item[key_name]
        if key in seen:
            duplicates += 1
            continue
        seen.add(key)
        deduped.append(item)
    return deduped, duplicates
```
This function keeps the first occurrence of an item with a duplicate key. However, the de-duplication for reports (lines 33-45) and metadata (_dedupe_metadata) keeps the last occurrence. This inconsistency can be confusing and lead to subtle bugs. For consistency, consider changing this function to also keep the last occurrence. This would make the behavior of de-duplication predictable across the entire process.
```python
def _dedupe_by_key(
    items: list[dict[str, Any]], key_name: str
) -> tuple[list[dict[str, Any]], int]:
    if not items:
        return [], 0
    by_key: dict[str, dict[str, Any]] = {}
    duplicates = 0
    for item in items:
        key = item[key_name]
        if key in by_key:
            duplicates += 1
        by_key[key] = item
    return list(by_key.values()), duplicates
```
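For reviewers weighing the two strategies, here is a standalone, pure-Python illustration (the modality data is made up, not from the PR) of what first-wins versus last-wins deduplication returns for the same input:

```python
# Pure-Python comparison of the two deduplication strategies on the
# same (hypothetical) payload: first-wins keeps the earliest item per
# key, last-wins keeps the latest.
from typing import Any

items = [
    {"code": "CT", "name": "old"},
    {"code": "MR", "name": "mr"},
    {"code": "CT", "name": "new"},
]

# First occurrence wins (the behavior of the diff above as merged).
seen: set[str] = set()
first_wins: list[dict[str, Any]] = []
for item in items:
    if item["code"] not in seen:
        seen.add(item["code"])
        first_wins.append(item)

# Last occurrence wins (the suggested replacement): a dict keyed by
# code overwrites earlier entries while keeping insertion order.
by_key: dict[str, dict[str, Any]] = {}
for item in items:
    by_key[item["code"]] = item
last_wins = list(by_key.values())

print([i["name"] for i in first_wins])  # ['old', 'mr']
print([i["name"] for i in last_wins])   # ['new', 'mr']
```

Note that both variants preserve the position of the first occurrence; only which payload survives differs.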
radis/reports/api/viewsets.py
```python
def _dedupe_groups(items: list[Any]) -> tuple[list[Any], int]:
    if not items:
        return [], 0
    seen: set[int] = set()
    deduped: list[Any] = []
    duplicates = 0
    for group in items:
        group_id = getattr(group, "id", group)
        if group_id in seen:
            duplicates += 1
            continue
        seen.add(group_id)
        deduped.append(group)
    return deduped, duplicates
```
Similar to _dedupe_by_key, this function keeps the first occurrence of a group. For consistency with report and metadata de-duplication, which keep the last occurrence, consider modifying this to also keep the last.
```python
def _dedupe_groups(items: list[Any]) -> tuple[list[Any], int]:
    if not items:
        return [], 0
    by_id: dict[int, Any] = {}
    duplicates = 0
    for group in items:
        group_id = getattr(group, "id", group)
        if group_id in by_id:
            duplicates += 1
        by_id[group_id] = group
    return list(by_id.values()), duplicates
```
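To see why the getattr fallback matters, here is a self-contained sketch using a FakeGroup stand-in (the real code uses Django's Group model) of the last-wins variant above. It also makes visible the type-safety concern raised elsewhere in this thread: the deduplicated list can still mix an object and a raw id.

```python
# Standalone sketch showing that getattr(group, "id", group) lets the
# helper accept both model instances and raw integer ids. FakeGroup is
# a hypothetical stand-in, not the Django model.
from dataclasses import dataclass
from typing import Any


@dataclass
class FakeGroup:
    id: int


def _dedupe_groups(items: list[Any]) -> tuple[list[Any], int]:
    if not items:
        return [], 0
    by_id: dict[int, Any] = {}
    duplicates = 0
    for group in items:
        # Use .id when present, otherwise treat the value itself as the id.
        group_id = getattr(group, "id", group)
        if group_id in by_id:
            duplicates += 1
        by_id[group_id] = group
    return list(by_id.values()), duplicates


deduped, dups = _dedupe_groups([FakeGroup(1), 2, 1, FakeGroup(2)])
# ids 1 and 2 each appear twice; the last occurrence wins, so the
# result mixes a plain int (1) and a FakeGroup (id=2).
print(dups)  # 2
print([getattr(g, "id", g) for g in deduped])  # [1, 2]
```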
radis/reports/api/viewsets.py
```python
metadata_rows: list[Metadata] = []
metadata_duplicate_count = 0
for report_data in validated_reports:
    report_id = report_id_by_document_id[report_data["document_id"]]
    metadata_items, duplicates = _dedupe_metadata(report_data.get("metadata", []))
    metadata_duplicate_count += duplicates
    for item in metadata_items:
        metadata_rows.append(
            Metadata(report_id=report_id, key=item["key"], value=item["value"])
        )
if metadata_rows:
    Metadata.objects.bulk_create(metadata_rows, batch_size=BULK_DB_BATCH_SIZE)

modality_through = Report.modalities.through
modality_through.objects.filter(report_id__in=report_ids).delete()

modality_rows = []
modality_duplicate_count = 0
for report_data in validated_reports:
    report_id = report_id_by_document_id[report_data["document_id"]]
    modality_items, duplicates = _dedupe_by_key(report_data.get("modalities", []), "code")
    modality_duplicate_count += duplicates
    for modality in modality_items:
        modality_id = modality_by_code[modality["code"]].id
        modality_rows.append(
            modality_through(report_id=report_id, modality_id=modality_id)
        )
if modality_rows:
    modality_through.objects.bulk_create(modality_rows, batch_size=BULK_DB_BATCH_SIZE)

group_through = Report.groups.through
group_through.objects.filter(report_id__in=report_ids).delete()

group_rows = []
group_duplicate_count = 0
for report_data in validated_reports:
    report_id = report_id_by_document_id[report_data["document_id"]]
    group_items, duplicates = _dedupe_groups(report_data.get("groups", []))
    group_duplicate_count += duplicates
    for group in group_items:
        group_rows.append(group_through(report_id=report_id, group_id=group.id))
if group_rows:
    group_through.objects.bulk_create(group_rows, batch_size=BULK_DB_BATCH_SIZE)
```
These three loops over validated_reports (for metadata, modalities, and groups) can be combined into a single loop. This would be more efficient as it avoids iterating over the validated_reports list multiple times.
Here's how you could structure it:
```python
metadata_rows: list[Metadata] = []
metadata_duplicate_count = 0
modality_rows = []
modality_duplicate_count = 0
group_rows = []
group_duplicate_count = 0
for report_data in validated_reports:
    report_id = report_id_by_document_id[report_data["document_id"]]

    # Metadata
    metadata_items, duplicates = _dedupe_metadata(report_data.get("metadata", []))
    metadata_duplicate_count += duplicates
    for item in metadata_items:
        metadata_rows.append(
            Metadata(report_id=report_id, key=item["key"], value=item["value"])
        )

    # Modalities
    modality_items, duplicates = _dedupe_by_key(report_data.get("modalities", []), "code")
    modality_duplicate_count += duplicates
    for modality in modality_items:
        modality_id = modality_by_code[modality["code"]].id
        modality_rows.append(
            modality_through(report_id=report_id, modality_id=modality_id)
        )

    # Groups
    group_items, duplicates = _dedupe_groups(report_data.get("groups", []))
    group_duplicate_count += duplicates
    for group in group_items:
        group_rows.append(group_through(report_id=report_id, group_id=group.id))

if metadata_rows:
    Metadata.objects.bulk_create(metadata_rows, batch_size=BULK_DB_BATCH_SIZE)
if modality_rows:
    modality_through.objects.bulk_create(modality_rows, batch_size=BULK_DB_BATCH_SIZE)
if group_rows:
    group_through.objects.bulk_create(group_rows, batch_size=BULK_DB_BATCH_SIZE)
```
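The single-pass accumulation pattern suggested above can be mocked without Django. The sketch below uses tuples in place of model instances and made-up field values, purely to show that one loop can fill all three row lists at once:

```python
# Dependency-free mock of the single-pass accumulation pattern: one
# loop over the reports fills all three row lists. Field names mirror
# the PR; the data and tuple representation are illustrative only.
validated_reports = [
    {
        "document_id": "DOC-1",
        "metadata": [{"key": "a", "value": "1"}],
        "modalities": [{"code": "CT"}, {"code": "CT"}],  # duplicate code
        "groups": [1, 2],
    },
]
report_id_by_document_id = {"DOC-1": 10}

metadata_rows, modality_rows, group_rows = [], [], []
for report_data in validated_reports:
    report_id = report_id_by_document_id[report_data["document_id"]]
    for item in report_data.get("metadata", []):
        metadata_rows.append((report_id, item["key"], item["value"]))
    # A set stands in for _dedupe_by_key here: duplicate codes collapse.
    for code in {m["code"] for m in report_data.get("modalities", [])}:
        modality_rows.append((report_id, code))
    for group in report_data.get("groups", []):
        group_rows.append((report_id, group))

print(metadata_rows)  # [(10, 'a', '1')]
print(modality_rows)  # [(10, 'CT')]
print(group_rows)     # [(10, 1), (10, 2)]
```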
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@radis/reports/api/viewsets.py`:
- Around line 76-89: _dedupe_groups currently returns the original items which
can be either objects or ints, causing AttributeError later when code expects
.id; change _dedupe_groups (the function) to normalize every entry to its
integer id (use getattr(group, "id", group) and cast/ensure int) and append that
id to deduped, keep counting duplicates as before, and return the list of
normalized ids plus the duplicate count so callers (e.g., the call site that
does group_rows.append(group_through(report_id=report_id, group_id=group.id)))
can safely use group ids.
Pull Request Review: Prevent Duplicate Entries in Bulk Upsert

Thank you for this PR! The deduplication logic is a valuable addition that prevents crashes from duplicate entries in bulk upload payloads. Here's my detailed review:

✅ Strengths
🔍 Code Quality Issues

1. Inconsistent Deduplication Strategy

The PR uses three different deduplication helper functions with overlapping logic:

Issue: the three helpers largely duplicate one another's logic.

Recommendation: Consolidate into a single generic deduplication function:

```python
def _dedupe_items(
    items: list[Any],
    key_fn: Callable[[Any], str | int],
    last_wins: bool = True
) -> tuple[list[Any], int]:
    """Deduplicate items using a key function.

    Args:
        items: Items to deduplicate
        key_fn: Function to extract the unique key from each item
        last_wins: If True, keep last occurrence; if False, keep first
    """
    if not items:
        return [], 0
    if last_wins:
        seen: dict[str | int, Any] = {}
        for item in items:
            seen[key_fn(item)] = item
        return list(seen.values()), len(items) - len(seen)
    else:
        seen_keys: set[str | int] = set()
        deduped = []
        duplicates = 0
        for item in items:
            key = key_fn(item)
            if key in seen_keys:
                duplicates += 1
                continue
            seen_keys.add(key)
            deduped.append(item)
        return deduped, duplicates
```

Usage:

```python
metadata_items, dups = _dedupe_items(
    report_data.get("metadata", []),
    key_fn=lambda x: x["key"],
    last_wins=True
)
```

2. Type Safety Issue in _dedupe_groups()
| Category | Rating | Notes |
|---|---|---|
| Functionality | ⭐⭐⭐⭐ | Solves the problem effectively |
| Code Quality | ⭐⭐⭐ | Works but has inconsistencies |
| Performance | ⭐⭐⭐⭐ | Appropriate for expected scale |
| Security | ⭐⭐⭐⭐⭐ | No concerns |
| Test Coverage | ⭐ | Critical gap |
| Documentation | ⭐⭐⭐ | Logs are good; code comments minimal |
🎯 Recommendations Priority
Must Fix Before Merge:
- Add comprehensive test coverage (highest priority)
- Fix line 158: use `existing.language_id = language.pk` for efficiency
Should Fix:
- Consolidate deduplication logic into single generic function
- Fix `_dedupe_groups()` type safety: use explicit type checking
Nice to Have:
- Add docstrings to helper functions
- Consider combining the three relationship-building loops (lines 197-240)
✅ Approval Recommendation
Status: Approve with required changes
This PR provides valuable functionality and prevents real crashes. However, test coverage is critical before merging to production. Once tests are added and the efficiency issue on line 158 is fixed, this is ready to merge.
Great work on identifying and solving this issue! 🚀
Pull Request Review: Bulk Upsert Deduplication

Thank you for this contribution! I've reviewed the changes and have the following feedback:

✅ Strengths
🔍 Code Quality Issues

Critical: Logic inconsistency in
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@radis/reports/tests/test_bulk_upsert.py`:
- Line 31: The test payload in test_bulk_upsert.py uses
timezone.make_aware(datetime(...)) for the "study_datetime" field which
json.dumps can't serialize; replace that datetime object with an ISO-8601 string
(e.g. "2024-01-01T00:00:00+00:00" or use .isoformat()) so the payload is
JSON-serializable and consistent with the other payload entry (update the
payload definition used in the failing test).
Code Review: Bulk Upsert Deduplication

I've reviewed this PR and here's my feedback:

✅ Strengths
🔍 Issues and Concerns

1. Inconsistent Behavior: Last-Wins Deduplication (Medium Priority)

The deduplication strategy keeps the last occurrence of duplicates:

```python
# Line 39 in viewsets.py
deduped_reports[document_id] = report  # overwrites previous
```

Issue: This "last-wins" approach may be counterintuitive and could hide data quality problems. If a client accidentally sends duplicates, they might expect an error rather than silent deduplication.

Recommendation: Consider one of these approaches:
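One such approach, sketched here as a hypothetical strict mode (the helper name and error message are illustrative, not from the PR), is to reject payloads containing duplicates outright:

```python
# Hypothetical strict-mode variant: reject the payload when a
# document_id appears more than once, instead of silently keeping
# the last occurrence.
from collections import Counter
from typing import Any


def check_no_duplicate_document_ids(reports: list[dict[str, Any]]) -> None:
    counts = Counter(r["document_id"] for r in reports)
    dupes = sorted(k for k, v in counts.items() if v > 1)
    if dupes:
        raise ValueError(f"Duplicate document_ids in payload: {dupes}")


payload = [
    {"document_id": "DOC-1", "body": "a"},
    {"document_id": "DOC-1", "body": "b"},
]
try:
    check_no_duplicate_document_ids(payload)
except ValueError as exc:
    print(exc)  # Duplicate document_ids in payload: ['DOC-1']
```

Either behavior is defensible; the point is that the choice should be deliberate and documented.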
2. Type Safety Issue in _dedupe_groups() (Low Priority)

Lines 73-84 show type handling that could be cleaner:

```python
def _dedupe_groups(items: list[Any]) -> tuple[list[int], int]:
    for group in items:
        group_id = getattr(group, "pk", group)  # Mixed types
        group_id = int(group_id)
```

Issue: The function accepts both Group objects and raw integers under a single list[Any] annotation.

Recommendation:

```python
def _dedupe_groups(items: list[Any]) -> tuple[list[int], int]:
    """Deduplicate group references (handles both Group objects and IDs)."""
    if not items:
        return [], 0
    by_id: dict[int, int] = {}
    duplicates = 0
    for group in items:
        # Handle both Group instances and raw integers
        group_id = group.pk if hasattr(group, 'pk') else int(group)
        if group_id in by_id:
            duplicates += 1
        by_id[group_id] = group_id
    return list(by_id.values()), duplicates
```

3. Redundant Function: _dedupe_metadata() (Minor)

Lines 61-71 define _dedupe_metadata(), which duplicates the generic key-based deduplication logic.

Recommendation: Remove _dedupe_metadata() and reuse the generic helper with "key" as the key name.

4. Database Query Inefficiency (Minor)

Line 179-184 queries all reports again after bulk creation/update:

```python
report_id_by_document_id = {
    report.document_id: report.pk
    for report in Report.objects.filter(document_id__in=document_ids).only(
        "id", "document_id"
    )
}
```

Issue: For newly created reports, we already have the objects in memory (from bulk_create), so the extra query is avoidable.

Recommendation: Consider using the objects returned by bulk_create to build the id map instead of re-querying.

5. Missing Test Coverage (Medium Priority)

The tests don't cover:
Recommendation: Add tests for edge cases, especially invalid group IDs which could cause runtime errors.

6. Docstring Missing (Minor)

The new helper functions lack docstrings.
🛡️ Security & Performance
📝 Style & Standards
🎯 Recommendations Summary

Must Fix:

Should Fix:

Nice to Have:

Overall, this is solid work that improves the robustness of bulk operations. The main concern is ensuring the deduplication behavior is well-documented and that edge cases are handled gracefully.
Pull Request Review: Bulk Upsert Deduplication

This PR adds deduplication logic to the bulk upsert endpoint. Overall, the implementation is solid and well-tested.

Strengths
Code Quality & Design

Positive:
Areas for Consideration:
Security
Performance
Testing

Covered:
Missing:
Recommendations

Priority: High
Priority: Medium

Priority: Low

Summary
Overall: This is a solid implementation that addresses the deduplication requirements effectively. The main suggestions are minor refactorings for code clarity rather than critical bugs. The PR is ready to merge with consideration for the high-priority simplification suggestions. Great work on maintaining transaction integrity and providing comprehensive logging!
ec787af to 7abe4d7
Pull Request Review: Bulk Upsert Deduplication & Timeout Support

Summary

This PR adds important deduplication logic to the bulk upsert endpoint and adds a timeout parameter to the client. The implementation handles duplicate document_ids, metadata keys, modality codes, and group IDs by keeping the last occurrence and logging consolidated warnings.

✅ Strengths
🔍 Code Quality Observations

1. Duplicate Helper Functions (Minor)
You have three similar deduplication helpers:
Suggestion:

2. Missing Type Hint (Minor)

```python
def _dedupe_groups(items: list[Any]) -> tuple[list[int], int]:
```

The Any hint hides that the list can contain Group objects or ids; a tighter signature would be:

```python
def _dedupe_groups(items: list[Group | int]) -> tuple[list[int], int]:
```

3. Deduplication Order (Important Observation)

The report-level deduplication happens before validation context. This means if a user sends:

Report B will overwrite Report A even if it's invalid. The validation happens later at line 348.

Question: Is this intentional? An alternative would be to validate first, then dedupe only valid payloads. Current behavior means a single invalid duplicate can cause a valid report to be skipped.

🐛 Potential Issues

1. Empty __init__.py File (Low Priority)
The file contains only a newline. This is fine for making it a package, but seems unusual if it didn't exist before. Was this directory not a package previously? If tests were already working, this file might be unnecessary.

2. Metadata Deduplication Behavior (Design Question)

When metadata has duplicate keys, the last value wins. Consider this payload:

```
{
    "document_id": "DOC-1",
    "metadata": {
        "ris_filename": "file1.txt",
        "ris_filename": "file2.txt"  # Last one wins
    }
}
```

This is handled correctly via the serializer transformation (line 132-134 in serializers.py). However, the warning message doesn't distinguish between:
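For reference, Python's json module itself keeps only the last value for a repeated key, which bears on whether this warning can ever fire for metadata sent as a JSON body:

```python
# Duplicate keys in a JSON object never survive parsing in CPython:
# json.loads keeps only the last value for a repeated key, so the
# serializer already receives a deduplicated dict.
import json

raw = '{"ris_filename": "file1.txt", "ris_filename": "file2.txt"}'
parsed = json.loads(raw)
print(parsed)  # {'ris_filename': 'file2.txt'}
```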
Consider: Is this warning actually reachable for metadata? Python dicts already dedupe keys, and the serializer converts them. A test specifically for this case would clarify the scenario.

🔒 Security Considerations

No issues found. The PR doesn't introduce security vulnerabilities:

⚡ Performance Considerations

1. Multiple Passes Over Data (Minor Impact)

The code iterates over validated_reports three times (metadata, modalities, groups).

Impact: For 1000 reports, this is 3000 iterations vs. 1000 if combined. However, the DB operations (bulk_create) dominate performance, so this is acceptable. The current structure is more readable.

2. Deduplication Dictionary Construction (Negligible)

The deduplication logic at lines 33-45 creates a dictionary with O(n) space and time complexity. This is optimal for the use case.

🧪 Test Coverage

Excellent coverage:
Minor gap: No test for the warning log messages themselves. Consider adding:

```python
def test_bulk_upsert_logs_duplicate_warnings(client, caplog):
    # Test that duplicate warnings are actually logged
    # Verify the log message format and counts
    ...
```

📝 Documentation

Client documentation: The timeout parameter is documented in the docstring ✅

API documentation: Consider updating API docs (if separate from code) to mention:
🎯 Recommendations

Must Address:
Should Consider:
Nice to Have:
✅ Overall Assessment

Recommendation: APPROVE with minor suggestions

This is a solid implementation that:
The deduplication logic is sound, and the "keep last" strategy is reasonable and predictable. The timeout addition is a practical enhancement for production use. Nice work! 🎉
Pull Request Review: Bulk Upsert Implementation

I've reviewed this PR and overall it's a solid implementation with good test coverage. Here are my findings:

✅ Strengths
🔒 Security Considerations

SQL Injection Risk (Critical)

In the bulk indexing code:

```python
cursor.execute(
    """
    UPDATE pgsearch_reportsearchvector v
    SET search_vector = to_tsvector(%s::regconfig, r.body)
    FROM reports_report r
    WHERE v.report_id = r.id AND r.id = ANY(%s)
    """,
    [config, config_ids],
)
```

✅ Good: Using parameterized queries with %s placeholders.

Permission Scoping (Important)

In the serializer:

```python
if request.user.is_superuser:
    groups_field.queryset = groups_field.queryset.all()
else:
    groups_field.queryset = request.user.groups.all()
```
🐛 Potential Issues1. Unused
|
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@radis/reports/api/viewsets.py`:
- Around line 33-36: The function _bulk_upsert_reports has an unused parameter
replace which triggers Ruff ARG001; remove the replace parameter from
_bulk_upsert_reports's signature and update any callers to stop passing replace
(or stop forwarding that argument) so the function is invoked without that
argument; ensure you only change the signature and call sites related to
_bulk_upsert_reports and keep its return type and behavior intact.
In `@radis/settings/base.py`:
- Around line 157-160: Add documentation entries for the three new pgsearch
settings by updating example.env to include PGSEARCH_BULK_INDEX_CHUNK_SIZE
(default 5000), PGSEARCH_BULK_INSERT_BATCH_SIZE (default 1000), and
PGSEARCH_SYNC_INDEXING (default False) with a short description for each
(purpose and default). Ensure the variable names match the settings
(PGSEARCH_BULK_INDEX_CHUNK_SIZE, PGSEARCH_BULK_INSERT_BATCH_SIZE,
PGSEARCH_SYNC_INDEXING), include their default values, and add a one-line
comment explaining what each controls (chunk size for bulk indexing, batch size
for inserts, and whether indexing runs synchronously).
🧹 Nitpick comments (4)
radis/pgsearch/utils/language_utils.py (1)
22-24: Consider using logger.exception for full traceback visibility.

When catching exceptions, logger.exception automatically includes the traceback, which aids debugging. With logger.error, only the exception message is logged.

Proposed fix:

```diff
 except DatabaseError as exc:
-    logger.error("Failed to read pg_ts_config; falling back to simple. %s", exc)
+    logger.exception("Failed to read pg_ts_config; falling back to simple.")
     return set()
```

radis/reports/api/serializers.py (1)
54-62: Good security improvement to restrict group assignment.

This correctly limits non-superusers to only assign reports to their own groups. However, line 60 has a redundant .all() call since the queryset is already a queryset.

Remove redundant .all() call:

```diff
 if groups_field.queryset is not None:
     if request.user.is_superuser:
-        groups_field.queryset = groups_field.queryset.all()
+        pass  # Superuser keeps the original queryset (all groups)
     else:
         groups_field.queryset = request.user.groups.all()
```

Or simplify the entire block:

```diff
-if groups_field.queryset is not None:
-    if request.user.is_superuser:
-        groups_field.queryset = groups_field.queryset.all()
-    else:
-        groups_field.queryset = request.user.groups.all()
+if groups_field.queryset is not None and not request.user.is_superuser:
+    groups_field.queryset = request.user.groups.all()
```

radis/pgsearch/tasks.py (1)
19-26: Type annotation can be tightened.

The payload variable is typed as list[Any] but only contains integers. Since report_ids is already list[int], the explicit int() conversion is defensive (good), but the type should reflect the actual contents.

Tighten type annotation:

```diff
 def enqueue_bulk_index_reports(report_ids: list[int]) -> int | None:
     if not report_ids:
         return None
-    payload: list[Any] = [int(report_id) for report_id in report_ids]
+    payload: list[int] = [int(report_id) for report_id in report_ids]
     return app.configure_task(
         "radis.pgsearch.tasks.bulk_index_reports",
         allow_unknown=False,
     ).defer(report_ids=payload)
```

This also allows removing the Any import if unused elsewhere.

radis/pgsearch/utils/indexing.py (1)
46-61: Consider wrapping bulk operations in a transaction.

The loop processes multiple configs, and if an error occurs mid-way, some configs will be updated while others won't. This could leave the search index in an inconsistent state.

Wrap in transaction for atomicity:

```diff
+from django.db import connection, transaction
 ...
 for config, config_ids in config_to_ids.items():
-    ReportSearchVector.objects.bulk_create(
-        [ReportSearchVector(report_id=report_id) for report_id in config_ids],
-        ignore_conflicts=True,
-        batch_size=settings.PGSEARCH_BULK_INSERT_BATCH_SIZE,
-    )
-
-    with connection.cursor() as cursor:
-        cursor.execute(
-            """
-            UPDATE pgsearch_reportsearchvector v
-            SET search_vector = to_tsvector(%s::regconfig, r.body)
-            FROM reports_report r
-            WHERE v.report_id = r.id AND r.id = ANY(%s)
-            """,
-            [config, config_ids],
-        )
+    with transaction.atomic():
+        ReportSearchVector.objects.bulk_create(
+            [ReportSearchVector(report_id=report_id) for report_id in config_ids],
+            ignore_conflicts=True,
+            batch_size=settings.PGSEARCH_BULK_INSERT_BATCH_SIZE,
+        )
+
+        with connection.cursor() as cursor:
+            cursor.execute(
+                """
+                UPDATE pgsearch_reportsearchvector v
+                SET search_vector = to_tsvector(%s::regconfig, r.body)
+                FROM reports_report r
+                WHERE v.report_id = r.id AND r.id = ANY(%s)
+                """,
+                [config, config_ids],
+            )
```

Alternatively, if partial progress is acceptable for idempotent retry scenarios, document this behavior.
```python
# pgsearch indexing tuning (bulk upsert/backfill)
PGSEARCH_BULK_INDEX_CHUNK_SIZE = env.int("PGSEARCH_BULK_INDEX_CHUNK_SIZE", default=5000)
PGSEARCH_BULK_INSERT_BATCH_SIZE = env.int("PGSEARCH_BULK_INSERT_BATCH_SIZE", default=1000)
PGSEARCH_SYNC_INDEXING = env.bool("PGSEARCH_SYNC_INDEXING", default=False)
```
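The settings above rely on django-environ's env.int / env.bool, which read an environment variable and fall back to a default when it is unset. A dependency-free sketch of that behavior (the helper names here are illustrative, not django-environ's API):

```python
# Minimal, dependency-free mimic of env.int / env.bool default handling,
# using os.environ directly. Illustrative only; the real settings use
# django-environ.
import os


def env_int(name: str, default: int) -> int:
    raw = os.environ.get(name)
    return int(raw) if raw is not None else default


def env_bool(name: str, default: bool) -> bool:
    raw = os.environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in ("true", "1", "yes", "on")


print(env_int("PGSEARCH_BULK_INDEX_CHUNK_SIZE", 5000))  # default unless set
print(env_bool("PGSEARCH_SYNC_INDEXING", False))        # default unless set
```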
🧩 Analysis chain

The verification scripts located radis/settings/base.py, confirmed lines 157-160 stay within the 100-character line limit, and searched example.env for PGSEARCH entries (none found):

```shell
find . -name "base.py" -path "*/settings/*" | head -20
sed -n '155,165p' radis/settings/base.py | cat -n
rg "PGSEARCH" example.env
wc -l example.env
```
Add the new pgsearch configuration variables to example.env.
The settings implementation is well-structured with appropriate defaults and complies with the 100-character line length limit. However, the environment variables PGSEARCH_BULK_INDEX_CHUNK_SIZE, PGSEARCH_BULK_INSERT_BATCH_SIZE, and PGSEARCH_SYNC_INDEXING are not documented in example.env. Add these variables with their default values and a brief description so operators can discover and configure them.
🤖 Prompt for AI Agents
In `@radis/settings/base.py` around lines 157 - 160, Add documentation entries for
the three new pgsearch settings by updating example.env to include
PGSEARCH_BULK_INDEX_CHUNK_SIZE (default 5000), PGSEARCH_BULK_INSERT_BATCH_SIZE
(default 1000), and PGSEARCH_SYNC_INDEXING (default False) with a short
description for each (purpose and default). Ensure the variable names match the
settings (PGSEARCH_BULK_INDEX_CHUNK_SIZE, PGSEARCH_BULK_INSERT_BATCH_SIZE,
PGSEARCH_SYNC_INDEXING), include their default values, and add a one-line
comment explaining what each controls (chunk size for bulk indexing, batch size
for inserts, and whether indexing runs synchronously).
## Pull Request Review: Bulk Upsert Improvements

I've completed a comprehensive review of PR #187. Overall, this is a well-implemented feature with good test coverage and thoughtful design decisions. Here are my findings:

### ✅ Strengths

#### 1. Comprehensive Deduplication Logic

The deduplication implementation is excellent:
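As a hedged illustration of the last-write-wins deduplication described in this PR (the helper name and payload shape here are hypothetical, not the PR's actual code):

```python
import logging

logger = logging.getLogger(__name__)


def dedupe_reports(reports: list[dict]) -> list[dict]:
    """Keep the last payload per document_id, logging a warning for dropped duplicates."""
    by_id: dict[str, dict] = {}
    for report in reports:
        doc_id = report["document_id"]
        if doc_id in by_id:
            # Duplicates are dropped with a warning instead of failing the whole upsert
            logger.warning("Dropping duplicate report payload for document_id=%s", doc_id)
        by_id[doc_id] = report  # later entries win
    return list(by_id.values())


reports = [
    {"document_id": "a", "body": "first"},
    {"document_id": "b", "body": "second"},
    {"document_id": "a", "body": "third"},  # duplicate: replaces the first "a"
]
print(dedupe_reports(reports))
# → [{'document_id': 'a', 'body': 'third'}, {'document_id': 'b', 'body': 'second'}]
```

This mirrors the behaviour the PR summary describes: duplicates are dropped with a logged warning so the upsert proceeds with the remaining unique entries.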
#### 2. Performance Optimizations

#### 3. Test Coverage

Excellent test coverage with three focused tests:

#### 4. Security Improvement

Group restriction in serializer (radis/reports/api/serializers.py:54-62) is a significant security enhancement:

#### 5. Docker Configuration

The Docker Compose refactoring improves maintainability:
### 🔍 Issues & Recommendations

#### Critical: SQL Injection Risk

Location: radis/pgsearch/utils/indexing.py:54-61

```python
cursor.execute(
    """
    UPDATE pgsearch_reportsearchvector v
    SET search_vector = to_tsvector(%s::regconfig, r.body)
    FROM reports_report r
    WHERE v.report_id = r.id AND r.id = ANY(%s)
    """,
    [config, config_ids],
)
```

Issue: The `config` value flows into the query via the `regconfig` cast.

Recommendation: Whitelist valid config values and use string formatting for the config name:

```python
VALID_CONFIGS = {'simple', 'english', 'german', 'french', 'spanish', ...}
if config not in VALID_CONFIGS:
    logger.warning(f"Invalid text search config: {config}, falling back to 'simple'")
    config = 'simple'

cursor.execute(
    f"""
    UPDATE pgsearch_reportsearchvector v
    SET search_vector = to_tsvector('{config}'::regconfig, r.body)
    FROM reports_report r
    WHERE v.report_id = r.id AND r.id = ANY(%s)
    """,
    [config_ids],
)
```

Or use a safe identifier-quoting mechanism.

#### High: Missing Input Validation

Location: radis/pgsearch/tasks.py:22

```python
payload: list[Any] = [int(report_id) for report_id in report_ids]
```

Issue: If a non-integer value is in `report_ids`, the conversion raises and the whole request fails.

Recommendation: Add validation with proper error handling:

```python
try:
    payload: list[int] = [int(report_id) for report_id in report_ids]
except (ValueError, TypeError) as exc:
    logger.error("Invalid report_id in bulk index request: %s", exc)
    return None
```

#### Medium: Potential Race Condition

Location: radis/reports/api/viewsets.py:245-249

```python
touched_report_ids = [
    report_id_by_document_id[document_id]
    for document_id in [*created_ids, *updated_ids]
    if document_id in report_id_by_document_id
]
```

Issue: This check happens inside the transaction, but the indexing happens in a background task after commit.

Impact: Low probability, but could cause task failures.

Recommendation: Add error handling in the indexing task for reports that no longer exist.

#### Medium: Unused Parameter

Location: radis/reports/api/viewsets.py:34

```python
def _bulk_upsert_reports(
    validated_reports: list[dict[str, Any]],
    replace: bool = True,  # ⚠️ Not used anywhere
) -> tuple[list[str], list[str]]:
```

Issue: The `replace` parameter is accepted but never used.

Recommendation: Either:

Also note that the viewset validates `replace` separately and rejects `replace=false`.

#### Low: Missing Error Context

Location: radis/pgsearch/utils/language_utils.py:23

Consider:

```python
logger.error("Failed to read pg_ts_config; falling back to simple. %s", exc, exc_info=True)
```

Adding `exc_info=True` preserves the stack trace for debugging.

#### Low: Missing Bounds Check

Location: radis/pgsearch/utils/indexing.py:23

```python
ids = sorted({int(report_id) for report_id in report_ids if report_id is not None})
```

Issue: No upper bound check on the number of IDs. Passing millions of IDs could cause memory issues.

Recommendation: Add a sanity check:

```python
MAX_BULK_IDS = 100000  # Configure via settings
if len(ids) > MAX_BULK_IDS:
    logger.warning(f"Bulk index request has {len(ids)} IDs, limiting to {MAX_BULK_IDS}")
    ids = ids[:MAX_BULK_IDS]
```

#### Low: Test Improvements

Missing test cases:
### 📝 Code Quality Observations

#### Good Practices

#### Style Notes (per Google Python Style Guide)

### 🎯 Summary

Approval Status: This PR adds valuable functionality with good architecture, but the SQL injection risk should be addressed before merging. The other issues are less critical but should be tracked for follow-up.

#### Priority Actions

#### Estimated Risk
Great work on this feature! The deduplication logic is particularly well thought out, and the performance optimizations are solid. Once the SQL issue is addressed, this will be ready to merge. |
## PR Review: Bulk Upsert Implementation

### Summary

This PR implements comprehensive bulk upsert functionality for reports with deduplication, async indexing, and group scoping. The changes are well-structured and address several important edge cases.

### Code Quality & Best Practices

#### Strengths

#### Minor Issues

1. **Group Queryset Filtering Logic** (radis/reports/api/serializers.py:54-62): The superuser branch calls `.all()` on a queryset that is already `.all()` (redundant). Simplify to a single `Group.objects.all()` assignment.

2. **Deduplication Function Redundancy** (radis/reports/api/viewsets.py:53-83): `_dedupe_metadata` and `_dedupe_by_key` do essentially the same thing. Consider consolidating.

3. **SQL Injection Protection** (radis/pgsearch/utils/indexing.py:66-75): Verify that `config` from `code_to_language()` properly sanitizes input to prevent SQL injection through regconfig casting.

### Performance Considerations

#### Excellent Optimizations
#### Potential Concerns

Memory usage on very large payloads (10K+ reports). Consider documenting maximum recommended batch size.

### Security

#### Good Practices

#### Recommendations

Verify error messages do not leak information about groups users do not have access to.

### Test Coverage

#### Excellent Coverage

Tests for creation, updates, deduplication, and bulk indexing matching signal-based indexing.

#### Missing Test Cases

### Documentation

Need to document:

### Recommendations

#### High Priority

#### Medium Priority

### Overall Assessment

Status: Approve with minor changes

This is a well-implemented feature with good attention to edge cases, performance, and security. The code follows Django and DRF best practices with comprehensive tests.

Code Quality: 8.5/10
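The consolidation suggested under Minor Issues above (merging `_dedupe_metadata` and `_dedupe_by_key`) could be as small as one keyed helper. A hedged sketch, not the PR's actual code; the helper name and signature are hypothetical:

```python
import logging
from typing import Callable, Hashable, TypeVar

logger = logging.getLogger(__name__)
T = TypeVar("T")


def dedupe_by(items: list[T], key: Callable[[T], Hashable], label: str) -> list[T]:
    """Generic last-write-wins dedupe; metadata and report dedupe could both delegate here."""
    seen: dict[Hashable, T] = {}
    for item in items:
        k = key(item)
        if k in seen:
            logger.warning("Dropping duplicate %s with key %r", label, k)
        seen[k] = item  # later entries win
    return list(seen.values())


# e.g. metadata entries keyed by their "key" field:
meta = [{"key": "modality", "value": "CT"}, {"key": "modality", "value": "MR"}]
print(dedupe_by(meta, key=lambda m: m["key"], label="metadata"))
# → [{'key': 'modality', 'value': 'MR'}]
```

The same helper would cover reports keyed by `document_id`, so only the key extractor differs per call site.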
## Pull Request Review: Bulk Upsert Enhancements

Thank you for this comprehensive PR! This adds important functionality for bulk report operations with deduplication and async indexing. Overall the implementation is solid, but I've identified several areas that need attention.

### 🔴 Critical Issues

#### 1. SQL Injection Risk in Bulk Indexing

Location: radis/pgsearch/utils/indexing.py

The raw SQL query casts a caller-supplied text search config to `regconfig`.

Recommendation:

#### 2. Race Condition in Deduplication Logic

Location: radis/reports/api/viewsets.py

The deduplication happens in memory after validation but before the atomic transaction. If two bulk upsert requests arrive concurrently with the same document_ids, both could pass validation and attempt to create/update, potentially causing database constraint violations.

Recommendation:

#### 3. Group Permission Bypass Vulnerability

Location: radis/reports/api/serializers.py

The group filtering is applied in the serializer's group queryset.

Issue:

Recommendation:
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@radis/pgsearch/tasks.py`:
- Around line 18-25: The current enqueue_bulk_index_reports function aborts the
whole batch if any report_id fails conversion; change it to validate per-item
instead: iterate over report_ids, attempt int(report_id) for each inside a
try/except, append successfully converted ids to payload (or valid_ids) and
logger.warning/error the specific invalid value on exception, and after the loop
return None if payload is empty else continue with the original enqueue logic;
update references to payload (or rename to valid_ids) and remove the single
try/except around the whole list so valid IDs are not dropped.
```python
def enqueue_bulk_index_reports(report_ids: list[int]) -> int | None:
    if not report_ids:
        return None
    try:
        payload: list[int] = [int(report_id) for report_id in report_ids]
    except (TypeError, ValueError) as exc:
        logger.error("Invalid report_id in bulk index request: %s", exc)
        return None
```
Don’t drop valid IDs when one entry is invalid.
Right now a single bad report_id prevents the entire batch from enqueuing, which can leave valid reports unindexed. Consider per-item validation (skip/log invalids) or explicitly raise so the caller can retry.
✅ Suggested fix (skip invalid IDs, keep valid ones)
```diff
 def enqueue_bulk_index_reports(report_ids: list[int]) -> int | None:
     if not report_ids:
         return None
-    try:
-        payload: list[int] = [int(report_id) for report_id in report_ids]
-    except (TypeError, ValueError) as exc:
-        logger.error("Invalid report_id in bulk index request: %s", exc)
-        return None
+    payload: list[int] = []
+    for report_id in report_ids:
+        try:
+            payload.append(int(report_id))
+        except (TypeError, ValueError):
+            logger.exception(
+                "Invalid report_id in bulk index request: %r",
+                report_id,
+            )
+    if not payload:
+        return None
     return app.configure_task(
         "radis.pgsearch.tasks.bulk_index_reports",
         allow_unknown=False,
     ).defer(report_ids=payload)
```

🧰 Tools

🪛 Ruff (0.14.14)

24-24: Use `logging.exception` instead of `logging.error` — replace with `exception` (TRY400)
## Pull Request Review: Bulk Upsert Implementation

### Summary

This PR implements bulk report upsert functionality with deduplication, group access control, and background indexing. Overall, the implementation is solid with good test coverage. Here are my findings:

### ✅ Strengths

#### Code Quality
#### Performance Considerations
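The chunked bulk indexing configured via `PGSEARCH_BULK_INDEX_CHUNK_SIZE` can be pictured with a small helper. A sketch only; the function name is hypothetical, and the default mirrors the setting's documented default of 5000:

```python
from collections.abc import Iterator, Sequence


def chunked(ids: Sequence[int], size: int = 5000) -> Iterator[Sequence[int]]:
    """Yield successive slices so each bulk UPDATE touches at most `size` rows."""
    for start in range(0, len(ids), size):
        yield ids[start:start + size]


# Each chunk would be handed to one UPDATE ... WHERE id = ANY(%s) statement:
for chunk in chunked(list(range(12)), size=5):
    print(len(chunk))  # prints 5, then 5, then 2
```

Keeping each statement bounded this way trades a few extra round trips for predictable memory use and shorter lock times per statement.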
### Issues & Concerns

#### 1. Security: Group Access Control (MEDIUM)

Location: radis/reports/api/serializers.py:54-62

The group queryset restriction is good for preventing privilege escalation, but there's a potential issue - this only restricts group selection in the serializer, while the bulk upsert validation happens per-item. If a user is not a superuser, they should not be able to assign reports to groups they don't belong to.

Recommendation: Add explicit validation in bulk_upsert to verify all specified groups are in the user's accessible groups before processing.

#### 2. Bug: Missing Error Handling for Language Lookup (HIGH)

Location: radis/reports/api/viewsets.py:145

If a language code is in validated_reports but somehow missing from language_by_code after bulk_create (e.g., due to race conditions or database constraints), this will raise a KeyError inside a transaction, potentially rolling back the entire batch.

Recommendation: Add defensive handling with .get() instead of direct dictionary access.

#### 3. Performance: Inefficient Report Re-fetching (MEDIUM)

Location: radis/reports/api/viewsets.py:251-258

After bulk operations, the code re-fetches all reports from the database to pass to handlers. For large batches (e.g., 10,000 reports), this creates significant database load.

Recommendation: Consider passing report IDs to handlers instead of full objects if possible, or cache the created/updated report objects during the bulk operation.

#### 4. Bug: SQL Injection Risk in Raw Query (HIGH)

Location: radis/pgsearch/utils/indexing.py:66-74

While using parameterized queries, the %s::regconfig cast could be problematic if config contains malicious input. The code_to_language() function should be audited to ensure it only returns safe PostgreSQL regconfig names.

Recommendation: Validate that config matches a whitelist of known PostgreSQL text search configurations before executing the query.

#### 5. Code Quality: Duplicate Logic (LOW)

Location: radis/reports/api/viewsets.py:53-83

The deduplication functions _dedupe_by_key and _dedupe_metadata have nearly identical logic.

Recommendation: Consider consolidating into a single function that takes a key extractor function.

#### 6. Missing: Input Validation (MEDIUM)

Location: radis/reports/api/viewsets.py:347-359

No validation for maximum batch size. A malicious or misconfigured client could send 1M reports in a single request, causing memory exhaustion or timeout.

Recommendation: Add a configurable maximum batch size (e.g., MAX_BULK_UPSERT_SIZE = 10000).

#### 7. Missing: Timeout Configuration (LOW)

Location: radis-client/radis_client/client.py:106-126

While a timeout parameter was added, there's no guidance on appropriate values for bulk operations. Large batches may legitimately take minutes.

Recommendation: Add docstring guidance about timeout values for large batches.

### Test Coverage

Good Coverage:
Missing Tests:
Recommendation: Add tests for the missing scenarios, especially permission enforcement.

### Security Considerations

Positive:
Concerns:
### Recommendations Summary

Must Fix (Before Merge):
Should Fix (Soon):
Nice to Have:
### Overall Assessment

Verdict: Approve with conditions

This is a well-implemented feature with good separation of concerns and performance optimizations. The main concerns are around security (group permissions, batch size limits) and defensive error handling. Once the "Must Fix" items are addressed, this will be production-ready.

The deduplication logic is particularly well-thought-out and handles edge cases gracefully. The configurable indexing settings show good production awareness.

Great work overall!
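The explicit group check recommended under issue 1 above can be done with a plain set difference before any write. A sketch with hypothetical names; in the actual Django code the allowed set would come from `request.user.groups`:

```python
def check_group_access(requested_group_ids: set[int], allowed_group_ids: set[int]) -> None:
    """Reject the whole batch up front if it references groups the user cannot assign."""
    forbidden = requested_group_ids - allowed_group_ids
    if forbidden:
        raise PermissionError(f"Not allowed to assign groups: {sorted(forbidden)}")


check_group_access({1, 2}, allowed_group_ids={1, 2, 3})  # passes silently
try:
    check_group_access({1, 9}, allowed_group_ids={1, 2, 3})
except PermissionError as exc:
    print(exc)
# → Not allowed to assign groups: [9]
```

Running this once over the union of all group IDs in the payload gives a single early failure instead of per-item serializer errors deep inside the batch.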
## PR Review: Bulk Upsert Implementation

### Summary

This PR implements bulk report upsert functionality with deduplication, background indexing, and improved group access control. The implementation is well-structured and follows Django best practices.

### ✅ Strengths

#### Code Quality

#### Architecture

#### Testing
### 🔍 Areas for Improvement

#### 1. Security - Group Access Control (radis/reports/api/serializers.py:54-62)

Issue: The group restriction logic has a subtle security concern.

```python
if request.user.is_superuser:
    groups_field.queryset = groups_field.queryset.all()
else:
    groups_field.queryset = request.user.groups.all()
```

Problem: If the field's queryset was already restricted elsewhere, calling `.all()` on it keeps that restriction rather than lifting it.

Recommendation:

```python
from django.contrib.auth.models import Group

if request.user.is_superuser:
    groups_field.queryset = Group.objects.all()
else:
    groups_field.queryset = request.user.groups.all()
```

#### 2. Performance - N+1 Query Pattern (radis/reports/api/viewsets.py:250-258)

Issue: The on_commit hook re-fetches reports that were just written:

```python
def on_commit():
    if created_ids:
        created_reports = list(Report.objects.filter(document_id__in=created_ids))
```

Problem: We already have the report objects in memory (from the bulk operations).

Impact: For bulk operations with 1000s of reports, this adds unnecessary database load.

Recommendation: Pass the already-loaded report objects to handlers or consider if the refetch is necessary for data consistency.

#### 3. Error Handling - Silent Truncation (radis/reports/api/viewsets.py:406-408)

Issue: Error responses are silently truncated without clear indication.

```python
max_errors = 50
response_body["errors"] = errors[:max_errors]
response_body["errors_truncated"] = len(errors) > max_errors
```

Problem: While a truncation flag is set, the total error count is lost.

Recommendation:

```python
response_body["errors"] = errors[:max_errors]
response_body["total_errors"] = len(errors)
response_body["errors_truncated"] = len(errors) > max_errors
```

#### 4. Data Integrity - Missing Validation (radis/reports/api/viewsets.py:354-359)

Issue: The PR enforces replace-only semantics:

```python
if not replace:
    return Response(
        {"detail": "replace=false is not supported for bulk upsert. Use replace=true."},
        status=status.HTTP_400_BAD_REQUEST,
    )
```

Concern: This breaks the API contract if clients were relying on `replace=false`.

Recommendation:
#### 5. Potential Race Condition (radis/reports/api/viewsets.py:119-120, 167-176)

Issue: Between reading existing reports and bulk updating, data could change.

```python
existing_reports = Report.objects.filter(document_id__in=document_ids)
# ... later ...
with transaction.atomic():
    if updated_reports:
        Report.objects.bulk_update(updated_reports, ...)
```

Problem: If another process modifies a report between line 119 and the transaction, those changes will be silently overwritten.

Mitigation: While this is inherent to the bulk upsert pattern, consider:
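One standard mitigation (not necessarily what the reviewer had in mind) is optimistic locking with a version counter. A minimal in-memory sketch; in Django the version would be an `IntegerField` filtered in the UPDATE, but here a plain dict stands in for the reports table:

```python
class StaleWriteError(Exception):
    pass


# Stand-in for the reports table: document_id -> row
store: dict[str, dict] = {"doc-1": {"body": "v1 text", "version": 1}}


def update_report(doc_id: str, new_body: str, expected_version: int) -> None:
    """Apply the write only if nobody else bumped the version since our read."""
    row = store[doc_id]
    if row["version"] != expected_version:
        # Another writer got in between our read and this write
        raise StaleWriteError(f"{doc_id}: expected v{expected_version}, found v{row['version']}")
    row["body"] = new_body
    row["version"] += 1


update_report("doc-1", "v2 text", expected_version=1)  # succeeds, bumps to version 2
try:
    update_report("doc-1", "late write", expected_version=1)  # stale: version is now 2
except StaleWriteError as exc:
    print(exc)
# → doc-1: expected v1, found v2
```

The stale writer then gets an explicit error to retry from, instead of silently overwriting the newer data.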
#### 6. Indexing - Missing Error Handling (radis/pgsearch/tasks.py:19-30)

Issue:

```python
def enqueue_bulk_index_reports(report_ids: list[int]) -> int | None:
    # ...
    except (TypeError, ValueError) as exc:
        logger.error("Invalid report_id in bulk index request: %s", exc)
        return None
```

Problem: The caller in viewsets.py doesn't check if the task was successfully enqueued.

Recommendation: Consider raising an exception or at least adding a warning at the call site if enqueueing returned None.

#### 7. Configuration - Docker Compose (docker-compose.prod.yml:79)

Minor: The CA bundle configuration is good, but the fallback might not work as expected.

```yaml
- ${RADIS_LLM_CA_BUNDLE:-/etc/ssl/certs/ca-certificates.crt}:/etc/ssl/certs/radis-ca-bundle.pem:ro
```

Issue: If the fallback path does not exist on the host, the mount fails.

Recommendation: Document this requirement or provide a more robust default.

### 🧪 Test Coverage Gaps

Missing Test Cases:
Suggested Additional Tests:

```python
@pytest.mark.django_db
def test_bulk_upsert_respects_user_group_permissions(client):
    """Verify non-superusers can only assign their own groups"""


@pytest.mark.django_db
def test_bulk_upsert_handles_large_batches(client):
    """Test with 5000+ reports to verify chunking works"""


@pytest.mark.django_db
def test_bulk_index_enqueued_asynchronously(client, settings):
    """Verify background indexing is enqueued when sync=False"""
```

### 🎯 Performance Considerations

Positive:
Watch Out For:
### 🔐 Security Assessment

Good:
Concerns:
Recommendation: Consider adding:

```python
MAX_BULK_UPSERT_SIZE = 10000  # in settings

if len(request.data) > MAX_BULK_UPSERT_SIZE:
    return Response({"detail": "Bulk upload too large"}, status=413)
```

### 📝 Documentation

What's Good:
What's Missing:
✨ Minor Suggestions
### 🎉 Conclusion

This is a solid implementation with good architecture and reasonable test coverage. The main concerns are:
Recommendation: Approve with minor revisions. The issues identified are not blockers but should be addressed before merge.

Priority Fixes:
Great work on implementing a complex feature with proper transaction handling and background processing! 🚀 |
Summary by CodeRabbit
Bug Fixes
New Features
Documentation / Chores
Tests