From d2d8dd5bc0795291c8a34c2c9f6d5e040d20cccf Mon Sep 17 00:00:00 2001 From: Xiening Dai Date: Thu, 14 May 2026 22:19:06 +0000 Subject: [PATCH 1/2] Arrow: Fix ClassCastException in vectorized reader on int-to-long promotion with INT logical type Fix ClassCastException: BigIntVector cannot be cast to IntVector when reading Parquet files with INT(32, true) logical type annotation after promoting a column from int to long. The vectorized reader's LogicalTypeVisitor now allocates vectors based on the Parquet physical type instead of deriving them from the (potentially promoted) Iceberg schema type. Root Cause: In VectorizedArrowReader.allocateFieldVector(), the Arrow field was created from the Iceberg schema type (which reflects the promoted LongType), producing a BigIntVector. The LogicalTypeVisitor then cast this vector to IntVector based on the Parquet file's INT(32) logical type, causing the mismatch. The non-vectorized reader (BaseParquetReaders) already handles this correctly by checking the expected Iceberg type and using IntAsLongReader for promotion. The vectorized reader relies on the accessor layer for widening (IntAccessor.getLong() widens int to long), so the fix ensures the vector matches the physical data layout. 
Tests: - testIntToLongPromotionWithLogicalType: verifies reading after promotion when file has INT(32, true) annotation (the reported crash) - testIntToLongPromotionWithoutLogicalType: verifies reading after promotion when file has bare INT32 --- .../vectorized/VectorizedArrowReader.java | 15 +- .../arrow/vectorized/TestArrowReader.java | 151 ++++++++++++++++++ 2 files changed, 165 insertions(+), 1 deletion(-) diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java index e9ebed2826f4..fdaf6d261d7b 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java @@ -584,7 +584,6 @@ public Optional visit( @Override public Optional visit( LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) { - FieldVector vector = arrowField.createVector(rootAlloc); int bitWidth = intLogicalType.getBitWidth(); if (bitWidth == 8 || bitWidth == 16 || bitWidth == 32) { @@ -594,12 +593,26 @@ public Optional visit( // BaseParquetReaders for the non-vectorized path. 
Preconditions.checkArgument( intLogicalType.isSigned() || bitWidth < 32, "Cannot read UINT32 as an int value"); + Field intField = + new Field( + icebergField.name(), + new FieldType( + icebergField.isOptional(), new ArrowType.Int(Integer.SIZE, true), null, null), + null); + FieldVector vector = intField.createVector(rootAlloc); ((IntVector) vector).allocateNew(batchSize); return Optional.of( new LogicalTypeVisitorResult(vector, ReadType.INT, (int) IntVector.TYPE_WIDTH)); } else if (bitWidth == 64) { Preconditions.checkArgument( intLogicalType.isSigned(), "Cannot read UINT64 as a long value"); + Field longField = + new Field( + icebergField.name(), + new FieldType( + icebergField.isOptional(), new ArrowType.Int(Long.SIZE, true), null, null), + null); + FieldVector vector = longField.createVector(rootAlloc); ((BigIntVector) vector).allocateNew(batchSize); return Optional.of( new LogicalTypeVisitorResult(vector, ReadType.LONG, (int) BigIntVector.TYPE_WIDTH)); diff --git a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java index cf3eb2700265..9a003e52dcff 100644 --- a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java +++ b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java @@ -436,6 +436,157 @@ public void testUnsignedSmallIntegerColumnRoundtrips(int unsignedBitWidth, int v assertThat(totalRows).isEqualTo(1); } + /** + * Tests that the vectorized reader correctly handles int-to-long type promotion when the Parquet + * file has an INT(32, true) logical type annotation. This reproduces a bug where reading a file + * written with INT(32) logical type after an ALTER TABLE promoting the column from int to long + * causes a ClassCastException (BigIntVector cannot be cast to IntVector). + * + *
<p>
The vector remains an IntVector (matching the physical storage), and the accessor handles + * widening to long on read. + */ + @Test + public void testIntToLongPromotionWithLogicalType() throws Exception { + tables = new HadoopTables(); + Schema schema = new Schema(Types.NestedField.required(1, "col", Types.IntegerType.get())); + Table table = tables.create(schema, tempDir.toURI() + "/int-promotion-logical"); + + // Write a Parquet file with INT(32, signed) logical type annotation. + // This is what non-Iceberg writers (PyArrow, Spark native, etc.) typically produce. + MessageType parquetSchema = + new MessageType( + "test", + primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED) + .as(LogicalTypeAnnotation.intType(32, true)) + .id(1) + .named("col")); + + File testFile = new File(tempDir, "int-logical-type-promotion.parquet"); + List values = ImmutableList.of(1, 2, 3, Integer.MAX_VALUE); + try (ParquetWriter writer = + ExampleParquetWriter.builder(new Path(testFile.toURI())).withType(parquetSchema).build()) { + SimpleGroupFactory factory = new SimpleGroupFactory(parquetSchema); + for (int val : values) { + Group group = factory.newGroup(); + group.add("col", val); + writer.write(group); + } + } + + DataFile dataFile = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath(testFile.getAbsolutePath()) + .withFileSizeInBytes(testFile.length()) + .withFormat(FileFormat.PARQUET) + .withRecordCount(values.size()) + .build(); + table.newAppend().appendFile(dataFile).commit(); + + // Promote the column type from int to long (simulates ALTER TABLE) + table.updateSchema().updateColumn("col", Types.LongType.get()).commit(); + table = tables.load(tempDir.toURI() + "/int-promotion-logical"); + + // Read with the vectorized reader — the underlying vector is IntVector (physical type), + // but the accessor correctly widens to long when getLong() is called. 
+ int totalRows = 0; + int rowIndex = 0; + try (VectorizedTableScanIterable vectorizedReader = + new VectorizedTableScanIterable(table.newScan(), 1024, false)) { + for (ColumnarBatch batch : vectorizedReader) { + VectorSchemaRoot root = batch.createVectorSchemaRootFromVectors(); + FieldVector vector = root.getVector("col"); + assertThat(vector) + .as("Vector should be IntVector matching the physical Parquet type") + .isInstanceOf(IntVector.class); + IntVector intVector = (IntVector) vector; + for (int i = 0; i < root.getRowCount(); i++) { + assertThat(intVector.get(i)) + .as("Row %d value should be read correctly", rowIndex) + .isEqualTo(values.get(rowIndex)); + rowIndex++; + } + totalRows += root.getRowCount(); + root.close(); + } + } + + assertThat(totalRows).isEqualTo(values.size()); + } + + /** + * Tests that the vectorized reader correctly handles int-to-long type promotion when the Parquet + * file has a bare INT32 without any logical type annotation. + * + *
<p>
The vector remains an IntVector (matching the physical storage), and the accessor handles + * widening to long on read. + */ + @Test + public void testIntToLongPromotionWithoutLogicalType() throws Exception { + tables = new HadoopTables(); + Schema schema = new Schema(Types.NestedField.required(1, "col", Types.IntegerType.get())); + Table table = tables.create(schema, tempDir.toURI() + "/int-promotion-no-logical"); + + // Write via Iceberg's writer which produces bare INT32 (no logical type annotation) + List records = Lists.newArrayList(); + for (int val : new int[] {1, 2, 3, Integer.MAX_VALUE}) { + GenericRecord rec = GenericRecord.create(schema); + rec.setField("col", val); + records.add(rec); + } + + File testFile = new File(tempDir, "int-no-logical-type-promotion.parquet"); + FileAppender appender = + Parquet.write(Files.localOutput(testFile)) + .schema(schema) + .createWriterFunc(GenericParquetWriter::create) + .build(); + try { + appender.addAll(records); + } finally { + appender.close(); + } + + DataFile dataFile = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath(testFile.getAbsolutePath()) + .withFileSizeInBytes(testFile.length()) + .withFormat(FileFormat.PARQUET) + .withRecordCount(records.size()) + .build(); + table.newAppend().appendFile(dataFile).commit(); + + // Promote the column type from int to long + table.updateSchema().updateColumn("col", Types.LongType.get()).commit(); + table = tables.load(tempDir.toURI() + "/int-promotion-no-logical"); + + // Read with the vectorized reader — the underlying vector is IntVector (physical type), + // but the accessor correctly widens to long when getLong() is called. 
+ int totalRows = 0; + int rowIndex = 0; + int[] expectedValues = new int[] {1, 2, 3, Integer.MAX_VALUE}; + try (VectorizedTableScanIterable vectorizedReader = + new VectorizedTableScanIterable(table.newScan(), 1024, false)) { + for (ColumnarBatch batch : vectorizedReader) { + VectorSchemaRoot root = batch.createVectorSchemaRootFromVectors(); + FieldVector vector = root.getVector("col"); + assertThat(vector) + .as("Vector should be IntVector matching the physical Parquet type") + .isInstanceOf(IntVector.class); + IntVector intVector = (IntVector) vector; + for (int i = 0; i < root.getRowCount(); i++) { + assertThat(intVector.get(i)) + .as("Row %d value should be read correctly", rowIndex) + .isEqualTo(expectedValues[rowIndex]); + rowIndex++; + } + totalRows += root.getRowCount(); + root.close(); + } + } + + assertThat(totalRows).isEqualTo(records.size()); + } + private static Stream rejectedUnsignedIntegerCases() { return Stream.of( Arguments.of( From 12d99ed1903e7e99d1bfd73e92f566307c343971 Mon Sep 17 00:00:00 2001 From: Xiening Dai Date: Mon, 18 May 2026 23:15:12 +0000 Subject: [PATCH 2/2] Add new test case testIntToLongPromotionWithLargeValuesAndReuseContainers and address some minor comments --- .../arrow/vectorized/TestArrowReader.java | 151 ++++++++++++++---- 1 file changed, 117 insertions(+), 34 deletions(-) diff --git a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java index 9a003e52dcff..e23006c79d18 100644 --- a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java +++ b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/TestArrowReader.java @@ -488,29 +488,23 @@ public void testIntToLongPromotionWithLogicalType() throws Exception { // Read with the vectorized reader — the underlying vector is IntVector (physical type), // but the accessor correctly widens to long when getLong() is called. 
- int totalRows = 0; - int rowIndex = 0; try (VectorizedTableScanIterable vectorizedReader = new VectorizedTableScanIterable(table.newScan(), 1024, false)) { for (ColumnarBatch batch : vectorizedReader) { - VectorSchemaRoot root = batch.createVectorSchemaRootFromVectors(); - FieldVector vector = root.getVector("col"); + assertThat(batch.numRows()).isEqualTo(values.size()); + + FieldVector vector = batch.column(0).getArrowVector(); assertThat(vector) .as("Vector should be IntVector matching the physical Parquet type") .isInstanceOf(IntVector.class); - IntVector intVector = (IntVector) vector; - for (int i = 0; i < root.getRowCount(); i++) { - assertThat(intVector.get(i)) - .as("Row %d value should be read correctly", rowIndex) - .isEqualTo(values.get(rowIndex)); - rowIndex++; + + for (int i = 0; i < batch.numRows(); i++) { + assertThat(batch.column(0).getLong(i)) + .as("Accessor should widen int to long for row %d", i) + .isEqualTo((long) values.get(i)); } - totalRows += root.getRowCount(); - root.close(); } } - - assertThat(totalRows).isEqualTo(values.size()); } /** @@ -528,7 +522,8 @@ public void testIntToLongPromotionWithoutLogicalType() throws Exception { // Write via Iceberg's writer which produces bare INT32 (no logical type annotation) List records = Lists.newArrayList(); - for (int val : new int[] {1, 2, 3, Integer.MAX_VALUE}) { + int[] values = new int[] {1, 2, 3, Integer.MAX_VALUE}; + for (int val : values) { GenericRecord rec = GenericRecord.create(schema); rec.setField("col", val); records.add(rec); @@ -561,30 +556,123 @@ public void testIntToLongPromotionWithoutLogicalType() throws Exception { // Read with the vectorized reader — the underlying vector is IntVector (physical type), // but the accessor correctly widens to long when getLong() is called. 
- int totalRows = 0; - int rowIndex = 0; - int[] expectedValues = new int[] {1, 2, 3, Integer.MAX_VALUE}; try (VectorizedTableScanIterable vectorizedReader = new VectorizedTableScanIterable(table.newScan(), 1024, false)) { for (ColumnarBatch batch : vectorizedReader) { - VectorSchemaRoot root = batch.createVectorSchemaRootFromVectors(); - FieldVector vector = root.getVector("col"); + assertThat(batch.numRows()).isEqualTo(values.length); + + FieldVector vector = batch.column(0).getArrowVector(); assertThat(vector) .as("Vector should be IntVector matching the physical Parquet type") .isInstanceOf(IntVector.class); - IntVector intVector = (IntVector) vector; - for (int i = 0; i < root.getRowCount(); i++) { - assertThat(intVector.get(i)) - .as("Row %d value should be read correctly", rowIndex) - .isEqualTo(expectedValues[rowIndex]); - rowIndex++; + + for (int i = 0; i < batch.numRows(); i++) { + assertThat(batch.column(0).getLong(i)) + .as("Accessor should widen int to long for row %d", i) + .isEqualTo((long) values[i]); } - totalRows += root.getRowCount(); - root.close(); } } + } + + /** + * Tests that int-to-long promotion works correctly when values larger than Integer.MAX_VALUE are + * written after the promotion and reuseContainers is true. This verifies that reading a mix of + * pre-promotion (int-range) and post-promotion (long-range) files works correctly. 
+ */ + @Test + public void testIntToLongPromotionWithLargeValuesAndReuseContainers() throws Exception { + tables = new HadoopTables(); + Schema schema = new Schema(Types.NestedField.required(1, "col", Types.IntegerType.get())); + Table table = tables.create(schema, tempDir.toURI() + "/int-promotion-large-values"); - assertThat(totalRows).isEqualTo(records.size()); + // Write a Parquet file with INT(32, signed) logical type (pre-promotion data) + MessageType parquetSchema = + new MessageType( + "test", + primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED) + .as(LogicalTypeAnnotation.intType(32, true)) + .id(1) + .named("col")); + + File prePromotionFile = new File(tempDir, "pre-promotion.parquet"); + List intValues = ImmutableList.of(1, 2, Integer.MAX_VALUE); + try (ParquetWriter writer = + ExampleParquetWriter.builder(new Path(prePromotionFile.toURI())) + .withType(parquetSchema) + .build()) { + SimpleGroupFactory factory = new SimpleGroupFactory(parquetSchema); + for (int val : intValues) { + Group group = factory.newGroup(); + group.add("col", val); + writer.write(group); + } + } + + DataFile prePromotionDataFile = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath(prePromotionFile.getAbsolutePath()) + .withFileSizeInBytes(prePromotionFile.length()) + .withFormat(FileFormat.PARQUET) + .withRecordCount(intValues.size()) + .build(); + table.newAppend().appendFile(prePromotionDataFile).commit(); + + // Promote the column type from int to long + table.updateSchema().updateColumn("col", Types.LongType.get()).commit(); + table = tables.load(tempDir.toURI() + "/int-promotion-large-values"); + + // Write a second file with long values > Integer.MAX_VALUE (post-promotion data) + List longRecords = Lists.newArrayList(); + long[] longValues = new long[] {(long) Integer.MAX_VALUE + 1L, Long.MAX_VALUE}; + for (long val : longValues) { + GenericRecord rec = GenericRecord.create(table.schema()); + rec.setField("col", val); + 
longRecords.add(rec); + } + + File postPromotionFile = new File(tempDir, "post-promotion.parquet"); + FileAppender appender = + Parquet.write(Files.localOutput(postPromotionFile)) + .schema(table.schema()) + .createWriterFunc(GenericParquetWriter::create) + .build(); + try { + appender.addAll(longRecords); + } finally { + appender.close(); + } + + DataFile postPromotionDataFile = + DataFiles.builder(PartitionSpec.unpartitioned()) + .withPath(postPromotionFile.getAbsolutePath()) + .withFileSizeInBytes(postPromotionFile.length()) + .withFormat(FileFormat.PARQUET) + .withRecordCount(longRecords.size()) + .build(); + table.newAppend().appendFile(postPromotionDataFile).commit(); + table = tables.load(tempDir.toURI() + "/int-promotion-large-values"); + + // Read with reuseContainers=true and validate all values + List allExpectedValues = Lists.newArrayList(); + for (int v : intValues) { + allExpectedValues.add((long) v); + } + for (long v : longValues) { + allExpectedValues.add(v); + } + + List actualValues = Lists.newArrayList(); + try (VectorizedTableScanIterable vectorizedReader = + new VectorizedTableScanIterable(table.newScan(), 1024, true)) { + for (ColumnarBatch batch : vectorizedReader) { + for (int i = 0; i < batch.numRows(); i++) { + actualValues.add(batch.column(0).getLong(i)); + } + } + } + + assertThat(actualValues).containsExactlyInAnyOrderElementsOf(allExpectedValues); } private static Stream rejectedUnsignedIntegerCases() { @@ -1149,11 +1237,6 @@ private void writeTable(boolean constantRecords) throws Exception { overwrite.addFile(writeParquetFile(table, records)); } overwrite.commit(); - - // Perform a type promotion - // TODO: The read Arrow vector should of type BigInt (promoted type) but it is Int (old type). - Table tableLatest = tables.load(tableLocation); - tableLatest.updateSchema().updateColumn("int_promotion", Types.LongType.get()).commit(); } private static org.apache.arrow.vector.types.pojo.Schema createExpectedArrowSchema(