-
Notifications
You must be signed in to change notification settings - Fork 3.2k
Arrow: Fix ClassCastException in vectorized reader on int-to-long pro… #16343
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I miss a test case where the table is written with values larger than Integer.MAX_VALUE after the type promotion and reuseContainers is true for the table scan . |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -436,6 +436,157 @@ public void testUnsignedSmallIntegerColumnRoundtrips(int unsignedBitWidth, int v | |||||||||||||||||||||||||||
| assertThat(totalRows).isEqualTo(1); | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||
| * Tests that the vectorized reader correctly handles int-to-long type promotion when the Parquet | ||||||||||||||||||||||||||||
| * file has an INT(32, true) logical type annotation. This reproduces a bug where reading a file | ||||||||||||||||||||||||||||
| * written with INT(32) logical type after an ALTER TABLE promoting the column from int to long | ||||||||||||||||||||||||||||
| * causes a ClassCastException (BigIntVector cannot be cast to IntVector). | ||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||
| * <p>The vector remains an IntVector (matching the physical storage), and the accessor handles | ||||||||||||||||||||||||||||
| * widening to long on read. | ||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||
| @Test | ||||||||||||||||||||||||||||
| public void testIntToLongPromotionWithLogicalType() throws Exception { | ||||||||||||||||||||||||||||
| tables = new HadoopTables(); | ||||||||||||||||||||||||||||
| Schema schema = new Schema(Types.NestedField.required(1, "col", Types.IntegerType.get())); | ||||||||||||||||||||||||||||
| Table table = tables.create(schema, tempDir.toURI() + "/int-promotion-logical"); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Write a Parquet file with INT(32, signed) logical type annotation. | ||||||||||||||||||||||||||||
| // This is what non-Iceberg writers (PyArrow, Spark native, etc.) typically produce. | ||||||||||||||||||||||||||||
| MessageType parquetSchema = | ||||||||||||||||||||||||||||
| new MessageType( | ||||||||||||||||||||||||||||
| "test", | ||||||||||||||||||||||||||||
| primitive(PrimitiveType.PrimitiveTypeName.INT32, Type.Repetition.REQUIRED) | ||||||||||||||||||||||||||||
| .as(LogicalTypeAnnotation.intType(32, true)) | ||||||||||||||||||||||||||||
| .id(1) | ||||||||||||||||||||||||||||
| .named("col")); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| File testFile = new File(tempDir, "int-logical-type-promotion.parquet"); | ||||||||||||||||||||||||||||
| List<Integer> values = ImmutableList.of(1, 2, 3, Integer.MAX_VALUE); | ||||||||||||||||||||||||||||
| try (ParquetWriter<Group> writer = | ||||||||||||||||||||||||||||
| ExampleParquetWriter.builder(new Path(testFile.toURI())).withType(parquetSchema).build()) { | ||||||||||||||||||||||||||||
| SimpleGroupFactory factory = new SimpleGroupFactory(parquetSchema); | ||||||||||||||||||||||||||||
| for (int val : values) { | ||||||||||||||||||||||||||||
| Group group = factory.newGroup(); | ||||||||||||||||||||||||||||
| group.add("col", val); | ||||||||||||||||||||||||||||
| writer.write(group); | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| DataFile dataFile = | ||||||||||||||||||||||||||||
| DataFiles.builder(PartitionSpec.unpartitioned()) | ||||||||||||||||||||||||||||
| .withPath(testFile.getAbsolutePath()) | ||||||||||||||||||||||||||||
| .withFileSizeInBytes(testFile.length()) | ||||||||||||||||||||||||||||
| .withFormat(FileFormat.PARQUET) | ||||||||||||||||||||||||||||
| .withRecordCount(values.size()) | ||||||||||||||||||||||||||||
| .build(); | ||||||||||||||||||||||||||||
| table.newAppend().appendFile(dataFile).commit(); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Promote the column type from int to long (simulates ALTER TABLE) | ||||||||||||||||||||||||||||
| table.updateSchema().updateColumn("col", Types.LongType.get()).commit(); | ||||||||||||||||||||||||||||
| table = tables.load(tempDir.toURI() + "/int-promotion-logical"); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Read with the vectorized reader — the underlying vector is IntVector (physical type), | ||||||||||||||||||||||||||||
| // but the accessor correctly widens to long when getLong() is called. | ||||||||||||||||||||||||||||
| int totalRows = 0; | ||||||||||||||||||||||||||||
| int rowIndex = 0; | ||||||||||||||||||||||||||||
|
Comment on lines
+491
to
+492
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the test data is so small and batch size so large no need for tracking the rows |
||||||||||||||||||||||||||||
| try (VectorizedTableScanIterable vectorizedReader = | ||||||||||||||||||||||||||||
| new VectorizedTableScanIterable(table.newScan(), 1024, false)) { | ||||||||||||||||||||||||||||
| for (ColumnarBatch batch : vectorizedReader) { | ||||||||||||||||||||||||||||
| VectorSchemaRoot root = batch.createVectorSchemaRootFromVectors(); | ||||||||||||||||||||||||||||
| FieldVector vector = root.getVector("col"); | ||||||||||||||||||||||||||||
| assertThat(vector) | ||||||||||||||||||||||||||||
| .as("Vector should be IntVector matching the physical Parquet type") | ||||||||||||||||||||||||||||
| .isInstanceOf(IntVector.class); | ||||||||||||||||||||||||||||
| IntVector intVector = (IntVector) vector; | ||||||||||||||||||||||||||||
| for (int i = 0; i < root.getRowCount(); i++) { | ||||||||||||||||||||||||||||
| assertThat(intVector.get(i)) | ||||||||||||||||||||||||||||
| .as("Row %d value should be read correctly", rowIndex) | ||||||||||||||||||||||||||||
| .isEqualTo(values.get(rowIndex)); | ||||||||||||||||||||||||||||
| rowIndex++; | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
Comment on lines
+501
to
+507
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Testing if the accessor gives back the values correctly is missing
Suggested change
|
||||||||||||||||||||||||||||
| totalRows += root.getRowCount(); | ||||||||||||||||||||||||||||
| root.close(); | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| assertThat(totalRows).isEqualTo(values.size()); | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||
| * Tests that the vectorized reader correctly handles int-to-long type promotion when the Parquet | ||||||||||||||||||||||||||||
| * file has a bare INT32 without any logical type annotation. | ||||||||||||||||||||||||||||
| * | ||||||||||||||||||||||||||||
| * <p>The vector remains an IntVector (matching the physical storage), and the accessor handles | ||||||||||||||||||||||||||||
| * widening to long on read. | ||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||
| @Test | ||||||||||||||||||||||||||||
| public void testIntToLongPromotionWithoutLogicalType() throws Exception { | ||||||||||||||||||||||||||||
| tables = new HadoopTables(); | ||||||||||||||||||||||||||||
| Schema schema = new Schema(Types.NestedField.required(1, "col", Types.IntegerType.get())); | ||||||||||||||||||||||||||||
| Table table = tables.create(schema, tempDir.toURI() + "/int-promotion-no-logical"); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Write via Iceberg's writer which produces bare INT32 (no logical type annotation) | ||||||||||||||||||||||||||||
| List<GenericRecord> records = Lists.newArrayList(); | ||||||||||||||||||||||||||||
| for (int val : new int[] {1, 2, 3, Integer.MAX_VALUE}) { | ||||||||||||||||||||||||||||
| GenericRecord rec = GenericRecord.create(schema); | ||||||||||||||||||||||||||||
| rec.setField("col", val); | ||||||||||||||||||||||||||||
| records.add(rec); | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| File testFile = new File(tempDir, "int-no-logical-type-promotion.parquet"); | ||||||||||||||||||||||||||||
| FileAppender<GenericRecord> appender = | ||||||||||||||||||||||||||||
| Parquet.write(Files.localOutput(testFile)) | ||||||||||||||||||||||||||||
| .schema(schema) | ||||||||||||||||||||||||||||
| .createWriterFunc(GenericParquetWriter::create) | ||||||||||||||||||||||||||||
| .build(); | ||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||
| appender.addAll(records); | ||||||||||||||||||||||||||||
| } finally { | ||||||||||||||||||||||||||||
| appender.close(); | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| DataFile dataFile = | ||||||||||||||||||||||||||||
| DataFiles.builder(PartitionSpec.unpartitioned()) | ||||||||||||||||||||||||||||
| .withPath(testFile.getAbsolutePath()) | ||||||||||||||||||||||||||||
| .withFileSizeInBytes(testFile.length()) | ||||||||||||||||||||||||||||
| .withFormat(FileFormat.PARQUET) | ||||||||||||||||||||||||||||
| .withRecordCount(records.size()) | ||||||||||||||||||||||||||||
| .build(); | ||||||||||||||||||||||||||||
| table.newAppend().appendFile(dataFile).commit(); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Promote the column type from int to long | ||||||||||||||||||||||||||||
| table.updateSchema().updateColumn("col", Types.LongType.get()).commit(); | ||||||||||||||||||||||||||||
| table = tables.load(tempDir.toURI() + "/int-promotion-no-logical"); | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Read with the vectorized reader — the underlying vector is IntVector (physical type), | ||||||||||||||||||||||||||||
| // but the accessor correctly widens to long when getLong() is called. | ||||||||||||||||||||||||||||
| int totalRows = 0; | ||||||||||||||||||||||||||||
| int rowIndex = 0; | ||||||||||||||||||||||||||||
| int[] expectedValues = new int[] {1, 2, 3, Integer.MAX_VALUE}; | ||||||||||||||||||||||||||||
| try (VectorizedTableScanIterable vectorizedReader = | ||||||||||||||||||||||||||||
| new VectorizedTableScanIterable(table.newScan(), 1024, false)) { | ||||||||||||||||||||||||||||
| for (ColumnarBatch batch : vectorizedReader) { | ||||||||||||||||||||||||||||
| VectorSchemaRoot root = batch.createVectorSchemaRootFromVectors(); | ||||||||||||||||||||||||||||
| FieldVector vector = root.getVector("col"); | ||||||||||||||||||||||||||||
| assertThat(vector) | ||||||||||||||||||||||||||||
| .as("Vector should be IntVector matching the physical Parquet type") | ||||||||||||||||||||||||||||
| .isInstanceOf(IntVector.class); | ||||||||||||||||||||||||||||
| IntVector intVector = (IntVector) vector; | ||||||||||||||||||||||||||||
| for (int i = 0; i < root.getRowCount(); i++) { | ||||||||||||||||||||||||||||
| assertThat(intVector.get(i)) | ||||||||||||||||||||||||||||
| .as("Row %d value should be read correctly", rowIndex) | ||||||||||||||||||||||||||||
| .isEqualTo(expectedValues[rowIndex]); | ||||||||||||||||||||||||||||
| rowIndex++; | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| totalRows += root.getRowCount(); | ||||||||||||||||||||||||||||
| root.close(); | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
Comment on lines
+564
to
+585
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My comments for the other test apply here too. |
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| assertThat(totalRows).isEqualTo(records.size()); | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| private static Stream<Arguments> rejectedUnsignedIntegerCases() { | ||||||||||||||||||||||||||||
| return Stream.of( | ||||||||||||||||||||||||||||
| Arguments.of( | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we remove this comment? this still looks relevant
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought the check below was self explanatory. I add them back.