diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java index e9ebed2826f4..fdaf6d261d7b 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java @@ -584,7 +584,6 @@ public Optional visit( @Override public Optional visit( LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) { - FieldVector vector = arrowField.createVector(rootAlloc); int bitWidth = intLogicalType.getBitWidth(); if (bitWidth == 8 || bitWidth == 16 || bitWidth == 32) { @@ -594,12 +593,26 @@ public Optional visit( // BaseParquetReaders for the non-vectorized path. Preconditions.checkArgument( intLogicalType.isSigned() || bitWidth < 32, "Cannot read UINT32 as an int value"); + Field intField = + new Field( + icebergField.name(), + new FieldType( + icebergField.isOptional(), new ArrowType.Int(Integer.SIZE, true), null, null), + null); + FieldVector vector = intField.createVector(rootAlloc); ((IntVector) vector).allocateNew(batchSize); return Optional.of( new LogicalTypeVisitorResult(vector, ReadType.INT, (int) IntVector.TYPE_WIDTH)); } else if (bitWidth == 64) { Preconditions.checkArgument( intLogicalType.isSigned(), "Cannot read UINT64 as a long value"); + Field longField = + new Field( + icebergField.name(), + new FieldType( + icebergField.isOptional(), new ArrowType.Int(Long.SIZE, true), null, null), + null); + FieldVector vector = longField.createVector(rootAlloc); ((BigIntVector) vector).allocateNew(batchSize); return Optional.of( new LogicalTypeVisitorResult(vector, ReadType.LONG, (int) BigIntVector.TYPE_WIDTH)); diff --git a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java index cf3eb2700265..e23006c79d18 100644 --- a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java +++ b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java @@ -436,6 +436,245 @@ public void testUnsignedSmallIntegerColumnRoundtrips(int unsignedBitWidth, int v assertThat(totalRows).isEqualTo(1); } + /** + * Tests that the vectorized reader correctly handles int-to-long type promotion when the Parquet + * file has an INT(32, true) logical type annotation. This reproduces a bug where reading a file + * written with INT(32) logical type after an ALTER TABLE promoting the column from int to long + * causes a ClassCastException (BigIntVector cannot be cast to IntVector). + * + *

The vector remains an IntVector (matching the physical storage), and the accessor handles + * widening to long on read. + */ + @Test + public void testIntToLongPromotionWithLogicalType() throws Exception { + tables = new HadoopTables(); + Schema schema = new Schema(Types.NestedField.required(1, "col", Types.IntegerType.get())); + Table table = tables.create(schema, tempDir.toURI() + "/int-promotion-logical"); + + // Write a Parquet file with INT(32, signed) logical type annotation. + // This is what non-Iceberg writers (PyArrow, Spark native, etc.) typically produce. + MessageType parquetSchema = + new MessageType( + "test", + primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED) + .as(LogicalTypeAnnotation.intType(32, true)) + .id(1) + .named("col")); + + File testFile = new File(tempDir, "int-logical-type-promotion.parquet"); + List values = ImmutableList.of(1, 2, 3, Integer.MAX_VALUE); + try (ParquetWriter writer = + ExampleParquetWriter.builder(new Path(testFile.toURI())).withType(parquetSchema).build()) { + SimpleGroupFactory factory = new SimpleGroupFactory(parquetSchema); + for (int val : values) { + Group group = factory.newGroup(); + group.add("col", val); + writer.write(group); + } + } + + DataFile dataFile = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath(testFile.getAbsolutePath()) + .withFileSizeInBytes(testFile.length()) + .withFormat(FileFormat.PARQUET) + .withRecordCount(values.size()) + .build(); + table.newAppend().appendFile(dataFile).commit(); + + // Promote the column type from int to long (simulates ALTER TABLE) + table.updateSchema().updateColumn("col", Types.LongType.get()).commit(); + table = tables.load(tempDir.toURI() + "/int-promotion-logical"); + + // Read with the vectorized reader — the underlying vector is IntVector (physical type), + // but the accessor correctly widens to long when getLong() is called. + try (VectorizedTableScanIterable vectorizedReader = + new VectorizedTableScanIterable(table.newScan(), 1024, false)) { + for (ColumnarBatch batch : vectorizedReader) { + assertThat(batch.numRows()).isEqualTo(values.size()); + + FieldVector vector = batch.column(0).getArrowVector(); + assertThat(vector) + .as("Vector should be IntVector matching the physical Parquet type") + .isInstanceOf(IntVector.class); + + for (int i = 0; i < batch.numRows(); i++) { + assertThat(batch.column(0).getLong(i)) + .as("Accessor should widen int to long for row %d", i) + .isEqualTo((long) values.get(i)); + } + } + } + } + + /** + * Tests that the vectorized reader correctly handles int-to-long type promotion when the Parquet + * file has a bare INT32 without any logical type annotation. + * + *

The vector remains an IntVector (matching the physical storage), and the accessor handles + * widening to long on read. + */ + @Test + public void testIntToLongPromotionWithoutLogicalType() throws Exception { + tables = new HadoopTables(); + Schema schema = new Schema(Types.NestedField.required(1, "col", Types.IntegerType.get())); + Table table = tables.create(schema, tempDir.toURI() + "/int-promotion-no-logical"); + + // Write via Iceberg's writer which produces bare INT32 (no logical type annotation) + List records = Lists.newArrayList(); + int[] values = new int[] {1, 2, 3, Integer.MAX_VALUE}; + for (int val : values) { + GenericRecord rec = GenericRecord.create(schema); + rec.setField("col", val); + records.add(rec); + } + + File testFile = new File(tempDir, "int-no-logical-type-promotion.parquet"); + FileAppender appender = + Parquet.write(Files.localOutput(testFile)) + .schema(schema) + .createWriterFunc(GenericParquetWriter::create) + .build(); + try { + appender.addAll(records); + } finally { + appender.close(); + } + + DataFile dataFile = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath(testFile.getAbsolutePath()) + .withFileSizeInBytes(testFile.length()) + .withFormat(FileFormat.PARQUET) + .withRecordCount(records.size()) + .build(); + table.newAppend().appendFile(dataFile).commit(); + + // Promote the column type from int to long + table.updateSchema().updateColumn("col", Types.LongType.get()).commit(); + table = tables.load(tempDir.toURI() + "/int-promotion-no-logical"); + + // Read with the vectorized reader — the underlying vector is IntVector (physical type), + // but the accessor correctly widens to long when getLong() is called. + try (VectorizedTableScanIterable vectorizedReader = + new VectorizedTableScanIterable(table.newScan(), 1024, false)) { + for (ColumnarBatch batch : vectorizedReader) { + assertThat(batch.numRows()).isEqualTo(values.length); + + FieldVector vector = batch.column(0).getArrowVector(); + assertThat(vector) + .as("Vector should be IntVector matching the physical Parquet type") + .isInstanceOf(IntVector.class); + + for (int i = 0; i < batch.numRows(); i++) { + assertThat(batch.column(0).getLong(i)) + .as("Accessor should widen int to long for row %d", i) + .isEqualTo((long) values[i]); + } + } + } + } + + /** + * Tests that int-to-long promotion works correctly when values larger than Integer.MAX_VALUE are + * written after the promotion and reuseContainers is true. This verifies that reading a mix of + * pre-promotion (int-range) and post-promotion (long-range) files works correctly. + */ + @Test + public void testIntToLongPromotionWithLargeValuesAndReuseContainers() throws Exception { + tables = new HadoopTables(); + Schema schema = new Schema(Types.NestedField.required(1, "col", Types.IntegerType.get())); + Table table = tables.create(schema, tempDir.toURI() + "/int-promotion-large-values"); + + // Write a Parquet file with INT(32, signed) logical type (pre-promotion data) + MessageType parquetSchema = + new MessageType( + "test", + primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED) + .as(LogicalTypeAnnotation.intType(32, true)) + .id(1) + .named("col")); + + File prePromotionFile = new File(tempDir, "pre-promotion.parquet"); + List intValues = ImmutableList.of(1, 2, Integer.MAX_VALUE); + try (ParquetWriter writer = + ExampleParquetWriter.builder(new Path(prePromotionFile.toURI())) + .withType(parquetSchema) + .build()) { + SimpleGroupFactory factory = new SimpleGroupFactory(parquetSchema); + for (int val : intValues) { + Group group = factory.newGroup(); + group.add("col", val); + writer.write(group); + } + } + + DataFile prePromotionDataFile = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath(prePromotionFile.getAbsolutePath()) + .withFileSizeInBytes(prePromotionFile.length()) + .withFormat(FileFormat.PARQUET) + .withRecordCount(intValues.size()) + .build(); + table.newAppend().appendFile(prePromotionDataFile).commit(); + + // Promote the column type from int to long + table.updateSchema().updateColumn("col", Types.LongType.get()).commit(); + table = tables.load(tempDir.toURI() + "/int-promotion-large-values"); + + // Write a second file with long values > Integer.MAX_VALUE (post-promotion data) + List longRecords = Lists.newArrayList(); + long[] longValues = new long[] {(long) Integer.MAX_VALUE + 1L, Long.MAX_VALUE}; + for (long val : longValues) { + GenericRecord rec = GenericRecord.create(table.schema()); + rec.setField("col", val); + longRecords.add(rec); + } + + File postPromotionFile = new File(tempDir, "post-promotion.parquet"); + FileAppender appender = + Parquet.write(Files.localOutput(postPromotionFile)) + .schema(table.schema()) + .createWriterFunc(GenericParquetWriter::create) + .build(); + try { + appender.addAll(longRecords); + } finally { + appender.close(); + } + + DataFile postPromotionDataFile = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath(postPromotionFile.getAbsolutePath()) + .withFileSizeInBytes(postPromotionFile.length()) + .withFormat(FileFormat.PARQUET) + .withRecordCount(longRecords.size()) + .build(); + table.newAppend().appendFile(postPromotionDataFile).commit(); + table = tables.load(tempDir.toURI() + "/int-promotion-large-values"); + + // Read with reuseContainers=true and validate all values + List allExpectedValues = Lists.newArrayList(); + for (int v : intValues) { + allExpectedValues.add((long) v); + } + for (long v : longValues) { + allExpectedValues.add(v); + } + + List actualValues = Lists.newArrayList(); + try (VectorizedTableScanIterable vectorizedReader = + new VectorizedTableScanIterable(table.newScan(), 1024, true)) { + for (ColumnarBatch batch : vectorizedReader) { + for (int i = 0; i < batch.numRows(); i++) { + actualValues.add(batch.column(0).getLong(i)); + } + } + } + + assertThat(actualValues).containsExactlyInAnyOrderElementsOf(allExpectedValues); + } + private static Stream rejectedUnsignedIntegerCases() { return Stream.of( Arguments.of( @@ -998,11 +1237,6 @@ private void writeTable(boolean constantRecords) throws Exception { overwrite.addFile(writeParquetFile(table, records)); } overwrite.commit(); - - // Perform a type promotion - // TODO: The read Arrow vector should of type BigInt (promoted type) but it is Int (old type). - Table tableLatest = tables.load(tableLocation); - tableLatest.updateSchema().updateColumn("int_promotion", Types.LongType.get()).commit(); } private static org.apache.arrow.vector.types.pojo.Schema createExpectedArrowSchema(