Skip to content

[fix][io] Fix Solr Sink KeyValue schema unwrapping for Debezium CDC#26

Open
Praveenkumar76 wants to merge 5 commits into
apache:masterfrom
cognitree:fix/solr-debezium-unwrap
Open

[fix][io] Fix Solr Sink KeyValue schema unwrapping for Debezium CDC#26
Praveenkumar76 wants to merge 5 commits into
apache:masterfrom
cognitree:fix/solr-debezium-unwrap

Conversation

@Praveenkumar76

Copy link
Copy Markdown

Fixes apache#23763

Motivation

Solr sink indexing remains empty when consuming PostgreSQL CDC events produced by the Debezium connector.

The root cause is a schema and structural incompatibility between Debezium-generated messages and the existing Solr sink implementation:

  • KeyValue schema handling: Debezium emits CDC events using a KeyValue schema wrapper. The existing GenericRecord processing could not correctly extract nested payload fields from this structure.
  • Envelope structure: Debezium wraps events inside a CDC envelope containing fields such as before, after, op, and source. The sink previously lacked logic to unwrap and process the actual row state.
  • Type mismatch: Solr expects consistent string-based values for identifiers and indexed fields, while CDC events often contain numeric types (Integer, Long), causing silent field drops during indexing.
  • Record lifecycle handling: Conversion failures and intentionally skipped records (such as processed DELETE events) were not clearly differentiated, potentially leading to silent data loss.

Additionally, while CDC transformation can be handled externally using Pulsar Functions or intermediate processors, native support within the Solr sink simplifies standard CDC-to-search indexing pipelines.

Modifications

Configuration

  • Added unwrapDebeziumRecord configuration in SolrSinkConfig (default: false) to allow users to explicitly enable Debezium CDC payload handling.

SolrAbstractSink

Updated the parent sink lifecycle handling:

  • Successful processing or intentionally skipped records now trigger record.ack()
  • Processing failures now trigger record.fail() to support retries and DLQ handling
  • Prevents silent data loss during conversion failures

SolrGenericRecordSink

Implemented native Debezium CDC support:

  • Added extractValueRecord() using getNativeObject() for safe extraction of KeyValue payloads
  • Added isDebeziumEnvelope() to identify valid Debezium CDC envelopes using op and state fields (before / after)
  • Added mapDebeziumPayload() and extractAfterRecord() to recursively unwrap CDC payloads
  • Added explicit DELETE handling:
    • Detects delete events (after == null)
    • Extracts primary key from before
    • Calls deleteById in Solr
  • Added normalizeValue() to safely convert numeric values into Strings for Solr schema compatibility

Refactoring

  • Simplified control flow using guard clauses
  • Reduced nested branching for improved readability and maintainability

Highlight of changes:

  • Native Debezium CDC payload support in Solr sink
  • End-to-end DELETE event propagation from PostgreSQL to Solr
  • Improved data integrity through proper ack() / fail() lifecycle handling

Verifying this change

Verified end-to-end using:

  • Apache Pulsar 4.x standalone
  • Debezium PostgreSQL Connector
  • Solr 9.x
  • PostgreSQL CDC events

Validation results:

  • INSERT events correctly populate Solr documents
  • UPDATE events correctly update indexed fields
  • DELETE events correctly remove documents from Solr
  • Numeric fields are normalized and indexed successfully

Example CDC operations validated:

  • "op":"c" → INSERT
  • "op":"u" → UPDATE
  • "op":"d" → DELETE

Tests added

  • testDebeziumUnwrapEnvelope
    Validates handling of standard Debezium envelope payloads

  • testDebeziumUnwrapFlatValue
    Ensures compatibility with non-envelope payloads

  • testDebeziumUnwrapDelete
    Verifies DELETE event handling and Solr deletion flow

  • Updated mocks to support:

    • KeyValue schema handling
    • Debezium op field detection
  • Confirmed no regression for non-CDC Pulsar topics

Does this pull request potentially affect one of the following parts:

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] Solr sink indexing is empty after PostgreSQL CDC changes

1 participant