Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
033f47f
feat: Implement support for VARIANT type in RecordConverter with conv…
seokyun-ha-toss Feb 10, 2026
a53af92
test: implement unit test for RecordConverter for Variant
seokyun-ha-toss Feb 10, 2026
73ea656
chore: lint spotlessApply
seokyun-ha-toss Feb 10, 2026
2b3fcb1
fix: there's no already Variant value, and use `Lists.of()`
seokyun-ha-toss Feb 11, 2026
47f6fe5
fix: handle `BigDecimal` type in `numberToVariantValue()`
seokyun-ha-toss Mar 12, 2026
e8f57f1
fix: explicit error when unknown numeric type
seokyun-ha-toss Mar 12, 2026
b57e0d8
refactor: optimize field name collection in RecordConverter to use a …
seokyun-ha-toss Mar 12, 2026
0117174
fix: ensure keys in map are non-null strings before processing in Rec…
seokyun-ha-toss Mar 12, 2026
be09349
refactor: loop entrySet once
seokyun-ha-toss Mar 12, 2026
75ec7ce
test: enhance unit tests for Variant conversion with additional cases…
seokyun-ha-toss Mar 13, 2026
c6e558a
lint: apply gh actions auto-review and lints, split Cyclomatic Comple…
seokyun-ha-toss Mar 13, 2026
787663d
lint: spotlessApply
seokyun-ha-toss Mar 13, 2026
75dc8a2
feat: support `kafka Struct` type, and align if checking ordering
seokyun-ha-toss Mar 17, 2026
d9c8721
test: implt unittest for Variant for Struct
seokyun-ha-toss Mar 17, 2026
e517884
lint: gradlew spotlessApply
seokyun-ha-toss Mar 17, 2026
8bb8740
feat: add support for BigInteger conversion to BigDecimal in RecordCo…
seokyun-ha-toss Mar 27, 2026
aa2bcce
feat: support for Date conversion to Variants
seokyun-ha-toss Mar 27, 2026
5be9fa4
feat: support additional Date, Datetime, time types
seokyun-ha-toss Mar 27, 2026
179b07a
lint: spotlessApply
seokyun-ha-toss Mar 27, 2026
c1d0be7
Fix java.util.Date variant conversion losing precision for Timestamp …
brandonstanleyappfolio Apr 1, 2026
465fa92
Merge pull request #2 from brandonstanleyappfolio/variant-date-fix
seokyun-ha-toss Apr 3, 2026
06b7afe
fix: explicit throw exception when non-string key is used
seokyun-ha-toss Apr 3, 2026
a4c12b8
lint: spotlessApply
seokyun-ha-toss Apr 3, 2026
568f7b1
test: remove duplicated
seokyun-ha-toss Apr 3, 2026
10bff3d
test: integrate test convert variant value from list, map, struct
seokyun-ha-toss Apr 4, 2026
b8772ba
test: left only one unittest on `VariantValueFromStructWith` Date family
seokyun-ha-toss Apr 4, 2026
a99262e
test: add unittest for variant with timestamp family
seokyun-ha-toss Apr 4, 2026
ca47edf
Merge branch 'apache:main' into support-variant-for-sink-connector
seokyun-ha-toss Apr 4, 2026
7530597
fix: [JavaUtilDate] Date has a bad API that leads to bugs
seokyun-ha-toss Apr 4, 2026
a312eb0
fix: resolve Cyclomatic Complexity
seokyun-ha-toss Apr 4, 2026
818a515
fix: throw invalid key type
seokyun-ha-toss Apr 9, 2026
b1f379e
lint: spotlessApply
seokyun-ha-toss Apr 9, 2026
1b849b7
fix: remove key null checking & use JLS pattern variable
seokyun-ha-toss Apr 21, 2026
2412a6b
fix: remove Variant.from() ByteBuffer type, and pass-through already…
seokyun-ha-toss Apr 21, 2026
f55df63
fix: change collect field names recursively from top-down to bottom-up
seokyun-ha-toss Apr 22, 2026
546dfca
fix: resolve pattern-matching instanceof
seokyun-ha-toss Apr 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,29 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.DateTimeParseException;
import java.time.temporal.Temporal;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.iceberg.FileFormat;
Expand All @@ -53,6 +59,7 @@
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Type.PrimitiveType;
import org.apache.iceberg.types.Types.DecimalType;
Expand All @@ -64,6 +71,13 @@
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.UUIDUtil;
import org.apache.iceberg.variants.ShreddedObject;
import org.apache.iceberg.variants.ValueArray;
import org.apache.iceberg.variants.Variant;
import org.apache.iceberg.variants.VariantMetadata;
import org.apache.iceberg.variants.VariantValue;
import org.apache.iceberg.variants.Variants;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;

Expand Down Expand Up @@ -142,6 +156,8 @@ private Object convertValue(
return convertTimeValue(value);
case TIMESTAMP:
return convertTimestampValue(value, (TimestampType) type);
case VARIANT:
return convertVariantValue(value);
}
throw new UnsupportedOperationException("Unsupported type: " + type.typeId());
}
Expand Down Expand Up @@ -464,6 +480,234 @@ protected Temporal convertTimestampValue(Object value, TimestampType type) {
return convertLocalDateTime(value);
}

protected Variant convertVariantValue(Object value) {
Comment thread
seokyun-ha-toss marked this conversation as resolved.
Comment thread
seokyun-ha-toss marked this conversation as resolved.
if (value instanceof Variant variant) {
return variant;
}

List<String> sortedFieldNames =
collectFieldNames(value).stream().sorted().collect(Collectors.toList());
VariantMetadata metadata = Variants.metadata(sortedFieldNames);
return Variant.of(metadata, objectToVariantValue(value, metadata, null));
}

/**
 * Gathers every field name reachable from a value.
 *
 * <p>Collections contribute the names found in their elements. Maps contribute their keys plus
 * the names found in their values. Structs contribute their schema field names plus the names
 * found in their field values. {@code null} and scalar values contribute nothing, and an empty
 * collection, map, or struct yields an empty set.
 *
 * @param value the value to inspect; may be {@code null}
 * @return the distinct field names found in {@code value}
 * @throws IllegalArgumentException if a map key is not a {@link String}
 */
private static Set<String> collectFieldNames(Object value) {
  if (value instanceof Collection<?> elements) {
    if (elements.isEmpty()) {
      return Collections.emptySet();
    }
    Set<String> found = Sets.newHashSet();
    for (Object element : elements) {
      found.addAll(collectFieldNames(element));
    }
    return found;
  }

  if (value instanceof Map<?, ?> entries) {
    if (entries.isEmpty()) {
      return Collections.emptySet();
    }
    Set<String> found = Sets.newHashSet();
    for (Map.Entry<?, ?> entry : entries.entrySet()) {
      Object key = entry.getKey();
      if (!(key instanceof String name)) {
        throw new IllegalArgumentException(
            "Cannot convert map to variant: keys must be non-null strings, was: "
                + (key == null ? "null" : key.getClass().getName()));
      }
      found.add(name);
      found.addAll(collectFieldNames(entry.getValue()));
    }
    return found;
  }

  if (value instanceof Struct struct) {
    List<Field> schemaFields = struct.schema().fields();
    if (schemaFields.isEmpty()) {
      return Collections.emptySet();
    }
    Set<String> found = Sets.newHashSet();
    for (Field schemaField : schemaFields) {
      found.add(schemaField.name());
      found.addAll(collectFieldNames(struct.get(schemaField)));
    }
    return found;
  }

  // null and scalar values carry no field names
  return Collections.emptySet();
}

/**
 * Recursively converts a Java object into a {@link VariantValue}.
 *
 * <p>{@code null} becomes a variant null. Primitive-like values are converted directly. Any
 * {@link Collection} becomes a variant array, and maps and Connect structs become variant objects
 * whose keys / field names are the object's field names. Every nested object is created against
 * the same shared {@code metadata}.
 *
 * @param value the value to convert; may be {@code null}
 * @param metadata the metadata shared by all objects nested in the variant
 * @param schema the Connect schema describing {@code value}, or {@code null} when unknown
 * @return the converted variant value
 * @throws IllegalArgumentException if {@code value} has no supported variant representation
 */
private static VariantValue objectToVariantValue(
    Object value, VariantMetadata metadata, org.apache.kafka.connect.data.Schema schema) {
  if (value == null) {
    return Variants.ofNull();
  }

  VariantValue scalar = primitiveToVariantValue(value, schema);
  if (scalar != null) {
    return scalar;
  }

  if (value instanceof Collection<?> elements) {
    ValueArray result = Variants.array();
    // Elements inherit the value schema of the enclosing schema when one is known.
    org.apache.kafka.connect.data.Schema elementSchema =
        schema == null ? null : schema.valueSchema();
    elements.forEach(
        element -> result.add(objectToVariantValue(element, metadata, elementSchema)));
    return result;
  } else if (value instanceof Map<?, ?> entries) {
    return mapToVariantValue(entries, metadata, schema);
  } else if (value instanceof Struct struct) {
    ShreddedObject result = Variants.object(metadata);
    for (Field structField : struct.schema().fields()) {
      VariantValue fieldValue =
          objectToVariantValue(struct.get(structField), metadata, structField.schema());
      result.put(structField.name(), fieldValue);
    }
    return result;
  }

  throw new IllegalArgumentException("Cannot convert to variant: " + value.getClass().getName());
}

/**
 * Converts a map into a variant object, using each key as a field name.
 *
 * @param map the map to convert
 * @param metadata the metadata shared by all objects nested in the variant
 * @param schema the Connect schema describing {@code map}, or {@code null} when unknown; when
 *     present its value schema is applied to every map value
 * @return the variant object holding the converted entries
 * @throws IllegalArgumentException if a key is not a {@link String}
 */
private static VariantValue mapToVariantValue(
    Map<?, ?> map, VariantMetadata metadata, org.apache.kafka.connect.data.Schema schema) {
  ShreddedObject result = Variants.object(metadata);
  org.apache.kafka.connect.data.Schema entryValueSchema =
      schema == null ? null : schema.valueSchema();

  for (Map.Entry<?, ?> entry : map.entrySet()) {
    Object key = entry.getKey();
    if (!(key instanceof String fieldName)) {
      throw new IllegalArgumentException(
          "Cannot convert map to variant: keys must be non-null strings, was: "
              + (key == null ? "null" : key.getClass().getName()));
    }
    result.put(fieldName, objectToVariantValue(entry.getValue(), metadata, entryValueSchema));
  }
  return result;
}

/**
 * Converts a scalar value (boolean, temporal, number, string, binary, or UUID) into a
 * {@link VariantValue}.
 *
 * <p>The schema is only forwarded to the temporal conversion, where it disambiguates
 * {@code java.util.Date}, which Kafka Connect uses for its Date, Time, and Timestamp logical
 * types.
 *
 * @param value the non-null value to convert
 * @param schema the Connect schema describing {@code value}, or {@code null} when unknown
 * @return the converted value, or {@code null} when {@code value} is not a supported scalar
 */
private static VariantValue primitiveToVariantValue(
    Object value, org.apache.kafka.connect.data.Schema schema) {
  if (value instanceof Boolean flag) {
    return Variants.of(flag);
  }

  VariantValue fromTemporal = temporalObjectToVariantValue(value, schema);
  if (fromTemporal != null) {
    return fromTemporal;
  }

  if (value instanceof Number numeric) {
    return numberToVariantValue(numeric);
  } else if (value instanceof String text) {
    return Variants.of(text);
  } else if (value instanceof byte[] rawBytes) {
    // Binary arrays are wrapped so they share the ByteBuffer representation.
    return Variants.of(ByteBuffer.wrap(rawBytes));
  } else if (value instanceof ByteBuffer buffer) {
    return Variants.of(buffer);
  } else if (value instanceof UUID identifier) {
    return Variants.ofUUID(identifier);
  }

  // Not a scalar: the caller decides how to treat containers and unknown types.
  return null;
}

/**
 * Converts {@code java.time} values and {@code java.util.Date} into a {@link VariantValue}.
 *
 * <p>{@link Instant}, {@link OffsetDateTime}, and {@link ZonedDateTime} become timestamps with
 * time zone; {@link LocalDateTime} becomes a timestamp without time zone; {@link LocalDate}
 * becomes a date; {@link LocalTime} becomes a time. A {@code java.util.Date} is interpreted
 * according to the Connect logical type name carried by {@code schema}.
 *
 * @param value the value to convert
 * @param schema the Connect schema describing {@code value}, or {@code null} when unknown
 * @return the converted value, or {@code null} when {@code value} is not a supported temporal
 * @throws IllegalArgumentException if {@code value} is a {@code java.util.Date} and the schema
 *     does not name the Connect Timestamp, Time, or Date logical type
 */
private static VariantValue temporalObjectToVariantValue(
    Object value, org.apache.kafka.connect.data.Schema schema) {
  if (value instanceof Instant moment) {
    return Variants.ofTimestamptz(DateTimeUtil.microsFromInstant(moment));
  }
  if (value instanceof OffsetDateTime withOffset) {
    return Variants.ofTimestamptz(DateTimeUtil.microsFromTimestamptz(withOffset));
  }
  if (value instanceof ZonedDateTime zoned) {
    OffsetDateTime withOffset = zoned.toOffsetDateTime();
    return Variants.ofTimestamptz(DateTimeUtil.microsFromTimestamptz(withOffset));
  }
  if (value instanceof LocalDateTime wallClock) {
    return Variants.ofTimestampntz(DateTimeUtil.microsFromTimestamp(wallClock));
  }
  if (value instanceof LocalDate day) {
    return Variants.ofDate(DateTimeUtil.daysFromDate(day));
  }
  if (value instanceof LocalTime timeOfDay) {
    return Variants.ofTime(DateTimeUtil.microsFromTime(timeOfDay));
  }

  if (!(value instanceof Date legacyDate)) {
    return null;
  }

  String logicalName = null;
  if (schema != null) {
    logicalName = schema.name();
  }
  // Connect represents Timestamp, Time, and Date logical types as java.util.Date at runtime;
  // normalize to Instant once, then interpret using the schema logical type name.
  Instant connectInstant = legacyDate.toInstant();
  if (org.apache.kafka.connect.data.Timestamp.LOGICAL_NAME.equals(logicalName)) {
    return Variants.ofTimestamptz(DateTimeUtil.microsFromInstant(connectInstant));
  } else if (org.apache.kafka.connect.data.Time.LOGICAL_NAME.equals(logicalName)) {
    LocalTime utcTime = connectInstant.atZone(ZoneOffset.UTC).toLocalTime();
    return Variants.ofTime(DateTimeUtil.microsFromTime(utcTime));
  } else if (org.apache.kafka.connect.data.Date.LOGICAL_NAME.equals(logicalName)) {
    return Variants.ofDate(DateTimeUtil.daysFromInstant(connectInstant));
  }

  throw new IllegalArgumentException(
      "Cannot convert java.util.Date to variant without a recognized logical type schema"
          + " (expected Timestamp, Time, or Date but got: "
          + logicalName
          + ")");
}

/**
* Converts a Number to VariantValue; throw IllegalArgumentException if the value is not a
* supported number representation.
*/
private static VariantValue numberToVariantValue(Number number) {
if (number instanceof BigDecimal bigDecimal) {
return Variants.of(bigDecimal);
}
if (number instanceof BigInteger bigInteger) {
return Variants.of(new BigDecimal(bigInteger));
}
if (number instanceof Integer integer) {
return Variants.of(integer);
}
if (number instanceof Long longValue) {
return Variants.of(longValue);
}
if (number instanceof Float floatValue) {
return Variants.of(floatValue);
}
if (number instanceof Double doubleValue) {
return Variants.of(doubleValue);
}
if (number instanceof Byte byteValue) {
return Variants.of(byteValue);
}
if (number instanceof Short shortValue) {
return Variants.of(shortValue);
}
Comment thread
seokyun-ha-toss marked this conversation as resolved.
Comment thread
seokyun-ha-toss marked this conversation as resolved.
throw new IllegalArgumentException(
"Cannot convert Number to variant (unknown type): " + number.getClass().getName());
}

@SuppressWarnings("JavaUtilDate")
private OffsetDateTime convertOffsetDateTime(Object value) {
if (value instanceof Number) {
Expand Down
Loading
Loading