Issue
The Pulsar Kafka Connect adaptor passes the full configuration map to converters, so converter-specific properties, such as JsonConverter's key.converter.schemas.cache.size, are ignored because the key.converter. prefix is not stripped before the converter is configured.
Expected Behavior (Kafka Connect)
The Kafka Connect Worker extracts converter-specific config by stripping the prefix before calling the configure() method (code link):
String configPrefix = classPropertyName + "."; // "key.converter." or "value.converter."
Map<String, Object> converterConfig = config.originalsWithPrefix(configPrefix); // get configs with the prefix and ALSO STRIP THE PREFIX
...
plugin.configure(converterConfig, isKeyConverter); // passing the stripped config into the configure() method
Input: {"key.converter.schemas.cache.size": 100}
After stripping: {"schemas.cache.size": 100}
Result: The converter sees the schemas.cache.size property and uses it
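The stripping step can be sketched in plain Java without Kafka on the classpath. `PrefixStripDemo` and `withPrefixStripped` are hypothetical names; the helper only models the behavior described above for `originalsWithPrefix` (keep keys that start with the prefix, drop the prefix) and is not Kafka's own code.

```java
import java.util.HashMap;
import java.util.Map;

public class PrefixStripDemo {

    // Hypothetical helper modeled on originalsWithPrefix(prefix):
    // keeps only keys that start with the prefix and removes the prefix from them.
    static Map<String, Object> withPrefixStripped(Map<String, ?> originals, String prefix) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, ?> entry : originals.entrySet()) {
            String key = entry.getKey();
            // Skip the bare prefix itself so an empty key is never produced.
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                result.put(key.substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> workerConfig = new HashMap<>();
        // The class property itself ("key.converter") does not match "key.converter."
        workerConfig.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        workerConfig.put("key.converter.schemas.cache.size", 100);

        // "key.converter" + "." mirrors classPropertyName + "." in the Worker snippet.
        Map<String, Object> converterConfig = withPrefixStripped(workerConfig, "key.converter.");
        System.out.println(converterConfig); // {schemas.cache.size=100}
    }
}
```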
Current Behavior (Pulsar Adaptor)
The Pulsar Adaptor just passes the whole config into the configure() method (code link) without stripping the prefix.
Input: {"key.converter.schemas.cache.size": 100}
No stripping: Full map passed to converter
Result: The converter looks for the schemas.cache.size property, doesn't find it, and uses the default
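A small sketch of why the prefixed key is invisible to the converter: a converter reads its own un-prefixed property name, so a lookup against the full map misses. `UnstrippedLookupDemo`, `resolveCacheSize` and the default of 1000 are assumptions for illustration only; the real default lives in the converter's own config class.

```java
import java.util.HashMap;
import java.util.Map;

public class UnstrippedLookupDemo {

    // Assumed default for illustration; check the converter's config class for the real value.
    static final int ASSUMED_DEFAULT_CACHE_SIZE = 1000;

    // Mimics a converter reading its own, un-prefixed property name.
    static int resolveCacheSize(Map<String, ?> configs) {
        Object value = configs.get("schemas.cache.size");
        return value == null ? ASSUMED_DEFAULT_CACHE_SIZE : Integer.parseInt(value.toString());
    }

    public static void main(String[] args) {
        Map<String, Object> fullConfig = new HashMap<>();
        fullConfig.put("key.converter.schemas.cache.size", 100);

        // Full map passed as-is: the prefixed key never matches, so the default wins.
        System.out.println(resolveCacheSize(fullConfig)); // 1000

        // Same value after stripping "key.converter.": the setting is honored.
        Map<String, Object> stripped = new HashMap<>();
        stripped.put("schemas.cache.size", 100);
        System.out.println(resolveCacheSize(stripped)); // 100
    }
}
```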
Fix
Stripping the prefix can be done quite easily inside the open() method (code link):
PulsarKafkaWorkerConfig pulsarKafkaWorkerConfig = new PulsarKafkaWorkerConfig(stringConfig);
Map<String, Object> keyConverterConfig = pulsarKafkaWorkerConfig.originalsWithPrefix(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG + ".");
Map<String, Object> valueConverterConfig = pulsarKafkaWorkerConfig.originalsWithPrefix(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG + ".");
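To show how the two stripped maps would then be handed to the converters in open(), here is a self-contained sketch. `ConfigurableConverter` is a hypothetical stand-in that only mirrors the shape of Kafka's `Converter.configure(Map<String, ?> configs, boolean isKey)`, and `RecordingConverter` simply remembers what it received; neither is part of Pulsar or Kafka.

```java
import java.util.HashMap;
import java.util.Map;

public class ConverterWiringSketch {

    // Hypothetical stand-in mirroring Converter.configure(Map<String, ?>, boolean).
    interface ConfigurableConverter {
        void configure(Map<String, ?> configs, boolean isKey);
    }

    // Records what it was configured with, so the effect of stripping is visible.
    static class RecordingConverter implements ConfigurableConverter {
        Map<String, Object> seen = new HashMap<>();
        boolean isKey;

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            this.seen = new HashMap<>(configs);
            this.isKey = isKey;
        }
    }

    // Keeps keys starting with the prefix and removes the prefix from them.
    static Map<String, Object> withPrefixStripped(Map<String, ?> originals, String prefix) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, ?> entry : originals.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                result.put(key.substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> stringConfig = new HashMap<>();
        stringConfig.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        stringConfig.put("key.converter.schemas.cache.size", 100);
        stringConfig.put("value.converter", "io.confluent.connect.avro.AvroConverter");
        stringConfig.put("value.converter.schema.registry.url", "http://localhost:8081");

        RecordingConverter keyConverter = new RecordingConverter();
        RecordingConverter valueConverter = new RecordingConverter();

        // Each converter receives only its own settings, with the prefix removed.
        keyConverter.configure(withPrefixStripped(stringConfig, "key.converter."), true);
        valueConverter.configure(withPrefixStripped(stringConfig, "value.converter."), false);

        System.out.println(keyConverter.seen);   // {schemas.cache.size=100}
        System.out.println(valueConverter.seen); // {schema.registry.url=http://localhost:8081}
    }
}
```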
Implementation questions
I am happy to prepare a fix, but I need clarification on two questions:
1. Mock Schema Registry URL
The fix would remove the need for this hardcoded mock config (code link).
I believe this was added because schema.registry.url is required by AvroConverter and initialization was failing without it. With the fix, this config would be correctly extracted from the prefixed value.converter.schema.registry.url property (standard Kafka Connect configuration).
However, removing this hardcoded value is a breaking change. Users relying on the current behavior would need to explicitly configure value.converter.schema.registry.url when using AvroConverter.
Question: Is this breaking change acceptable? If not, we can keep the configuration override and apply it to the stripped config map, even though it becomes redundant after the fix.
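If the override is kept, applying it to the stripped value converter map could look roughly like this. `LegacyRegistryOverrideSketch` and `applyLegacyOverride` are hypothetical names; the property name `schema.registry.url` and the `mock` value are the ones referred to in this issue.

```java
import java.util.HashMap;
import java.util.Map;

public class LegacyRegistryOverrideSketch {

    // Property name and placeholder value as referenced in the issue text.
    static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
    static final String LEGACY_MOCK_URL = "mock";

    // Keeps keys starting with the prefix and removes the prefix from them.
    static Map<String, Object> withPrefixStripped(Map<String, ?> originals, String prefix) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, ?> entry : originals.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                result.put(key.substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }

    // Hypothetical helper: re-applies the legacy hardcoded value, but to the
    // already-stripped converter map instead of the full worker config.
    static Map<String, Object> applyLegacyOverride(Map<String, Object> converterConfig) {
        Map<String, Object> result = new HashMap<>(converterConfig);
        result.put(SCHEMA_REGISTRY_URL, LEGACY_MOCK_URL); // unconditional overwrite, as today
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> stringConfig = new HashMap<>();
        stringConfig.put("value.converter", "io.confluent.connect.avro.AvroConverter");

        Map<String, Object> valueConverterConfig =
                applyLegacyOverride(withPrefixStripped(stringConfig, "value.converter."));
        System.out.println(valueConverterConfig); // {schema.registry.url=mock}
    }
}
```

Because the overwrite is unconditional, a user-supplied value.converter.schema.registry.url would still be replaced by `mock`; using putIfAbsent instead would keep backward compatibility while honoring an explicit URL.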
2. json-with-envelope Configuration Behavior
The fix would also change how json-with-envelope works (code link).
Current behavior: Sets the top-level schemas.enable = true/false, which (due to the bug) was the only way to control schema behavior for JSON converters, since prefixed properties were ignored.
After fix: Prefixed properties like key.converter.schemas.enable would be used instead of the top-level override, which is a breaking change.
I don't fully understand the original motivation behind this override. But if the motivation was to provide some control over the schemas.enable property, then it is no longer needed after this fix. Users could control schema behavior via the standard key.converter.schemas.enable and value.converter.schemas.enable properties. Question 2A: Would it be acceptable to stop overriding schemas.enable in this fix?
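The breaking change can be illustrated with a short sketch: once the maps are stripped, a top-level schemas.enable (what the json-with-envelope override writes today) no longer reaches either converter, while the prefixed properties do. `SchemasEnableSketch` is a hypothetical name and the values are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

public class SchemasEnableSketch {

    // Keeps keys starting with the prefix and removes the prefix from them.
    static Map<String, Object> withPrefixStripped(Map<String, ?> originals, String prefix) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, ?> entry : originals.entrySet()) {
            String key = entry.getKey();
            if (key.startsWith(prefix) && key.length() > prefix.length()) {
                result.put(key.substring(prefix.length()), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> stringConfig = new HashMap<>();
        // What the json-with-envelope override sets today: top level, no prefix.
        stringConfig.put("schemas.enable", "true");
        // Standard Kafka Connect way to control each converter separately.
        stringConfig.put("key.converter.schemas.enable", "false");
        stringConfig.put("value.converter.schemas.enable", "true");

        Map<String, Object> keyConfig = withPrefixStripped(stringConfig, "key.converter.");
        Map<String, Object> valueConfig = withPrefixStripped(stringConfig, "value.converter.");

        // The top-level value is dropped; each converter sees only its prefixed value.
        System.out.println(keyConfig);   // {schemas.enable=false}
        System.out.println(valueConfig); // {schemas.enable=true}
    }
}
```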
Follow-up suggestion: If we stop overriding the converter configuration, json-with-envelope would simply control whether the output record uses Schema.BYTES (schemaless) or preserves the record's schema. I actually think this is a useful capability and would appreciate it if it could be configured more granularly for key and value separately.
For example, introducing:
key-without-schema (boolean) - force key to Schema.BYTES regardless of converter
value-without-schema (boolean) - force value to Schema.BYTES regardless of converter
These properties would work with any converter (not just JSON) and provide finer control over the Pulsar output schema.
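A rough sketch of how the proposed flags might be read and applied per side. The property names key-without-schema and value-without-schema are the ones proposed above (they do not exist yet); `OutputSchema` is a hypothetical stand-in for "force Schema.BYTES" versus "keep the converter's schema", so the example runs without Kafka.

```java
import java.util.Map;

public class WithoutSchemaFlagsSketch {

    // Stand-in for the two outcomes described in the proposal.
    enum OutputSchema { BYTES, CONVERTER_SCHEMA }

    // Proposed (not yet existing) property names from this issue.
    static final String KEY_WITHOUT_SCHEMA = "key-without-schema";
    static final String VALUE_WITHOUT_SCHEMA = "value-without-schema";

    // Reads a boolean flag, treating a missing key as false.
    static boolean flag(Map<String, ?> config, String name) {
        Object value = config.get(name);
        return value != null && Boolean.parseBoolean(value.toString());
    }

    // Forces BYTES when the flag is set, otherwise keeps whatever the converter produced.
    static OutputSchema resolve(Map<String, ?> config, String flagName) {
        return flag(config, flagName) ? OutputSchema.BYTES : OutputSchema.CONVERTER_SCHEMA;
    }

    public static void main(String[] args) {
        Map<String, Object> config = Map.of(KEY_WITHOUT_SCHEMA, "true");
        System.out.println(resolve(config, KEY_WITHOUT_SCHEMA));   // BYTES
        System.out.println(resolve(config, VALUE_WITHOUT_SCHEMA)); // CONVERTER_SCHEMA
    }
}
```

Because the decision is made after conversion, it is independent of which converter is configured, which matches the goal of working with any converter rather than only JSON.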
Question 2B: Would it be acceptable to introduce these new properties and mark json-with-envelope as deprecated in this fix?
Because of the breaking changes, it may be best to include this fix in a new major version release. This would avoid the confusing behavior that could result from mixing the legacy schemas.enable override with the corrected prefix-based converter configuration. What do you think?
Related issue #30