diff --git a/api/src/main/java/org/apache/iceberg/ContentFile.java b/api/src/main/java/org/apache/iceberg/ContentFile.java
index c4cd3c06adfd..f7bbf48a35aa 100644
--- a/api/src/main/java/org/apache/iceberg/ContentFile.java
+++ b/api/src/main/java/org/apache/iceberg/ContentFile.java
@@ -167,6 +167,18 @@ default Long fileSequenceNumber() {
return null;
}
+ /**
+ * Returns the commit timestamp in milliseconds for this file.
+ *
+ * <p>The commit timestamp represents the snapshot timestamp of the commit that added the file. It
+ * is used for inheriting {@code _last_updated_timestamp_ms} in V4 tables.
+ *
+ * <p>This method can return null if the commit timestamp is unknown.
+ *
+ * @return the commit timestamp in milliseconds, or null if it is unknown; this default
+ *     implementation always returns null
+ */
+ default Long commitTimestampMs() {
+ return null;
+ }
+
/**
* Returns the starting row ID to assign to new rows in the data file (with _row_id set to null).
*/
diff --git a/api/src/main/java/org/apache/iceberg/ManifestFile.java b/api/src/main/java/org/apache/iceberg/ManifestFile.java
index 2f732aef427f..b6d6f740679d 100644
--- a/api/src/main/java/org/apache/iceberg/ManifestFile.java
+++ b/api/src/main/java/org/apache/iceberg/ManifestFile.java
@@ -95,7 +95,13 @@ public interface ManifestFile {
"first_row_id",
Types.LongType.get(),
"Starting row ID to assign to new rows in ADDED data files");
- // next ID to assign: 521
+ Types.NestedField COMMIT_TIMESTAMP_MS =
+ optional(
+ 521,
+ "commit_timestamp_ms",
+ Types.LongType.get(),
+ "Snapshot timestamp when the manifest was committed");
+ // next ID to assign: 522
Schema SCHEMA =
new Schema(
@@ -114,7 +120,8 @@ public interface ManifestFile {
DELETED_ROWS_COUNT,
PARTITION_SUMMARIES,
KEY_METADATA,
- FIRST_ROW_ID);
+ FIRST_ROW_ID,
+ COMMIT_TIMESTAMP_MS);
static Schema schema() {
return SCHEMA;
@@ -210,6 +217,11 @@ default Long firstRowId() {
return null;
}
+ /**
+ * Returns the snapshot timestamp in milliseconds when the manifest was committed.
+ *
+ * @return the commit timestamp in milliseconds; this default implementation always returns null
+ */
+ default Long commitTimestampMs() {
+ return null;
+ }
+
/**
* Copies this {@link ManifestFile manifest file}. Readers can reuse manifest file instances; use
* this method to make defensive copies.
diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java
index 58d992d8df3e..7ccc1951b1c6 100644
--- a/api/src/test/java/org/apache/iceberg/TestHelpers.java
+++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java
@@ -80,6 +80,8 @@ private TestHelpers() {}
IntStream.rangeClosed(2, MAX_FORMAT_VERSION).boxed().collect(Collectors.toUnmodifiableList());
public static final List V3_AND_ABOVE =
IntStream.rangeClosed(3, MAX_FORMAT_VERSION).boxed().collect(Collectors.toUnmodifiableList());
+ public static final List V3_AND_BELOW =
+ IntStream.rangeClosed(1, 3).boxed().collect(Collectors.toUnmodifiableList());
/** Wait in a tight check loop until system clock is past {@code timestampMillis} */
public static long waitUntilAfter(long timestampMillis) {
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
index 2cc7cde4541a..0b75417da9bc 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
@@ -420,6 +420,15 @@ public static VectorizedArrowReader lastUpdated(
}
}
+ /**
+ * Creates a reader for the {@code _last_updated_timestamp_ms} metadata column.
+ *
+ * <p>A timestamp reader is only created when both the base row ID and the commit timestamp are
+ * non-null; otherwise the {@code nulls()} reader is returned.
+ *
+ * @param baseRowId base row ID, or null if it is not available
+ * @param commitTimestampMs commit timestamp in milliseconds, or null if it is not available
+ * @param tsReader reader for timestamp values that were materialized in the file
+ * @return a reader for last-updated timestamps, or the {@code nulls()} reader
+ */
+ public static VectorizedArrowReader lastUpdatedTimestamp(
+ Long baseRowId, Long commitTimestampMs, VectorizedArrowReader tsReader) {
+ if (baseRowId == null || commitTimestampMs == null) {
+ return nulls();
+ }
+
+ return new LastUpdatedTimestampVectorReader(commitTimestampMs, tsReader);
+ }
+
public static VectorizedReader> replaceWithMetadataReader(
Types.NestedField icebergField,
VectorizedReader> reader,
@@ -434,6 +443,11 @@ public static VectorizedReader> replaceWithMetadataReader(
Long fileSeqNumber = (Long) idToConstant.get(id);
return VectorizedArrowReader.lastUpdated(
baseRowId, fileSeqNumber, (VectorizedArrowReader) reader);
+ } else if (id == MetadataColumns.LAST_UPDATED_TIMESTAMP_MS.fieldId()) {
+ Long baseRowId = (Long) idToConstant.get(MetadataColumns.ROW_ID.fieldId());
+ Long commitTimestampMs = (Long) idToConstant.get(id);
+ return VectorizedArrowReader.lastUpdatedTimestamp(
+ baseRowId, commitTimestampMs, (VectorizedArrowReader) reader);
} else if (idToConstant.containsKey(id)) {
// containsKey is used because the constant may be null
return new ConstantVectorReader<>(icebergField, idToConstant.get(id));
@@ -863,6 +877,72 @@ public void close() {
}
}
+ /**
+ * Reader that produces {@code _last_updated_timestamp_ms} values.
+ *
+ * <p>For each position, the value read by the wrapped reader is used when it is present and
+ * non-null; otherwise the commit timestamp supplied at construction is used.
+ */
+ private static final class LastUpdatedTimestampVectorReader extends VectorizedArrowReader {
+ private static final Field LAST_UPDATED_TS =
+ ArrowSchemaUtil.convert(MetadataColumns.LAST_UPDATED_TIMESTAMP_MS);
+
+ // fallback value for positions without a materialized timestamp
+ private final long commitTimestampMs;
+ private final VectorizedReader tsReader;
+ // allocated in setBatchSize and handed to the result holder unchanged
+ private NullabilityHolder nulls;
+
+ private LastUpdatedTimestampVectorReader(
+ long commitTimestampMs, VectorizedReader tsReader) {
+ this.commitTimestampMs = commitTimestampMs;
+ // a missing reader is replaced with nulls() so the field is never null
+ this.tsReader = tsReader == null ? nulls() : tsReader;
+ }
+
+ /**
+ * Reads {@code numValsToRead} timestamps into a newly allocated vector.
+ *
+ * <p>The {@code reuse} argument is not used; the wrapped reader is always asked for a fresh
+ * holder, and the vector it returns is closed before this method returns.
+ */
+ @Override
+ public VectorHolder read(VectorHolder reuse, int numValsToRead) {
+ FieldVector timestamps = null;
+ try {
+ VectorHolder timestampsHolder = tsReader.read(null, numValsToRead);
+ timestamps = timestampsHolder.vector();
+ // no vector from the wrapped reader means every position uses the commit timestamp
+ ArrowVectorAccessor, String, ?, ?> tsAccessor =
+ timestamps == null ? null : ArrowVectorAccessors.getVectorAccessor(timestampsHolder);
+
+ // NOTE(review): if the loop below throws, this new vector is not released here; confirm
+ // whether that can happen and whether a caller cleans it up
+ BigIntVector lastUpdatedTimestamps = allocateBigIntVector(LAST_UPDATED_TS, numValsToRead);
+ ArrowBuf dataBuffer = lastUpdatedTimestamps.getDataBuffer();
+ for (int i = 0; i < numValsToRead; i += 1) {
+ // widen before multiplying so the byte offset is computed as a long
+ long bufferOffset = (long) i * Long.BYTES;
+ if (tsAccessor == null || isNull(timestampsHolder, i)) {
+ dataBuffer.setLong(bufferOffset, commitTimestampMs);
+ } else {
+ long materializedTimestamp = tsAccessor.getLong(i);
+ dataBuffer.setLong(bufferOffset, materializedTimestamp);
+ }
+ }
+
+ lastUpdatedTimestamps.setValueCount(numValsToRead);
+ return VectorHolder.vectorHolder(
+ lastUpdatedTimestamps, MetadataColumns.LAST_UPDATED_TIMESTAMP_MS, nulls);
+ } finally {
+ // the intermediate vector has been copied from and is no longer needed
+ if (timestamps != null) {
+ timestamps.close();
+ }
+ }
+ }
+
+ /** Forwards the row group information to the wrapped timestamp reader. */
+ @Override
+ public void setRowGroupInfo(
+ PageReadStore source, Map metadata) {
+ tsReader.setRowGroupInfo(source, metadata);
+ }
+
+ /**
+ * Replaces the nullability holder when there is none or it is smaller than the batch size, then
+ * forwards the batch size to the wrapped reader.
+ */
+ @Override
+ public void setBatchSize(int batchSize) {
+ if (nulls == null || nulls.size() < batchSize) {
+ this.nulls = newNullabilityHolder(batchSize);
+ }
+
+ tsReader.setBatchSize(batchSize);
+ }
+
+ @Override
+ public void close() {
+ // don't close result vectors as they are not owned by readers
+ }
+ }
+
private static boolean isNull(VectorHolder holder, int index) {
return holder.nullabilityHolder().isNullAt(index) == 1;
}
diff --git a/core/src/jmh/java/org/apache/iceberg/ManifestReadBenchmark.java b/core/src/jmh/java/org/apache/iceberg/ManifestReadBenchmark.java
index 588b5df1ba97..0958603852f7 100644
--- a/core/src/jmh/java/org/apache/iceberg/ManifestReadBenchmark.java
+++ b/core/src/jmh/java/org/apache/iceberg/ManifestReadBenchmark.java
@@ -79,6 +79,7 @@ public void before() {
0,
1L,
0,
+ 0L,
0L)) {
for (int i = 0; i < NUM_FILES; i++) {
OutputFile manifestFile =
diff --git a/core/src/jmh/java/org/apache/iceberg/ManifestWriteBenchmark.java b/core/src/jmh/java/org/apache/iceberg/ManifestWriteBenchmark.java
index b0dab63dea06..8a940be81e55 100644
--- a/core/src/jmh/java/org/apache/iceberg/ManifestWriteBenchmark.java
+++ b/core/src/jmh/java/org/apache/iceberg/ManifestWriteBenchmark.java
@@ -108,6 +108,7 @@ public void writeManifestFile(BenchmarkState state) throws IOException {
0,
1L,
0,
+ 0L,
0L)) {
for (int i = 0; i < NUM_FILES; i++) {
OutputFile manifestFile =
diff --git a/core/src/main/java/org/apache/iceberg/BaseFile.java b/core/src/main/java/org/apache/iceberg/BaseFile.java
index a02e0eff55a2..9e76c6981bee 100644
--- a/core/src/main/java/org/apache/iceberg/BaseFile.java
+++ b/core/src/main/java/org/apache/iceberg/BaseFile.java
@@ -68,6 +68,7 @@ public PartitionData copy() {
private long fileSizeInBytes = -1L;
private Long dataSequenceNumber = null;
private Long fileSequenceNumber = null;
+ private Long commitTimestampMs = null;
// optional fields
private Map columnSizes = null;
@@ -246,6 +247,7 @@ public PartitionData copy() {
this.sortOrderId = toCopy.sortOrderId;
this.dataSequenceNumber = toCopy.dataSequenceNumber;
this.fileSequenceNumber = toCopy.fileSequenceNumber;
+ this.commitTimestampMs = toCopy.commitTimestampMs;
this.firstRowId = toCopy.firstRowId;
this.referencedDataFile = toCopy.referencedDataFile;
this.contentOffset = toCopy.contentOffset;
@@ -288,6 +290,15 @@ public void setFileSequenceNumber(Long fileSequenceNumber) {
this.fileSequenceNumber = fileSequenceNumber;
}
+ @Override
+ public Long commitTimestampMs() {
+ return commitTimestampMs;
+ }
+
+ /**
+ * Sets the commit timestamp in milliseconds for this file.
+ *
+ * @param commitTimestampMs a commit timestamp in milliseconds; may be null
+ */
+ public void setCommitTimestampMs(Long commitTimestampMs) {
+ this.commitTimestampMs = commitTimestampMs;
+ }
+
@Override
public Long firstRowId() {
return firstRowId;
@@ -626,6 +637,7 @@ public String toString() {
.add("sort_order_id", sortOrderId)
.add("data_sequence_number", dataSequenceNumber == null ? "null" : dataSequenceNumber)
.add("file_sequence_number", fileSequenceNumber == null ? "null" : fileSequenceNumber)
+ .add("commit_timestamp_ms", commitTimestampMs == null ? "null" : commitTimestampMs)
.add("first_row_id", firstRowId == null ? "null" : firstRowId)
.add("referenced_data_file", referencedDataFile == null ? "null" : referencedDataFile)
.add("content_offset", contentOffset == null ? "null" : contentOffset)
diff --git a/core/src/main/java/org/apache/iceberg/Delegates.java b/core/src/main/java/org/apache/iceberg/Delegates.java
index 324fc1cccdc9..249903f3c766 100644
--- a/core/src/main/java/org/apache/iceberg/Delegates.java
+++ b/core/src/main/java/org/apache/iceberg/Delegates.java
@@ -272,6 +272,11 @@ public Long fileSequenceNumber() {
return wrapped.fileSequenceNumber();
}
+ @Override
+ public Long commitTimestampMs() {
+ return wrapped.commitTimestampMs();
+ }
+
@Override
public Long firstRowId() {
return wrapped.firstRowId();
diff --git a/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java b/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java
index b2ce5fa2aa11..9c71aa4a46c5 100644
--- a/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java
+++ b/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java
@@ -32,6 +32,7 @@ class GenericManifestEntry>
private Long snapshotId = null;
private Long dataSequenceNumber = null;
private Long fileSequenceNumber = null;
+ private Long commitTimestampMs = null;
private F file = null;
GenericManifestEntry(org.apache.avro.Schema schema) {
@@ -49,48 +50,73 @@ private GenericManifestEntry(GenericManifestEntry toCopy, boolean fullCopy) {
this.snapshotId = toCopy.snapshotId;
this.dataSequenceNumber = toCopy.dataSequenceNumber;
this.fileSequenceNumber = toCopy.fileSequenceNumber;
+ this.commitTimestampMs = toCopy.commitTimestampMs;
this.file = toCopy.file().copy(fullCopy);
}
ManifestEntry wrapExisting(ManifestEntry entry) {
return wrapExisting(
- entry.snapshotId(), entry.dataSequenceNumber(), entry.fileSequenceNumber(), entry.file());
+ entry.snapshotId(),
+ entry.dataSequenceNumber(),
+ entry.fileSequenceNumber(),
+ entry.commitTimestampMs(),
+ entry.file());
}
ManifestEntry wrapExisting(
- Long newSnapshotId, Long newDataSequenceNumber, Long newFileSequenceNumber, F newFile) {
+ Long newSnapshotId,
+ Long newDataSequenceNumber,
+ Long newFileSequenceNumber,
+ Long newCommitTimestampMs,
+ F newFile) {
this.status = Status.EXISTING;
this.snapshotId = newSnapshotId;
this.dataSequenceNumber = newDataSequenceNumber;
this.fileSequenceNumber = newFileSequenceNumber;
+ this.commitTimestampMs = newCommitTimestampMs;
this.file = newFile;
return this;
}
ManifestEntry wrapAppend(Long newSnapshotId, F newFile) {
- return wrapAppend(newSnapshotId, null, newFile);
+ return wrapAppend(newSnapshotId, null, null, newFile);
}
ManifestEntry wrapAppend(Long newSnapshotId, Long newDataSequenceNumber, F newFile) {
+ return wrapAppend(newSnapshotId, newDataSequenceNumber, null, newFile);
+ }
+
+ /**
+ * Resets this entry as an ADDED entry that wraps the given file.
+ *
+ * <p>The snapshot ID, data sequence number, and commit timestamp are taken from the arguments; the
+ * file sequence number is cleared.
+ *
+ * @return this entry
+ */
+ ManifestEntry wrapAppend(
+ Long newSnapshotId, Long newDataSequenceNumber, Long newCommitTimestampMs, F newFile) {
this.status = Status.ADDED;
this.snapshotId = newSnapshotId;
this.dataSequenceNumber = newDataSequenceNumber;
this.fileSequenceNumber = null;
+ this.commitTimestampMs = newCommitTimestampMs;
this.file = Delegates.suppressFirstRowId(newFile);
return this;
}
ManifestEntry wrapDelete(Long newSnapshotId, ManifestEntry entry) {
return wrapDelete(
- newSnapshotId, entry.dataSequenceNumber(), entry.fileSequenceNumber(), entry.file());
+ newSnapshotId,
+ entry.dataSequenceNumber(),
+ entry.fileSequenceNumber(),
+ entry.commitTimestampMs(),
+ entry.file());
}
ManifestEntry wrapDelete(
- Long newSnapshotId, Long newDataSequenceNumber, Long newFileSequenceNumber, F newFile) {
+ Long newSnapshotId,
+ Long newDataSequenceNumber,
+ Long newFileSequenceNumber,
+ Long newCommitTimestampMs,
+ F newFile) {
this.status = Status.DELETED;
this.snapshotId = newSnapshotId;
this.dataSequenceNumber = newDataSequenceNumber;
this.fileSequenceNumber = newFileSequenceNumber;
+ this.commitTimestampMs = newCommitTimestampMs;
this.file = newFile;
return this;
}
@@ -121,6 +147,11 @@ public Long fileSequenceNumber() {
return fileSequenceNumber;
}
+ @Override
+ public Long commitTimestampMs() {
+ return commitTimestampMs;
+ }
+
/**
* @return a file
*/
@@ -154,6 +185,11 @@ public void setFileSequenceNumber(long newFileSequenceNumber) {
this.fileSequenceNumber = newFileSequenceNumber;
}
+ @Override
+ public void setCommitTimestampMs(long newCommitTimestampMs) {
+ this.commitTimestampMs = newCommitTimestampMs;
+ }
+
@Override
@SuppressWarnings("unchecked")
public void put(int i, Object v) {
@@ -173,6 +209,9 @@ public void put(int i, Object v) {
case 4:
this.file = (F) v;
return;
+ case 5:
+ this.commitTimestampMs = (Long) v;
+ return;
default:
// ignore the object, it must be from a newer version of the format
}
@@ -196,6 +235,8 @@ public Object get(int i) {
return fileSequenceNumber;
case 4:
return file;
+ case 5:
+ return commitTimestampMs;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + i);
}
@@ -213,7 +254,7 @@ public org.apache.avro.Schema getSchema() {
@Override
public int size() {
- return 5;
+ return 6;
}
@Override
@@ -223,6 +264,7 @@ public String toString() {
.add("snapshot_id", snapshotId)
.add("data_sequence_number", dataSequenceNumber)
.add("file_sequence_number", fileSequenceNumber)
+ .add("commit_timestamp_ms", commitTimestampMs)
.add("file", file)
.toString();
}
diff --git a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java
index ac93222d01b5..349157b386fb 100644
--- a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java
+++ b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java
@@ -62,6 +62,7 @@ public class GenericManifestFile extends SupportsIndexProjection
private PartitionFieldSummary[] partitions = null;
private byte[] keyMetadata = null;
private Long firstRowId = null;
+ private Long commitTimestampMs = null;
/** Used by Avro reflection to instantiate this class when reading manifest files. */
public GenericManifestFile(Schema avroSchema) {
@@ -94,6 +95,7 @@ public GenericManifestFile(Schema avroSchema) {
this.partitions = null;
this.keyMetadata = null;
this.firstRowId = null;
+ this.commitTimestampMs = null;
}
/** Adjust the arg order to avoid conflict with the public constructor below */
@@ -113,7 +115,8 @@ public GenericManifestFile(Schema avroSchema) {
Long existingRowsCount,
Integer deletedFilesCount,
Long deletedRowsCount,
- Long firstRowId) {
+ Long firstRowId,
+ Long commitTimestampMs) {
super(ManifestFile.schema().columns().size());
this.avroSchema = AVRO_SCHEMA;
this.manifestPath = path;
@@ -132,6 +135,7 @@ public GenericManifestFile(Schema avroSchema) {
this.partitions = partitions == null ? null : partitions.toArray(new PartitionFieldSummary[0]);
this.keyMetadata = ByteBuffers.toByteArray(keyMetadata);
this.firstRowId = firstRowId;
+ this.commitTimestampMs = commitTimestampMs;
}
/**
@@ -174,6 +178,7 @@ private GenericManifestFile(GenericManifestFile toCopy) {
? null
: Arrays.copyOf(toCopy.keyMetadata, toCopy.keyMetadata.length);
this.firstRowId = toCopy.firstRowId;
+ this.commitTimestampMs = toCopy.commitTimestampMs;
}
/** Constructor for Java serialization. */
@@ -274,6 +279,11 @@ public Long firstRowId() {
return firstRowId;
}
+ @Override
+ public Long commitTimestampMs() {
+ return commitTimestampMs;
+ }
+
@Override
public int size() {
return ManifestFile.schema().columns().size();
@@ -323,6 +333,8 @@ private Object getByPos(int basePos) {
return keyMetadata();
case 15:
return firstRowId();
+ case 16:
+ return commitTimestampMs();
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + basePos);
}
@@ -384,6 +396,9 @@ protected void internalSet(int basePos, T value) {
case 15:
this.firstRowId = (Long) value;
return;
+ case 16:
+ this.commitTimestampMs = (Long) value;
+ return;
default:
// ignore the object, it must be from a newer version of the format
}
@@ -439,6 +454,7 @@ public String toString() {
.add("sequence_number", sequenceNumber)
.add("min_sequence_number", minSequenceNumber)
.add("first_row_id", firstRowId)
+ .add("commit_timestamp_ms", commitTimestampMs)
.toString();
}
@@ -470,7 +486,8 @@ private CopyBuilder(ManifestFile toCopy) {
toCopy.existingRowsCount(),
toCopy.deletedFilesCount(),
toCopy.deletedRowsCount(),
- toCopy.firstRowId());
+ toCopy.firstRowId(),
+ toCopy.commitTimestampMs());
}
}
diff --git a/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java b/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java
index 37e17e94ca1f..239c5055a9b0 100644
--- a/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java
+++ b/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java
@@ -34,10 +34,16 @@ static InheritableMetadata fromManifest(ManifestFile manifest) {
Preconditions.checkArgument(
manifest.snapshotId() != null,
"Cannot read from ManifestFile with null (unassigned) snapshot ID");
+ Long commitTs = manifest.commitTimestampMs();
+ if (commitTs != null && commitTs == ManifestWriter.UNASSIGNED_TS) {
+ commitTs = null;
+ }
+
return new BaseInheritableMetadata(
manifest.partitionSpecId(),
manifest.snapshotId(),
manifest.sequenceNumber(),
+ commitTs,
manifest.path());
}
@@ -50,13 +56,19 @@ static class BaseInheritableMetadata implements InheritableMetadata {
private final int specId;
private final long snapshotId;
private final long sequenceNumber;
+ private final Long commitTimestampMs;
private final String manifestLocation;
private BaseInheritableMetadata(
- int specId, long snapshotId, long sequenceNumber, String manifestLocation) {
+ int specId,
+ long snapshotId,
+ long sequenceNumber,
+ Long commitTimestampMs,
+ String manifestLocation) {
this.specId = specId;
this.snapshotId = snapshotId;
this.sequenceNumber = sequenceNumber;
+ this.commitTimestampMs = commitTimestampMs;
this.manifestLocation = manifestLocation;
}
@@ -80,11 +92,19 @@ public > ManifestEntry apply(ManifestEntry manife
manifestEntry.setFileSequenceNumber(sequenceNumber);
}
+ // in v4 tables, the commit timestamp is inherited for ADDED entries
+ if (manifestEntry.commitTimestampMs() == null
+ && commitTimestampMs != null
+ && manifestEntry.status() == ManifestEntry.Status.ADDED) {
+ manifestEntry.setCommitTimestampMs(commitTimestampMs);
+ }
+
if (manifestEntry.file() instanceof BaseFile) {
BaseFile> file = (BaseFile>) manifestEntry.file();
file.setSpecId(specId);
file.setDataSequenceNumber(manifestEntry.dataSequenceNumber());
file.setFileSequenceNumber(manifestEntry.fileSequenceNumber());
+ file.setCommitTimestampMs(manifestEntry.commitTimestampMs());
file.setManifestLocation(manifestLocation);
}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntry.java b/core/src/main/java/org/apache/iceberg/ManifestEntry.java
index 4dce92cf5c2f..7062c8881884 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntry.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntry.java
@@ -47,9 +47,10 @@ public int id() {
Types.NestedField SEQUENCE_NUMBER = optional(3, "sequence_number", Types.LongType.get());
Types.NestedField FILE_SEQUENCE_NUMBER =
optional(4, "file_sequence_number", Types.LongType.get());
+ Types.NestedField COMMIT_TIMESTAMP_MS = optional(5, "commit_timestamp_ms", Types.LongType.get());
int DATA_FILE_ID = 2;
- // next ID to assign: 5
+ // next ID to assign: 6
static Schema getSchema(StructType partitionType) {
return wrapFileSchema(DataFile.getType(partitionType));
@@ -61,7 +62,8 @@ static Schema wrapFileSchema(StructType fileType) {
SNAPSHOT_ID,
SEQUENCE_NUMBER,
FILE_SEQUENCE_NUMBER,
- required(DATA_FILE_ID, "data_file", fileType));
+ required(DATA_FILE_ID, "data_file", fileType),
+ COMMIT_TIMESTAMP_MS);
}
/** Returns the status of the file, whether EXISTING, ADDED, or DELETED. */
@@ -125,6 +127,25 @@ default boolean isLive() {
*/
void setFileSequenceNumber(long fileSequenceNumber);
+ /**
+ * Returns the commit timestamp in milliseconds.
+ *
+ * <p>The commit timestamp represents the snapshot timestamp of the commit that added the file. It
+ * is used for inheriting {@code _last_updated_timestamp_ms} in V4 tables.
+ *
+ * <p>This method can return null if the commit timestamp is unknown.
+ *
+ * @return the commit timestamp in milliseconds, or null if it is unknown; this default
+ *     implementation always returns null
+ */
+ default Long commitTimestampMs() {
+ return null;
+ }
+
+ /**
+ * Sets the commit timestamp in milliseconds for this manifest entry.
+ *
+ * <p>This default implementation does nothing, so the value is discarded unless an implementation
+ * overrides this method.
+ *
+ * @param commitTimestampMs a commit timestamp in milliseconds
+ */
+ default void setCommitTimestampMs(long commitTimestampMs) {}
+
/** Returns a file. */
F file();
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFileParser.java b/core/src/main/java/org/apache/iceberg/ManifestFileParser.java
index 7bd265777db8..bb370f10d55a 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFileParser.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFileParser.java
@@ -45,6 +45,7 @@ class ManifestFileParser {
private static final String PARTITION_FIELD_SUMMARY = "partition-field-summary";
private static final String KEY_METADATA = "key-metadata";
private static final String FIRST_ROW_ID = "first-row-id";
+ private static final String COMMIT_TIMESTAMP_MS = "commit-timestamp-ms";
private ManifestFileParser() {}
@@ -93,6 +94,8 @@ static void toJson(ManifestFile manifestFile, JsonGenerator generator) throws IO
}
JsonUtil.writeLongFieldIfPresent(FIRST_ROW_ID, manifestFile.firstRowId(), generator);
+ JsonUtil.writeLongFieldIfPresent(
+ COMMIT_TIMESTAMP_MS, manifestFile.commitTimestampMs(), generator);
generator.writeEndObject();
}
@@ -142,6 +145,7 @@ static ManifestFile fromJson(JsonNode jsonNode) {
ByteBuffer keyMetadata = JsonUtil.getByteBufferOrNull(KEY_METADATA, jsonNode);
Long firstRowId = JsonUtil.getLongOrNull(FIRST_ROW_ID, jsonNode);
+ Long commitTimestampMs = JsonUtil.getLongOrNull(COMMIT_TIMESTAMP_MS, jsonNode);
return new GenericManifestFile(
path,
@@ -159,7 +163,8 @@ static ManifestFile fromJson(JsonNode jsonNode) {
existingRowsCount,
deletedFilesCount,
deletedRowsCount,
- firstRowId);
+ firstRowId,
+ commitTimestampMs);
}
private static class PartitionFieldSummaryParser {
diff --git a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
index 378bb9dffbc2..b1572860aa5e 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
@@ -114,17 +114,21 @@ static class V4Writer extends ManifestListWriter {
long snapshotId,
Long parentSnapshotId,
long sequenceNumber,
- long firstRowId) {
+ long firstRowId,
+ long commitTimestampMs) {
super(
snapshotFile,
encryptionManager,
- ImmutableMap.of(
- "snapshot-id", String.valueOf(snapshotId),
- "parent-snapshot-id", String.valueOf(parentSnapshotId),
- "sequence-number", String.valueOf(sequenceNumber),
- "first-row-id", String.valueOf(firstRowId),
- "format-version", "4"));
- this.wrapper = new V4Metadata.ManifestFileWrapper(snapshotId, sequenceNumber);
+ ImmutableMap.builder()
+ .put("snapshot-id", String.valueOf(snapshotId))
+ .put("parent-snapshot-id", String.valueOf(parentSnapshotId))
+ .put("sequence-number", String.valueOf(sequenceNumber))
+ .put("first-row-id", String.valueOf(firstRowId))
+ .put("commit-timestamp-ms", String.valueOf(commitTimestampMs))
+ .put("format-version", "4")
+ .buildOrThrow());
+ this.wrapper =
+ new V4Metadata.ManifestFileWrapper(snapshotId, sequenceNumber, commitTimestampMs);
this.nextRowId = firstRowId;
}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestLists.java b/core/src/main/java/org/apache/iceberg/ManifestLists.java
index 5d7713ad06c6..ec540db41acd 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestLists.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestLists.java
@@ -55,7 +55,8 @@ static ManifestListWriter write(
long snapshotId,
Long parentSnapshotId,
long sequenceNumber,
- Long firstRowId) {
+ Long firstRowId,
+ long commitTimestampMs) {
switch (formatVersion) {
case 1:
Preconditions.checkArgument(
@@ -82,7 +83,8 @@ static ManifestListWriter write(
snapshotId,
parentSnapshotId,
sequenceNumber,
- firstRowId);
+ firstRowId,
+ commitTimestampMs);
}
throw new UnsupportedOperationException(
"Cannot write manifest list for table version: " + formatVersion);
diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index 43b8e3ed7095..400a587c07d1 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -38,6 +38,7 @@ public abstract class ManifestWriter> implements FileAp
// stand-in for the current sequence number that will be assigned when the commit is successful
// this is replaced when writing a manifest list by the ManifestFile wrapper
static final long UNASSIGNED_SEQ = -1L;
+ static final long UNASSIGNED_TS = -1L;
private final OutputFile file;
private final EncryptionKeyMetadata keyMetadata;
@@ -133,12 +134,32 @@ public void add(F addedFile, long dataSequenceNumber) {
addEntry(reused.wrapAppend(snapshotId, dataSequenceNumber, addedFile));
}
+ /**
+ * Add an added entry for a file with explicit data sequence number and commit timestamp.
+ *
+ * <p>The entry's snapshot ID will be this manifest's snapshot ID. The entry's data sequence
+ * number and commit timestamp are taken from the arguments; the file sequence number will be
+ * assigned at commit.
+ *
+ * <p>This overload is intended for callers that copy or rewrite manifest entries (for example
+ * {@code RewriteTablePathUtil}) and need to preserve the original commit timestamp on the entry
+ * directly rather than rely on inheritance from the manifest list at read time.
+ *
+ * @param addedFile a data file
+ * @param dataSequenceNumber a data sequence number for the file
+ * @param commitTimestampMs commit timestamp (in milliseconds) of the snapshot when the file was
+ * added; may be {@code null} for tables that do not track commit timestamps (V3 and earlier)
+ */
+ public void add(F addedFile, long dataSequenceNumber, Long commitTimestampMs) {
+ addEntry(reused.wrapAppend(snapshotId, dataSequenceNumber, commitTimestampMs, addedFile));
+ }
+
void add(ManifestEntry entry) {
- if (entry.dataSequenceNumber() != null && entry.dataSequenceNumber() >= 0) {
- addEntry(reused.wrapAppend(snapshotId, entry.dataSequenceNumber(), entry.file()));
- } else {
- addEntry(reused.wrapAppend(snapshotId, entry.file()));
- }
+ // a missing or negative data sequence number is passed on as null
+ Long entrySeqNumber = entry.dataSequenceNumber();
+ boolean hasDataSeqNumber = entrySeqNumber != null && entrySeqNumber >= 0;
+ Long dataSeqNumber = hasDataSeqNumber ? entrySeqNumber : null;
+ addEntry(reused.wrapAppend(snapshotId, dataSeqNumber, entry.commitTimestampMs(), entry.file()));
}
/**
@@ -151,10 +172,36 @@ void add(ManifestEntry entry) {
* @param fileSnapshotId snapshot ID when the data file was added to the table
* @param dataSequenceNumber a data sequence number of the file (assigned when the file was added)
* @param fileSequenceNumber a file sequence number (assigned when the file was added)
+ * @deprecated since 1.12.0, will be removed in 1.13.0; use {@link #existing(ContentFile, long,
+ * long, Long, Long)} which preserves the entry's commit timestamp.
*/
+ @Deprecated
public void existing(
F existingFile, long fileSnapshotId, long dataSequenceNumber, Long fileSequenceNumber) {
- reused.wrapExisting(fileSnapshotId, dataSequenceNumber, fileSequenceNumber, existingFile);
+ existing(existingFile, fileSnapshotId, dataSequenceNumber, fileSequenceNumber, null);
+ }
+
+ /**
+ * Add an existing entry for a file.
+ *
+ * <p>The original data and file sequence numbers, snapshot ID, and commit timestamp, which were
+ * assigned at commit, must be preserved when adding an existing entry.
+ *
+ * @param existingFile a file
+ * @param fileSnapshotId snapshot ID when the data file was added to the table
+ * @param dataSequenceNumber a data sequence number of the file (assigned when the file was added)
+ * @param fileSequenceNumber a file sequence number (assigned when the file was added)
+ * @param commitTimestampMs commit timestamp (in milliseconds) of the snapshot when the file was
+ * added; may be {@code null} for tables that do not track commit timestamps (V3 and earlier)
+ */
+ public void existing(
+ F existingFile,
+ long fileSnapshotId,
+ long dataSequenceNumber,
+ Long fileSequenceNumber,
+ Long commitTimestampMs) {
+ reused.wrapExisting(
+ fileSnapshotId, dataSequenceNumber, fileSequenceNumber, commitTimestampMs, existingFile);
addEntry(reused);
}
@@ -171,9 +218,32 @@ void existing(ManifestEntry entry) {
* @param deletedFile a file
* @param dataSequenceNumber a data sequence number of the file (assigned when the file was added)
* @param fileSequenceNumber a file sequence number (assigned when the file was added)
+ * @deprecated since 1.12.0, will be removed in 1.13.0; use {@link #delete(ContentFile, long,
+ * Long, Long)} which preserves the entry's commit timestamp.
*/
+ @Deprecated
public void delete(F deletedFile, long dataSequenceNumber, Long fileSequenceNumber) {
- addEntry(reused.wrapDelete(snapshotId, dataSequenceNumber, fileSequenceNumber, deletedFile));
+ delete(deletedFile, dataSequenceNumber, fileSequenceNumber, null);
+ }
+
+ /**
+ * Add a delete entry for a file.
+ *
+ * <p>The entry's snapshot ID will be this manifest's snapshot ID. However, the original data and
+ * file sequence numbers and commit timestamp of the file must be preserved when the file is
+ * marked as deleted.
+ *
+ * @param deletedFile a file
+ * @param dataSequenceNumber a data sequence number of the file (assigned when the file was added)
+ * @param fileSequenceNumber a file sequence number (assigned when the file was added)
+ * @param commitTimestampMs commit timestamp (in milliseconds) of the snapshot when the file was
+ * added; may be {@code null} for tables that do not track commit timestamps (V3 and earlier)
+ */
+ public void delete(
+ F deletedFile, long dataSequenceNumber, Long fileSequenceNumber, Long commitTimestampMs) {
+ addEntry(
+ reused.wrapDelete(
+ snapshotId, dataSequenceNumber, fileSequenceNumber, commitTimestampMs, deletedFile));
}
void delete(ManifestEntry entry) {
@@ -226,7 +296,8 @@ public ManifestFile toManifestFile() {
existingRows,
deletedFiles,
deletedRows,
- firstRowId);
+ firstRowId,
+ UNASSIGNED_TS);
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/MetadataColumns.java b/core/src/main/java/org/apache/iceberg/MetadataColumns.java
index 7b2629007649..5e0c22b35b9d 100644
--- a/core/src/main/java/org/apache/iceberg/MetadataColumns.java
+++ b/core/src/main/java/org/apache/iceberg/MetadataColumns.java
@@ -106,25 +106,35 @@ private MetadataColumns() {}
"_last_updated_sequence_number",
Types.LongType.get(),
"Sequence number when the row was last updated");
+ public static final NestedField LAST_UPDATED_TIMESTAMP_MS =
+ NestedField.optional(
+ Integer.MAX_VALUE - 109,
+ "_last_updated_timestamp_ms",
+ Types.LongType.get(),
+ "Commit timestamp in milliseconds when the row was last updated");
private static final Map<String, NestedField> META_COLUMNS =
- ImmutableMap.of(
- FILE_PATH.name(), FILE_PATH,
- ROW_POSITION.name(), ROW_POSITION,
- IS_DELETED.name(), IS_DELETED,
- SPEC_ID.name(), SPEC_ID,
- ROW_ID.name(), ROW_ID,
- LAST_UPDATED_SEQUENCE_NUMBER.name(), LAST_UPDATED_SEQUENCE_NUMBER);
+ ImmutableMap.<String, NestedField>builder()
+ .put(FILE_PATH.name(), FILE_PATH)
+ .put(ROW_POSITION.name(), ROW_POSITION)
+ .put(IS_DELETED.name(), IS_DELETED)
+ .put(SPEC_ID.name(), SPEC_ID)
+ .put(ROW_ID.name(), ROW_ID)
+ .put(LAST_UPDATED_SEQUENCE_NUMBER.name(), LAST_UPDATED_SEQUENCE_NUMBER)
+ .put(LAST_UPDATED_TIMESTAMP_MS.name(), LAST_UPDATED_TIMESTAMP_MS)
+ .buildOrThrow();
private static final Set<Integer> META_IDS =
- ImmutableSet.of(
- FILE_PATH.fieldId(),
- ROW_POSITION.fieldId(),
- IS_DELETED.fieldId(),
- SPEC_ID.fieldId(),
- PARTITION_COLUMN_ID,
- ROW_ID.fieldId(),
- LAST_UPDATED_SEQUENCE_NUMBER.fieldId());
+ ImmutableSet.<Integer>builder()
+ .add(FILE_PATH.fieldId())
+ .add(ROW_POSITION.fieldId())
+ .add(IS_DELETED.fieldId())
+ .add(SPEC_ID.fieldId())
+ .add(PARTITION_COLUMN_ID)
+ .add(ROW_ID.fieldId())
+ .add(LAST_UPDATED_SEQUENCE_NUMBER.fieldId())
+ .add(LAST_UPDATED_TIMESTAMP_MS.fieldId())
+ .build();
public static Set<Integer> metadataFieldIds() {
return META_IDS;
@@ -155,6 +165,7 @@ public static boolean nonMetadataColumn(String name) {
}
public static Schema schemaWithRowLineage(Schema schema) {
- return TypeUtil.join(schema, new Schema(ROW_ID, LAST_UPDATED_SEQUENCE_NUMBER));
+ return TypeUtil.join(
+ schema, new Schema(ROW_ID, LAST_UPDATED_SEQUENCE_NUMBER, LAST_UPDATED_TIMESTAMP_MS));
}
}
diff --git a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
index f42c50ac56da..03dd191c0022 100644
--- a/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
+++ b/core/src/main/java/org/apache/iceberg/RewriteTablePathUtil.java
@@ -295,7 +295,8 @@ public static RewriteResult rewriteManifestList(
snapshot.snapshotId(),
snapshot.parentId(),
snapshot.sequenceNumber(),
- snapshot.firstRowId())) {
+ snapshot.firstRowId(),
+ snapshot.timestampMillis())) {
for (ManifestFile file : manifestFiles) {
ManifestFile newFile = file.copy();
@@ -489,14 +490,30 @@ private static > void appendEntryWithFile(
switch (entry.status()) {
case ADDED:
- writer.add(file);
+ // Preserve the original data sequence number and commit timestamp on ADDED entries so
+ // they survive a manifest rewrite even when read directly (without going through the
+ // manifest list inheritance path). For V1 manifests dataSequenceNumber may be null; fall
+ // back to the simple add() in that case.
+ if (entry.dataSequenceNumber() != null) {
+ writer.add(file, entry.dataSequenceNumber(), entry.commitTimestampMs());
+ } else {
+ writer.add(file);
+ }
break;
case EXISTING:
writer.existing(
- file, entry.snapshotId(), entry.dataSequenceNumber(), entry.fileSequenceNumber());
+ file,
+ entry.snapshotId(),
+ entry.dataSequenceNumber(),
+ entry.fileSequenceNumber(),
+ entry.commitTimestampMs());
break;
case DELETED:
- writer.delete(file, entry.dataSequenceNumber(), entry.fileSequenceNumber());
+ writer.delete(
+ file,
+ entry.dataSequenceNumber(),
+ entry.fileSequenceNumber(),
+ entry.commitTimestampMs());
break;
}
}
diff --git a/core/src/main/java/org/apache/iceberg/RollingManifestWriter.java b/core/src/main/java/org/apache/iceberg/RollingManifestWriter.java
index 5480415eeed2..f6592f876278 100644
--- a/core/src/main/java/org/apache/iceberg/RollingManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/RollingManifestWriter.java
@@ -74,6 +74,23 @@ public void add(F addedFile, long dataSequenceNumber) {
currentFileRows++;
}
+ /**
+ * Add an added entry for a file with explicit data sequence number and commit timestamp.
+ *
+ * <p>The entry's snapshot ID will be this manifest's snapshot ID. The entry's data sequence
+ * number and commit timestamp are taken from the arguments; the file sequence number will be
+ * assigned at commit.
+ *
+ * @param addedFile a data file
+ * @param dataSequenceNumber a data sequence number for the file
+ * @param commitTimestampMs commit timestamp (in milliseconds) of the snapshot when the file was
+ * added; may be {@code null} for tables that do not track commit timestamps (V3 and earlier)
+ */
+ public void add(F addedFile, long dataSequenceNumber, Long commitTimestampMs) {
+ currentWriter().add(addedFile, dataSequenceNumber, commitTimestampMs);
+ currentFileRows++;
+ }
+
/**
* Add an existing entry for a file.
*
@@ -84,10 +101,41 @@ public void add(F addedFile, long dataSequenceNumber) {
* @param fileSnapshotId snapshot ID when the data file was added to the table
* @param dataSequenceNumber a data sequence number of the file (assigned when the file was added)
* @param fileSequenceNumber a file sequence number (assigned when the file was added)
+ * @deprecated since 1.12.0, will be removed in 1.13.0; use {@link #existing(ContentFile, long,
+ * long, Long, Long)} which preserves the entry's commit timestamp.
*/
+ @Deprecated
public void existing(
F existingFile, long fileSnapshotId, long dataSequenceNumber, Long fileSequenceNumber) {
- currentWriter().existing(existingFile, fileSnapshotId, dataSequenceNumber, fileSequenceNumber);
+ existing(existingFile, fileSnapshotId, dataSequenceNumber, fileSequenceNumber, null);
+ }
+
+ /**
+ * Add an existing entry for a file.
+ *
+ * <p>The original data and file sequence numbers, snapshot ID, and commit timestamp, which were
+ * assigned at commit, must be preserved when adding an existing entry.
+ *
+ * @param existingFile a file
+ * @param fileSnapshotId snapshot ID when the data file was added to the table
+ * @param dataSequenceNumber a data sequence number of the file (assigned when the file was added)
+ * @param fileSequenceNumber a file sequence number (assigned when the file was added)
+ * @param commitTimestampMs commit timestamp (in milliseconds) of the snapshot when the file was
+ * added; may be {@code null} for tables that do not track commit timestamps (V3 and earlier)
+ */
+ public void existing(
+ F existingFile,
+ long fileSnapshotId,
+ long dataSequenceNumber,
+ Long fileSequenceNumber,
+ Long commitTimestampMs) {
+ currentWriter()
+ .existing(
+ existingFile,
+ fileSnapshotId,
+ dataSequenceNumber,
+ fileSequenceNumber,
+ commitTimestampMs);
currentFileRows++;
}
@@ -100,9 +148,30 @@ public void existing(
* @param deletedFile a file
* @param dataSequenceNumber a data sequence number of the file (assigned when the file was added)
* @param fileSequenceNumber a file sequence number (assigned when the file was added)
+ * @deprecated since 1.12.0, will be removed in 1.13.0; use {@link #delete(ContentFile, long,
+ * Long, Long)} which preserves the entry's commit timestamp.
*/
+ @Deprecated
public void delete(F deletedFile, long dataSequenceNumber, Long fileSequenceNumber) {
- currentWriter().delete(deletedFile, dataSequenceNumber, fileSequenceNumber);
+ delete(deletedFile, dataSequenceNumber, fileSequenceNumber, null);
+ }
+
+ /**
+ * Add a delete entry for a file.
+ *
+ * <p>The entry's snapshot ID will be this manifest's snapshot ID. However, the original data and
+ * file sequence numbers and commit timestamp of the file must be preserved when the file is
+ * marked as deleted.
+ *
+ * @param deletedFile a file
+ * @param dataSequenceNumber a data sequence number of the file (assigned when the file was added)
+ * @param fileSequenceNumber a file sequence number (assigned when the file was added)
+ * @param commitTimestampMs commit timestamp (in milliseconds) of the snapshot when the file was
+ * added; may be {@code null} for tables that do not track commit timestamps (V3 and earlier)
+ */
+ public void delete(
+ F deletedFile, long dataSequenceNumber, Long fileSequenceNumber, Long commitTimestampMs) {
+ currentWriter().delete(deletedFile, dataSequenceNumber, fileSequenceNumber, commitTimestampMs);
currentFileRows++;
}
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index ffbebf59989b..b80f1784be8d 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -35,6 +35,7 @@
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.io.IOException;
import java.math.RoundingMode;
+import java.time.Clock;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
@@ -121,6 +122,7 @@ public void accept(String file) {
private SnapshotAncestryValidator snapshotAncestryValidator =
SnapshotAncestryValidator.NON_VALIDATING;
+ private Clock clock = Clock.systemUTC();
private ExecutorService workerPool;
private String targetBranch = SnapshotRef.MAIN_BRANCH;
private CommitMetrics commitMetrics;
@@ -273,6 +275,7 @@ public Snapshot apply() {
long sequenceNumber = base.nextSequenceNumber();
Long parentSnapshotId = parentSnapshot == null ? null : parentSnapshot.snapshotId();
+ long timestampMillis = snapshotTimestampMillis(parentSnapshot);
runValidations(parentSnapshot);
@@ -288,7 +291,8 @@ public Snapshot apply() {
snapshotId(),
parentSnapshotId,
sequenceNumber,
- base.nextRowId());
+ base.nextRowId(),
+ timestampMillis);
try (writer) {
// keep track of the manifest lists created
@@ -335,7 +339,7 @@ public Snapshot apply() {
sequenceNumber,
snapshotId(),
parentSnapshotId,
- System.currentTimeMillis(),
+ timestampMillis,
operation(),
summary(base),
base.currentSchemaId(),
@@ -626,6 +630,31 @@ protected ManifestReader newDeleteManifestReader(ManifestFile manife
return ManifestFiles.readDeleteManifest(manifest, ops.io(), ops.current().specsById());
}
+ @VisibleForTesting
+ void setClock(Clock newClock) {
+ this.clock = newClock;
+ }
+
+ /**
+ * Generates the snapshot timestamp in milliseconds.
+ *
+ * <p>For format version 4 and above, this implements the Lamport clock algorithm to guarantee
+ * monotonically increasing snapshot timestamps. For older format versions, this returns the
+ * current wall clock time.
+ *
+ * @param parentSnapshot the parent snapshot on the target branch, or null if there is no parent
+ * @return the snapshot timestamp in milliseconds
+ */
+ private long snapshotTimestampMillis(Snapshot parentSnapshot) {
+ long now = clock.millis();
+ if (base.formatVersion() >= TableMetadata.MIN_FORMAT_VERSION_MONOTONIC_TIMESTAMPS
+ && parentSnapshot != null) {
+ return Math.max(now, parentSnapshot.timestampMillis() + 1);
+ }
+
+ return now;
+ }
+
protected long snapshotId() {
if (snapshotId == null) {
synchronized (this) {
@@ -855,6 +884,7 @@ private static ManifestFile addMetadata(TableOperations ops, ManifestFile manife
existingRows,
deletedFiles,
deletedRows,
+ null,
null);
} catch (IOException e) {
diff --git a/core/src/main/java/org/apache/iceberg/TableMetadata.java b/core/src/main/java/org/apache/iceberg/TableMetadata.java
index 5aa76d5680fa..d2bb4876462f 100644
--- a/core/src/main/java/org/apache/iceberg/TableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/TableMetadata.java
@@ -57,6 +57,7 @@ public class TableMetadata implements Serializable {
static final int DEFAULT_TABLE_FORMAT_VERSION = 2;
static final int SUPPORTED_TABLE_FORMAT_VERSION = 4;
static final int MIN_FORMAT_VERSION_ROW_LINEAGE = 3;
+ static final int MIN_FORMAT_VERSION_MONOTONIC_TIMESTAMPS = 4;
static final int INITIAL_SPEC_ID = 0;
static final int INITIAL_SORT_ORDER_ID = 1;
static final int INITIAL_SCHEMA_ID = 0;
diff --git a/core/src/main/java/org/apache/iceberg/V4Metadata.java b/core/src/main/java/org/apache/iceberg/V4Metadata.java
index 67478290aa10..e94991a79bfe 100644
--- a/core/src/main/java/org/apache/iceberg/V4Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V4Metadata.java
@@ -45,7 +45,8 @@ private V4Metadata() {}
ManifestFile.DELETED_ROWS_COUNT.asRequired(),
ManifestFile.PARTITION_SUMMARIES,
ManifestFile.KEY_METADATA,
- ManifestFile.FIRST_ROW_ID);
+ ManifestFile.FIRST_ROW_ID,
+ ManifestFile.COMMIT_TIMESTAMP_MS);
/**
* A wrapper class to write any ManifestFile implementation to Avro using the v4 write schema.
@@ -56,12 +57,14 @@ private V4Metadata() {}
static class ManifestFileWrapper implements ManifestFile, StructLike {
private final long commitSnapshotId;
private final long sequenceNumber;
+ private final long commitTimestampMs;
private ManifestFile wrapped = null;
private Long wrappedFirstRowId = null;
- ManifestFileWrapper(long commitSnapshotId, long sequenceNumber) {
+ ManifestFileWrapper(long commitSnapshotId, long sequenceNumber, long commitTimestampMs) {
this.commitSnapshotId = commitSnapshotId;
this.sequenceNumber = sequenceNumber;
+ this.commitTimestampMs = commitTimestampMs;
}
public ManifestFile wrap(ManifestFile file, Long firstRowId) {
@@ -157,6 +160,17 @@ private Object get(int pos) {
"Found unassigned first-row-id for file: " + wrapped.path());
return wrapped.firstRowId();
}
+ case 16:
+ Long wrappedCommitTs = wrapped.commitTimestampMs();
+ if (wrappedCommitTs != null && wrappedCommitTs == ManifestWriter.UNASSIGNED_TS) {
+ Preconditions.checkState(
+ commitSnapshotId == wrapped.snapshotId(),
+ "Found unassigned commit timestamp for a manifest from snapshot: %s",
+ wrapped.snapshotId());
+ return commitTimestampMs;
+ } else {
+ return wrappedCommitTs;
+ }
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
}
@@ -257,6 +271,11 @@ public Long firstRowId() {
return wrapped.firstRowId();
}
+ @Override
+ public Long commitTimestampMs() {
+ return wrapped.commitTimestampMs();
+ }
+
@Override
public ManifestFile copy() {
return wrapped.copy();
@@ -274,7 +293,8 @@ static Schema wrapFileSchema(Types.StructType fileSchema) {
ManifestEntry.SNAPSHOT_ID,
ManifestEntry.SEQUENCE_NUMBER,
ManifestEntry.FILE_SEQUENCE_NUMBER,
- required(ManifestEntry.DATA_FILE_ID, "data_file", fileSchema));
+ required(ManifestEntry.DATA_FILE_ID, "data_file", fileSchema),
+ ManifestEntry.COMMIT_TIMESTAMP_MS);
}
static Types.StructType fileType(Types.StructType partitionType) {
@@ -364,6 +384,8 @@ private Object get(int pos) {
return wrapped.fileSequenceNumber();
case 4:
return fileWrapper.wrap(wrapped.file());
+ case 5:
+ return wrapped.commitTimestampMs();
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
}
@@ -404,6 +426,16 @@ public void setFileSequenceNumber(long fileSequenceNumber) {
wrapped.setFileSequenceNumber(fileSequenceNumber);
}
+ @Override
+ public Long commitTimestampMs() {
+ return wrapped.commitTimestampMs();
+ }
+
+ @Override
+ public void setCommitTimestampMs(long commitTimestampMs) {
+ wrapped.setCommitTimestampMs(commitTimestampMs);
+ }
+
@Override
public F file() {
return wrapped.file();
diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
index 0ac98dabd4b0..d931b3d1c5be 100644
--- a/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
+++ b/core/src/main/java/org/apache/iceberg/avro/ValueReaders.java
@@ -43,6 +43,7 @@
import org.apache.avro.util.Utf8;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.common.DynConstructors;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
@@ -262,6 +263,12 @@ private static Pair> fileFieldReader(
Long fileSeqNumber = (Long) idToConstant.get(fieldId);
return Pair.of(
projectionPos, ValueReaders.lastUpdated(firstRowId, fileSeqNumber, fieldReader));
+ } else if (Objects.equals(fieldId, MetadataColumns.LAST_UPDATED_TIMESTAMP_MS.fieldId())) {
+ Long firstRowId = (Long) idToConstant.get(MetadataColumns.ROW_ID.fieldId());
+ Long commitTimestampMs = (Long) idToConstant.get(fieldId);
+ return Pair.of(
+ projectionPos,
+ ValueReaders.lastUpdatedTimestamp(firstRowId, commitTimestampMs, fieldReader));
} else {
return fieldReader(fieldId, projectionPos, fieldReader, idToConstant);
}
@@ -339,6 +346,15 @@ public static ValueReader lastUpdated(
}
}
+ public static ValueReader<Long> lastUpdatedTimestamp(
+ Long baseRowId, Long commitTimestampMs, ValueReader<?> tsReader) {
+ if (commitTimestampMs != null && baseRowId != null) {
+ return new LastUpdatedTimestampReader(commitTimestampMs, (ValueReader<Long>) tsReader);
+ } else {
+ return ValueReaders.constant(null);
+ }
+ }
+
private static Map idToPos(Types.StructType struct) {
Map idToPos = Maps.newHashMap();
@@ -1360,4 +1376,33 @@ public void skip(Decoder decoder) throws IOException {
seqReader.skip(decoder);
}
}
+
+ static class LastUpdatedTimestampReader implements ValueReader<Long> {
+ private final Long commitTimestampMs;
+ private final ValueReader<Long> tsReader;
+
+ LastUpdatedTimestampReader(Long commitTimestampMs, ValueReader<Long> tsReader) {
+ Preconditions.checkArgument(
+ commitTimestampMs != null,
+ "Cannot construct LastUpdatedTimestampReader with null commitTimestampMs; "
+ + "use ValueReaders.lastUpdatedTimestamp which falls back to a constant null reader");
+ this.commitTimestampMs = commitTimestampMs;
+ this.tsReader = tsReader;
+ }
+
+ @Override
+ public Long read(Decoder ignored, Object reuse) throws IOException {
+ Long rowLastUpdatedTimestamp = tsReader.read(ignored, reuse);
+ if (rowLastUpdatedTimestamp != null) {
+ return rowLastUpdatedTimestamp;
+ }
+
+ return commitTimestampMs;
+ }
+
+ @Override
+ public void skip(Decoder decoder) throws IOException {
+ tsReader.skip(decoder);
+ }
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
index ad6ef605420a..a1abfac58001 100644
--- a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
@@ -64,6 +64,12 @@ private PartitionUtil() {}
MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId(),
convertConstant.apply(Types.LongType.get(), task.file().fileSequenceNumber()));
+ if (task.file().commitTimestampMs() != null) {
+ idToConstant.put(
+ MetadataColumns.LAST_UPDATED_TIMESTAMP_MS.fieldId(),
+ convertConstant.apply(Types.LongType.get(), task.file().commitTimestampMs()));
+ }
+
// add _file
idToConstant.put(
MetadataColumns.FILE_PATH.fieldId(),
diff --git a/core/src/test/java/org/apache/iceberg/TestBase.java b/core/src/test/java/org/apache/iceberg/TestBase.java
index ddac3709443d..1b9be34cdbf3 100644
--- a/core/src/test/java/org/apache/iceberg/TestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TestBase.java
@@ -433,9 +433,9 @@ > ManifestEntry manifestEntry(
return entry.wrapAppend(snapshotId, file);
}
case EXISTING:
- return entry.wrapExisting(snapshotId, dataSequenceNumber, fileSequenceNumber, file);
+ return entry.wrapExisting(snapshotId, dataSequenceNumber, fileSequenceNumber, null, file);
case DELETED:
- return entry.wrapDelete(snapshotId, dataSequenceNumber, fileSequenceNumber, file);
+ return entry.wrapDelete(snapshotId, dataSequenceNumber, fileSequenceNumber, null, file);
default:
throw new IllegalArgumentException("Unexpected entry status: " + status);
}
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestFileParser.java b/core/src/test/java/org/apache/iceberg/TestManifestFileParser.java
index cfc42fe96427..c0e5fdbb31c2 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestFileParser.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestFileParser.java
@@ -97,7 +97,8 @@ static ManifestFile createManifestFile() {
30L,
0,
0L,
- 10L);
+ 10L,
+ 1234567890L);
}
private String manifestFileJson() {
@@ -107,6 +108,6 @@ private String manifestFileJson() {
+ "\"added-rows-count\":10,\"existing-rows-count\":30,\"deleted-rows-count\":0,"
+ "\"partition-field-summary\":[{\"contains-null\":true,\"contains-nan\":false,"
+ "\"lower-bound\":\"0A000000\",\"upper-bound\":\"64000000\"}],"
- + "\"key-metadata\":\"DB030000\",\"first-row-id\":10}";
+ + "\"key-metadata\":\"DB030000\",\"first-row-id\":10,\"commit-timestamp-ms\":1234567890}";
}
}
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestListEncryption.java b/core/src/test/java/org/apache/iceberg/TestManifestListEncryption.java
index 0acda5e21e23..223eebab8035 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestListEncryption.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestListEncryption.java
@@ -95,7 +95,8 @@ public class TestManifestListEncryption {
EXISTING_ROWS,
DELETED_FILES,
DELETED_ROWS,
- FIRST_ROW_ID);
+ FIRST_ROW_ID,
+ null);
@Test
public void testEncryption() throws IOException {
@@ -217,7 +218,8 @@ private ManifestFile writeAndReadEncryptedManifestList(EncryptionManager em) thr
SNAPSHOT_ID,
SNAPSHOT_ID - 1,
SEQ_NUM,
- SNAPSHOT_FIRST_ROW_ID);
+ SNAPSHOT_FIRST_ROW_ID,
+ 0L);
writer.add(TEST_MANIFEST);
writer.close();
ManifestListFile manifestListFile = writer.toManifestListFile();
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java b/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java
index 299488857331..1cd06e715447 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java
@@ -69,6 +69,7 @@ public class TestManifestListVersions {
private static final ByteBuffer KEY_METADATA = null;
private static final long FIRST_ROW_ID = 100L;
private static final long SNAPSHOT_FIRST_ROW_ID = 130L;
+ private static final long COMMIT_TIMESTAMP_MS = 1710000000000L;
private static final ManifestFile TEST_MANIFEST =
new GenericManifestFile(
@@ -87,7 +88,8 @@ public class TestManifestListVersions {
EXISTING_ROWS,
DELETED_FILES,
DELETED_ROWS,
- FIRST_ROW_ID);
+ FIRST_ROW_ID,
+ COMMIT_TIMESTAMP_MS);
private static final ManifestFile TEST_DELETE_MANIFEST =
new GenericManifestFile(
@@ -106,6 +108,7 @@ public class TestManifestListVersions {
EXISTING_ROWS,
DELETED_FILES,
DELETED_ROWS,
+ null,
null);
@TempDir private Path temp;
@@ -121,6 +124,9 @@ public void testV1WriteDeleteManifest() {
public void testV1Write() throws IOException {
ManifestFile manifest = writeAndReadManifestList(1);
+ // v4 fields are not written and are defaulted
+ assertThat(manifest.commitTimestampMs()).isNull();
+
// v3 fields are not written and are defaulted
assertThat(manifest.firstRowId()).isNull();
@@ -146,6 +152,9 @@ public void testV1Write() throws IOException {
public void testV2Write() throws IOException {
ManifestFile manifest = writeAndReadManifestList(2);
+ // v4 fields are not written and are defaulted
+ assertThat(manifest.commitTimestampMs()).isNull();
+
// v3 fields are not written and are defaulted
assertThat(manifest.firstRowId()).isNull();
@@ -169,6 +178,9 @@ public void testV2Write() throws IOException {
public void testV3Write() throws IOException {
ManifestFile manifest = writeAndReadManifestList(3);
+ // v4 fields are not written and are defaulted
+ assertThat(manifest.commitTimestampMs()).isNull();
+
// all v3 fields should be read correctly
assertThat(manifest.path()).isEqualTo(PATH);
assertThat(manifest.length()).isEqualTo(LENGTH);
@@ -205,6 +217,7 @@ public void testV3WriteFirstRowIdAssignment() throws IOException {
EXISTING_ROWS,
DELETED_FILES,
DELETED_ROWS,
+ null,
null);
// write uses firstRowId=SNAPSHOT_FIRST_ROW_ID and ADDED_ROWS are assigned
@@ -252,6 +265,7 @@ public void testV3WriteMixedRowIdAssignment() throws IOException {
EXISTING_ROWS,
DELETED_FILES,
DELETED_ROWS,
+ null,
null);
// write uses firstRowId=SNAPSHOT_FIRST_ROW_ID and ADDED_ROWS are assigned twice
@@ -288,6 +302,64 @@ public void testV3WriteMixedRowIdAssignment() throws IOException {
+ missingFirstRowId.addedRowsCount());
}
+ @Test
+ public void testV4Write() throws IOException {
+ ManifestFile manifest = writeAndReadManifestList(4);
+
+ assertThat(manifest.path()).isEqualTo(PATH);
+ assertThat(manifest.length()).isEqualTo(LENGTH);
+ assertThat(manifest.partitionSpecId()).isEqualTo(SPEC_ID);
+ assertThat(manifest.content()).isEqualTo(ManifestContent.DATA);
+ assertThat(manifest.sequenceNumber()).isEqualTo(SEQ_NUM);
+ assertThat(manifest.minSequenceNumber()).isEqualTo(MIN_SEQ_NUM);
+ assertThat(manifest.snapshotId()).isEqualTo(SNAPSHOT_ID);
+ assertThat(manifest.addedFilesCount()).isEqualTo(ADDED_FILES);
+ assertThat(manifest.addedRowsCount()).isEqualTo(ADDED_ROWS);
+ assertThat(manifest.existingFilesCount()).isEqualTo(EXISTING_FILES);
+ assertThat(manifest.existingRowsCount()).isEqualTo(EXISTING_ROWS);
+ assertThat(manifest.deletedFilesCount()).isEqualTo(DELETED_FILES);
+ assertThat(manifest.deletedRowsCount()).isEqualTo(DELETED_ROWS);
+ assertThat(manifest.firstRowId()).isEqualTo(FIRST_ROW_ID);
+ assertThat(manifest.commitTimestampMs()).isEqualTo(COMMIT_TIMESTAMP_MS);
+ }
+
+ @Test
+ public void testV4CommitTimestampInheritance() throws IOException {
+ ManifestFile manifestWithUnassignedTs =
+ new GenericManifestFile(
+ PATH,
+ LENGTH,
+ SPEC_ID,
+ ManifestContent.DATA,
+ SEQ_NUM,
+ MIN_SEQ_NUM,
+ SNAPSHOT_ID,
+ PARTITION_SUMMARIES,
+ KEY_METADATA,
+ ADDED_FILES,
+ ADDED_ROWS,
+ EXISTING_FILES,
+ EXISTING_ROWS,
+ DELETED_FILES,
+ DELETED_ROWS,
+ FIRST_ROW_ID,
+ ManifestWriter.UNASSIGNED_TS);
+
+ assertThat(manifestWithUnassignedTs.commitTimestampMs())
+ .as("Manifest should have unassigned commit timestamp before manifest list write")
+ .isEqualTo(ManifestWriter.UNASSIGNED_TS);
+
+ // manifest already has firstRowId set, so nextRowId is not advanced
+ ManifestFile result =
+ Iterables.getOnlyElement(
+ ManifestLists.read(
+ writeManifestList(4, SNAPSHOT_FIRST_ROW_ID, manifestWithUnassignedTs)));
+
+ assertThat(result.commitTimestampMs())
+ .as("V4 manifest with unassigned timestamp should inherit commit timestamp from snapshot")
+ .isEqualTo(COMMIT_TIMESTAMP_MS);
+ }
+
@Test
public void testV1ForwardCompatibility() throws IOException {
InputFile manifestList = writeManifestList(TEST_MANIFEST, 1);
@@ -420,6 +492,7 @@ public void testManifestsPartitionSummary(int formatVersion) throws IOException
EXISTING_ROWS,
DELETED_FILES,
DELETED_ROWS,
+ null,
null);
InputFile manifestList =
@@ -460,7 +533,8 @@ private InputFile writeManifestList(
SNAPSHOT_ID,
SNAPSHOT_ID - 1,
formatVersion > 1 ? SEQ_NUM : 0,
- SNAPSHOT_FIRST_ROW_ID);
+ SNAPSHOT_FIRST_ROW_ID,
+ COMMIT_TIMESTAMP_MS);
try (writer) {
for (ManifestFile manifest : manifests) {
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriter.java b/core/src/test/java/org/apache/iceberg/TestManifestWriter.java
index 5d682421d541..6e20c0711cfd 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestWriter.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestWriter.java
@@ -227,11 +227,11 @@ public void testRollingManifestWriterSplitFiles() throws IOException {
addedFileCounts[fileIndex] += 1;
addedRowCounts[fileIndex] += i;
} else if (type == 1) {
- writer.existing(newFile(i), 1, 1, null);
+ writer.existing(newFile(i), 1, 1, null, null);
existingFileCounts[fileIndex] += 1;
existingRowCounts[fileIndex] += i;
} else {
- writer.delete(newFile(i), 1, null);
+ writer.delete(newFile(i), 1, null, null);
deletedFileCounts[fileIndex] += 1;
deletedRowCounts[fileIndex] += i;
}
@@ -283,11 +283,11 @@ public void testRollingDeleteManifestWriterSplitFiles() throws IOException {
addedFileCounts[fileIndex] += 1;
addedRowCounts[fileIndex] += i;
} else if (type == 1) {
- writer.existing(newPosDeleteFile(i), 1, 1, null);
+ writer.existing(newPosDeleteFile(i), 1, 1, null, null);
existingFileCounts[fileIndex] += 1;
existingRowCounts[fileIndex] += i;
} else {
- writer.delete(newPosDeleteFile(i), 1, null);
+ writer.delete(newPosDeleteFile(i), 1, null, null);
deletedFileCounts[fileIndex] += 1;
deletedRowCounts[fileIndex] += i;
}
@@ -320,6 +320,96 @@ public void testRollingDeleteManifestWriterSplitFiles() throws IOException {
deletedRowCounts);
}
+ @TestTemplate
+ @SuppressWarnings("deprecation")
+ public void testDeprecatedExistingAndDeleteOverloads() throws IOException {
+ assumeThat(formatVersion).isGreaterThan(1);
+
+ DataFile dataFile = newFile(10);
+ DeleteFile deleteFile = newPosDeleteFile(5);
+
+ File dataManifestFile = temp.resolve("data-manifest" + System.nanoTime() + ".avro").toFile();
+ OutputFile dataOutput = table.ops().io().newOutputFile(dataManifestFile.getCanonicalPath());
+ ManifestWriter dataWriter =
+ ManifestFiles.write(formatVersion, table.spec(), dataOutput, 1L);
+ dataWriter.existing(dataFile, 1L, 2L, 3L);
+ dataWriter.delete(dataFile, 2L, 3L);
+ dataWriter.close();
+
+ File deleteManifestFile =
+ temp.resolve("delete-manifest" + System.nanoTime() + ".avro").toFile();
+ OutputFile deleteOutput = table.ops().io().newOutputFile(deleteManifestFile.getCanonicalPath());
+ ManifestWriter deleteWriter =
+ ManifestFiles.writeDeleteManifest(formatVersion, table.spec(), deleteOutput, 1L);
+ deleteWriter.existing(deleteFile, 1L, 2L, 3L);
+ deleteWriter.delete(deleteFile, 2L, 3L);
+ deleteWriter.close();
+
+ try (ManifestReader reader =
+ ManifestFiles.read(dataWriter.toManifestFile(), table.io(), table.specs())) {
+ for (ManifestEntry entry : reader.entries()) {
+ assertThat(entry.commitTimestampMs())
+ .as("Deprecated overloads must default commit_timestamp_ms to null")
+ .isNull();
+ }
+ }
+
+ try (ManifestReader reader =
+ ManifestFiles.readDeleteManifest(
+ deleteWriter.toManifestFile(), table.io(), table.specs())) {
+ for (ManifestEntry entry : reader.entries()) {
+ assertThat(entry.commitTimestampMs())
+ .as("Deprecated overloads must default commit_timestamp_ms to null")
+ .isNull();
+ }
+ }
+ }
+
+ @TestTemplate
+ @SuppressWarnings("deprecation")
+ public void testDeprecatedRollingWriterOverloads() throws IOException {
+ assumeThat(formatVersion).isGreaterThan(1);
+
+ RollingManifestWriter dataWriter =
+ new RollingManifestWriter<>(
+ () -> ManifestFiles.write(formatVersion, SPEC, newManifestFile(), 1L), SMALL_FILE_SIZE);
+ dataWriter.existing(newFile(1), 1L, 2L, 3L);
+ dataWriter.delete(newFile(2), 2L, 3L);
+ dataWriter.close();
+
+ RollingManifestWriter deleteWriter =
+ new RollingManifestWriter<>(
+ () -> ManifestFiles.writeDeleteManifest(formatVersion, SPEC, newManifestFile(), 1L),
+ SMALL_FILE_SIZE);
+ deleteWriter.existing(newPosDeleteFile(1), 1L, 2L, 3L);
+ deleteWriter.delete(newPosDeleteFile(2), 2L, 3L);
+ deleteWriter.close();
+
+ for (ManifestFile manifest : dataWriter.toManifestFiles()) {
+ try (ManifestReader reader =
+ ManifestFiles.read(manifest, table.io(), table.specs())) {
+ for (ManifestEntry entry : reader.entries()) {
+ assertThat(entry.commitTimestampMs())
+ .as(
+ "Deprecated RollingManifestWriter overloads must default commit_timestamp_ms to null")
+ .isNull();
+ }
+ }
+ }
+
+ for (ManifestFile manifest : deleteWriter.toManifestFiles()) {
+ try (ManifestReader reader =
+ ManifestFiles.readDeleteManifest(manifest, table.io(), table.specs())) {
+ for (ManifestEntry entry : reader.entries()) {
+ assertThat(entry.commitTimestampMs())
+ .as(
+ "Deprecated RollingManifestWriter overloads must default commit_timestamp_ms to null")
+ .isNull();
+ }
+ }
+ }
+ }
+
private void checkManifests(
List manifests,
int[] addedFileCounts,
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
index b0e1c6939ba0..21b39c4d5072 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
@@ -84,6 +84,7 @@ public class TestManifestWriterVersions {
private static final List OFFSETS = ImmutableList.of(4L);
private static final Integer SORT_ORDER_ID = 2;
private static final long FIRST_ROW_ID = 100L;
+ private static final long COMMIT_TIMESTAMP_MS = 1710000000000L;
private static final DataFile DATA_FILE =
new GenericDataFile(
@@ -309,6 +310,117 @@ public void testV3ManifestRewriteWithInheritance() throws IOException {
checkRewrittenEntry(readManifest(manifest3), 0L, FileContent.DATA, FIRST_ROW_ID);
}
+  /**
+   * Writes a v4 manifest and reads it back directly, without passing it through a manifest list,
+   * then checks the commit timestamp on both the manifest and its entry.
+   */
+  @Test
+  public void testV4Write() throws IOException {
+    ManifestFile manifest = writeManifest(4);
+    assertThat(manifest.commitTimestampMs())
+        .as("commit_timestamp_ms should be unassigned before manifest list write")
+        .isEqualTo(ManifestWriter.UNASSIGNED_TS);
+
+    ManifestEntry<DataFile> entry = readManifest(manifest);
+    checkEntry(
+        entry,
+        ManifestWriter.UNASSIGNED_SEQ,
+        ManifestWriter.UNASSIGNED_SEQ,
+        FileContent.DATA,
+        FIRST_ROW_ID);
+
+    // before manifest list inheritance, the entry's commit_timestamp_ms is null:
+    // ADDED entries are persisted with a null commit_timestamp_ms by ManifestWriter#add, and
+    // InheritableMetadataFactory normalizes UNASSIGNED_TS on the manifest to null so the sentinel
+    // does not leak into entries that are read directly (without going through a manifest list).
+    assertThat(entry.commitTimestampMs())
+        .as("Entry commit_timestamp_ms should be null before manifest list write")
+        .isNull();
+  }
+
+  /**
+   * Writes a v4 manifest, round-trips it through a v4 manifest list, and checks that the manifest,
+   * its entry and the entry's data file all report {@code COMMIT_TIMESTAMP_MS}.
+   */
+  @Test
+  public void testV4WriteWithInheritance() throws IOException {
+    // the data file is written without its own first_row_id
+    DataFile withoutFirstRowId =
+        DataFiles.builder(SPEC).copy(DATA_FILE).withFirstRowId(null).build();
+
+    ManifestFile manifest = writeAndReadManifestList(writeManifest(4, withoutFirstRowId), 4);
+    checkManifest(manifest, SEQUENCE_NUMBER);
+    assertThat(manifest.content()).isEqualTo(ManifestContent.DATA);
+    assertThat(manifest.commitTimestampMs())
+        .as("V4 manifest should have commit_timestamp_ms assigned from manifest list")
+        .isEqualTo(COMMIT_TIMESTAMP_MS);
+
+    ManifestEntry<DataFile> entry = readManifest(manifest);
+    checkEntry(entry, SEQUENCE_NUMBER, SEQUENCE_NUMBER, FileContent.DATA, FIRST_ROW_ID);
+
+    // V4 entries inherit commit_timestamp_ms from the manifest
+    assertThat(entry.commitTimestampMs())
+        .as("V4 entry should inherit commit_timestamp_ms from manifest")
+        .isEqualTo(COMMIT_TIMESTAMP_MS);
+    assertThat(entry.file().commitTimestampMs())
+        .as("V4 data file should inherit commit_timestamp_ms from entry")
+        .isEqualTo(COMMIT_TIMESTAMP_MS);
+  }
+
+  /**
+   * Carries a v1 manifest into a v4 manifest list and checks that neither the manifest nor its
+   * entry or data file picks up a commit timestamp.
+   */
+  @Test
+  public void testV4ManifestListRewriteWithInheritance() throws IOException {
+    // write with v1
+    ManifestFile manifest = writeAndReadManifestList(writeManifest(1), 1);
+    checkManifest(manifest, 0L);
+
+    // rewrite existing metadata with a V4 manifest list
+    ManifestFile manifest4 = writeAndReadManifestList(manifest, 4);
+    checkManifest(manifest4, 0L);
+
+    // pre-V4 manifest should not have commit_timestamp_ms assigned
+    assertThat(manifest4.commitTimestampMs())
+        .as("Pre-V4 manifest carried into V4 should have null commit_timestamp_ms")
+        .isNull();
+
+    ManifestEntry<DataFile> entry = readManifest(manifest4);
+    checkEntry(entry, 0L, 0L, FileContent.DATA, FIRST_ROW_ID);
+
+    // pre-V4 entries should not inherit commit_timestamp_ms
+    assertThat(entry.commitTimestampMs())
+        .as("Pre-V4 entry should have null commit_timestamp_ms after V4 manifest list rewrite")
+        .isNull();
+    assertThat(entry.file().commitTimestampMs())
+        .as("Pre-V4 data file should have null commit_timestamp_ms after V4 manifest list rewrite")
+        .isNull();
+  }
+
+  /**
+   * Rewrites a v1 manifest as a v4 manifest, adds the result to a v4 manifest list, and checks
+   * the sequence numbers reported at each step.
+   */
+  @Test
+  public void testV4ManifestRewriteWithInheritance() throws IOException {
+    // start from a v1 manifest that has been through a v1 manifest list
+    ManifestFile v1Manifest = writeAndReadManifestList(writeManifest(1), 1);
+    checkManifest(v1Manifest, 0L);
+
+    // copy it into a v4 manifest; no sequence number is assigned to the new manifest yet
+    ManifestFile rewritten = rewriteManifest(v1Manifest, 4);
+    checkRewrittenManifest(rewritten, ManifestWriter.UNASSIGNED_SEQ, 0L);
+
+    // listing the rewritten manifest in a v4 manifest list assigns its sequence number
+    ManifestFile listed = writeAndReadManifestList(rewritten, 4);
+    checkRewrittenManifest(listed, SEQUENCE_NUMBER, 0L);
+
+    // the entry keeps sequence number 0 that was written into the v4 manifest rather than
+    // inheriting the manifest list's sequence number
+    checkRewrittenEntry(readManifest(listed), 0L, FileContent.DATA, FIRST_ROW_ID);
+  }
+
+  /**
+   * For every format version in {@code TestHelpers.V3_AND_BELOW}, writes a manifest through a
+   * manifest list of the same version and checks that no commit timestamp is reported at the
+   * manifest, entry or data file level.
+   */
+  @Test
+  public void testPreV4ManifestsHaveNullCommitTimestamp() throws IOException {
+    for (int version : TestHelpers.V3_AND_BELOW) {
+      ManifestFile manifest = writeAndReadManifestList(writeManifest(version), version);
+      assertThat(manifest.commitTimestampMs())
+          .as("V%d manifest should have null commit_timestamp_ms", version)
+          .isNull();
+
+      ManifestEntry<DataFile> entry = readManifest(manifest);
+      assertThat(entry.commitTimestampMs())
+          .as("V%d entry should have null commit_timestamp_ms", version)
+          .isNull();
+      assertThat(entry.file().commitTimestampMs())
+          .as("V%d data file should have null commit_timestamp_ms", version)
+          .isNull();
+    }
+  }
+
void checkEntry(
ManifestEntry> entry,
Long expectedDataSequenceNumber,
@@ -415,7 +527,8 @@ private InputFile writeManifestList(ManifestFile manifest, int formatVersion) th
SNAPSHOT_ID,
SNAPSHOT_ID - 1,
formatVersion > 1 ? SEQUENCE_NUMBER : 0,
- FIRST_ROW_ID)) {
+ FIRST_ROW_ID,
+ COMMIT_TIMESTAMP_MS)) {
writer.add(manifest);
}
diff --git a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java
index c661ac834d45..87d795fb85e9 100644
--- a/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java
+++ b/core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java
@@ -1386,6 +1386,7 @@ private String createManifestListWithManifestFiles(long snapshotId, Long parentS
snapshotId,
parentSnapshotId,
0,
+ 0L,
0L)) {
writer.addAll(manifests);
}
diff --git a/core/src/test/java/org/apache/iceberg/TestRowLineageAssignment.java b/core/src/test/java/org/apache/iceberg/TestRowLineageAssignment.java
index 870622bc983d..51780f1a35ed 100644
--- a/core/src/test/java/org/apache/iceberg/TestRowLineageAssignment.java
+++ b/core/src/test/java/org/apache/iceberg/TestRowLineageAssignment.java
@@ -34,9 +34,11 @@
import org.apache.iceberg.types.Types.NestedField;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
+@ExtendWith(ParameterizedTestExtension.class)
public class TestRowLineageAssignment {
public static final Schema SCHEMA =
new Schema(
@@ -84,6 +86,12 @@ public class TestRowLineageAssignment {
.withRecordCount(90)
.build();
+  /** Supplies the format versions used to parameterize the tests in this class. */
+  @Parameters(name = "formatVersion = {0}")
+  private static List<Integer> formatVersion() {
+    return TestHelpers.V3_AND_ABOVE;
+  }
+
+ @Parameter private int formatVersion;
@TempDir private File location;
private BaseTable table;
@@ -98,7 +106,7 @@ public void createTable() {
"test",
SCHEMA,
PartitionSpec.unpartitioned(),
- 3,
+ formatVersion,
Map.of("random-snapshot-ids", "true"));
}
@@ -107,7 +115,7 @@ public void cleanup() {
TestTables.clearTables();
}
- @Test
+ @TestTemplate
public void testSingleFileAppend() {
assertThat(table.operations().current().nextRowId()).isEqualTo(0L);
@@ -119,9 +127,24 @@ public void testSingleFileAppend() {
ManifestFile manifest = Iterables.getOnlyElement(current.dataManifests(table.io()));
checkDataFileAssignment(table, manifest, 0L);
+
+ if (formatVersion >= 4) {
+ long snapshotTimestamp = current.timestampMillis();
+ InputFile manifestList = table.io().newInputFile(current.manifestListLocation());
+ checkManifestListCommitTimestamp(manifestList, snapshotTimestamp);
+ assertThat(manifest.commitTimestampMs())
+ .as("V4 manifest should have commit_timestamp_ms matching snapshot timestamp")
+ .isEqualTo(snapshotTimestamp);
+ checkDataFileCommitTimestamp(table, manifest, snapshotTimestamp);
+ } else {
+ assertThat(manifest.commitTimestampMs())
+ .as("Pre-V4 manifest should have null commit_timestamp_ms")
+ .isNull();
+ checkDataFileCommitTimestamp(table, manifest, (Long) null);
+ }
}
- @Test
+ @TestTemplate
public void testOverrideFirstRowId() {
assertThat(table.operations().current().nextRowId()).isEqualTo(0L);
@@ -143,7 +166,7 @@ public void testOverrideFirstRowId() {
checkDataFileAssignment(table, manifest, 0L);
}
- @Test
+ @TestTemplate
public void testBranchAssignment() {
// start with a single file in the table
testSingleFileAppend();
@@ -190,7 +213,7 @@ public void testBranchAssignment() {
checkDataFileAssignment(table, mainManifests.get(1), 0L);
}
- @Test
+ @TestTemplate
public void testCherryPickReassignsRowIds() {
// start with a commit in a branch that diverges from main
testBranchAssignment();
@@ -217,7 +240,7 @@ public void testCherryPickReassignsRowIds() {
checkDataFileAssignment(table, mainManifests.get(2), 0L);
}
- @Test
+ @TestTemplate
public void testFastForwardPreservesRowIds() {
// start with a single file in the table
testSingleFileAppend();
@@ -271,7 +294,7 @@ public void testFastForwardPreservesRowIds() {
.isEqualTo(table.snapshot("branch").dataManifests(table.io()));
}
- @Test
+ @TestTemplate
public void testMultiFileAppend() {
assertThat(table.operations().current().nextRowId()).isEqualTo(0L);
@@ -286,11 +309,14 @@ public void testMultiFileAppend() {
checkDataFileAssignment(table, manifest, 0L, FILE_A.recordCount());
}
- @Test
+ @TestTemplate
public void testMultipleFileAppends() {
// write and validate a multi-file commit
testMultiFileAppend();
+ long firstTimestamp = table.currentSnapshot().timestampMillis();
+ TestHelpers.waitUntilAfter(firstTimestamp);
+
long startingNextRowId = table.operations().current().nextRowId();
// add another append commit
@@ -305,9 +331,22 @@ public void testMultipleFileAppends() {
List manifests = current.dataManifests(table.io());
assertThat(manifests).hasSize(2);
checkDataFileAssignment(table, manifests.get(0), startingNextRowId);
+
+ if (formatVersion >= 4) {
+ long secondTimestamp = current.timestampMillis();
+ InputFile manifestList = table.io().newInputFile(current.manifestListLocation());
+ checkManifestListCommitTimestamp(manifestList, secondTimestamp, firstTimestamp);
+ assertThat(manifests.get(0).commitTimestampMs()).isEqualTo(secondTimestamp);
+ assertThat(manifests.get(1).commitTimestampMs()).isEqualTo(firstTimestamp);
+ checkDataFileCommitTimestamp(table, manifests.get(0), secondTimestamp);
+ checkDataFileCommitTimestamp(table, manifests.get(1), firstTimestamp, firstTimestamp);
+ } else {
+ assertThat(manifests.get(0).commitTimestampMs()).isNull();
+ assertThat(manifests.get(1).commitTimestampMs()).isNull();
+ }
}
- @Test
+ @TestTemplate
public void testCommitConflict() {
// start with a non-empty table
testSingleFileAppend();
@@ -376,11 +415,14 @@ public void testCommitConflict() {
assertThat(newManifests.get(2).path()).isEqualTo(startingManifest);
}
- @Test
+ @TestTemplate
public void testOverwrite() {
// start with a non-empty table
testSingleFileAppend();
+ long firstTimestamp = table.currentSnapshot().timestampMillis();
+ TestHelpers.waitUntilAfter(firstTimestamp);
+
long startingNextRowId = table.operations().current().nextRowId();
long nextRowId = startingNextRowId + FILE_B.recordCount();
@@ -398,13 +440,24 @@ public void testOverwrite() {
assertThat(manifests).hasSize(2);
checkDataFileAssignment(table, manifests.get(0), startingNextRowId);
checkDataFileAssignment(table, manifests.get(1), 0L);
+
+ if (formatVersion >= 4) {
+ long overwriteTimestamp = current.timestampMillis();
+ assertThat(manifests.get(0).commitTimestampMs()).isEqualTo(overwriteTimestamp);
+ checkDataFileCommitTimestamp(table, manifests.get(0), overwriteTimestamp);
+ assertThat(manifests.get(1).commitTimestampMs()).isEqualTo(overwriteTimestamp);
+ } else {
+ assertThat(manifests.get(0).commitTimestampMs()).isNull();
+ assertThat(manifests.get(1).commitTimestampMs()).isNull();
+ }
}
- @Test
+ @TestTemplate
public void testOverwriteWithFilteredManifest() {
// start with multiple data files
testMultiFileAppend();
+ long appendTimestamp = table.currentSnapshot().timestampMillis();
long startingNextRowId = table.operations().current().nextRowId();
assertThat(table.currentSnapshot().allManifests(table.io())).hasSize(1);
@@ -425,9 +478,21 @@ public void testOverwriteWithFilteredManifest() {
checkDataFileAssignment(table, manifests.get(0), startingNextRowId);
// the starting row ID for FILE_B does not change
checkDataFileAssignment(table, manifests.get(1), FILE_A.recordCount());
+
+ if (formatVersion >= 4) {
+ long overwriteTimestamp = current.timestampMillis();
+ assertThat(manifests.get(0).commitTimestampMs()).isEqualTo(overwriteTimestamp);
+ checkDataFileCommitTimestamp(table, manifests.get(0), overwriteTimestamp);
+ assertThat(manifests.get(1).commitTimestampMs()).isEqualTo(overwriteTimestamp);
+ // FILE_B is EXISTING in the filtered manifest and should preserve the original timestamp
+ checkDataFileCommitTimestamp(table, manifests.get(1), appendTimestamp);
+ } else {
+ assertThat(manifests.get(0).commitTimestampMs()).isNull();
+ assertThat(manifests.get(1).commitTimestampMs()).isNull();
+ }
}
- @Test
+ @TestTemplate
public void testRowDelta() {
// start with a non-empty table
testSingleFileAppend();
@@ -449,12 +514,13 @@ public void testRowDelta() {
checkDataFileAssignment(table, manifests.get(1), 0L);
}
- @Test
+ @TestTemplate
public void testAssignmentWithManifestCompaction() {
// start with a non-empty table
// data manifests: [added(FILE_A)]
testSingleFileAppend();
+ long fileATimestamp = table.currentSnapshot().timestampMillis();
long startingFirstRowId = table.operations().current().nextRowId();
// add FILE_B and set the min so metadata is merged on the next commit
@@ -473,6 +539,8 @@ public void testAssignmentWithManifestCompaction() {
checkDataFileAssignment(table, preMergeManifests.get(0), startingFirstRowId);
checkDataFileAssignment(table, preMergeManifests.get(1), 0L);
+ long fileBTimestamp = table.currentSnapshot().timestampMillis();
+
table.newAppend().appendFile(FILE_C).commit();
// data manifests: [add(FILE_C), exist(FILE_B), exist(FILE_A)]
@@ -487,9 +555,21 @@ public void testAssignmentWithManifestCompaction() {
List mergedManifests = table.currentSnapshot().dataManifests(table.io());
checkDataFileAssignment(
table, mergedManifests.get(0), preMergeNextRowId, startingFirstRowId, 0L);
+
+ if (formatVersion >= 4) {
+ long compactedTimestamp = table.currentSnapshot().timestampMillis();
+ assertThat(mergedManifests.get(0).commitTimestampMs()).isEqualTo(compactedTimestamp);
+ // FILE_C is ADDED with the compaction commit timestamp,
+ // FILE_B is EXISTING preserving its original timestamp,
+ // FILE_A is EXISTING preserving its original timestamp
+ checkDataFileCommitTimestamp(
+ table, mergedManifests.get(0), compactedTimestamp, fileBTimestamp, fileATimestamp);
+ } else {
+ assertThat(mergedManifests.get(0).commitTimestampMs()).isNull();
+ }
}
- @Test
+ @TestTemplate
public void testTableUpgrade(@TempDir File altLocation) {
BaseTable upgradeTable =
TestTables.create(altLocation, "test_upgrade", SCHEMA, PartitionSpec.unpartitioned(), 2);
@@ -504,7 +584,7 @@ public void testTableUpgrade(@TempDir File altLocation) {
.as("v2 tables should always have next-row-id=0")
.isEqualTo(0L);
- TestTables.upgrade(altLocation, "test_upgrade", 3);
+ TestTables.upgrade(altLocation, "test_upgrade", formatVersion);
upgradeTable.refresh();
assertThat(upgradeTable.operations().current().nextRowId())
@@ -527,9 +607,13 @@ public void testTableUpgrade(@TempDir File altLocation) {
// manifests without first_row_id will not assign first_row_id
checkDataFileAssignment(upgradeTable, manifests.get(0), (Long) null);
checkDataFileAssignment(upgradeTable, manifests.get(1), (Long) null);
+
+ // pre-upgrade manifests should have null commit_timestamp_ms regardless of format version
+ assertThat(manifests.get(0).commitTimestampMs()).isNull();
+ assertThat(manifests.get(1).commitTimestampMs()).isNull();
}
- @Test
+ @TestTemplate
public void testAssignmentAfterUpgrade(@TempDir File altLocation) {
// data manifests: [added(FILE_C)], [existing(FILE_A), deleted(FILE_B)]
testTableUpgrade(altLocation);
@@ -563,9 +647,36 @@ public void testAssignmentAfterUpgrade(@TempDir File altLocation) {
// the existing manifests were reused without modification
assertThat(manifests.get(0).path()).isEqualTo(existingManifests.get(0).path());
assertThat(manifests.get(1).path()).isEqualTo(existingManifests.get(1).path());
+
+ // reused pre-upgrade manifests should have null commit_timestamp_ms even for V4
+ assertThat(manifests.get(0).commitTimestampMs()).isNull();
+ assertThat(manifests.get(1).commitTimestampMs()).isNull();
+
+ // add a non-empty append to verify new manifests get commit_timestamp_ms
+ long postUpgradeNextRowId = upgradeTable.operations().current().nextRowId();
+ upgradeTable.newAppend().appendFile(FILE_B).commit();
+
+ Snapshot postAppend = upgradeTable.currentSnapshot();
+ List postManifests = postAppend.dataManifests(upgradeTable.io());
+ assertThat(postManifests).hasSize(3);
+
+ if (formatVersion >= 4) {
+ long appendTimestamp = postAppend.timestampMillis();
+ assertThat(postManifests.get(0).commitTimestampMs())
+ .as("New V4 manifest should inherit commit_timestamp_ms from snapshot")
+ .isEqualTo(appendTimestamp);
+ checkDataFileCommitTimestamp(upgradeTable, postManifests.get(0), appendTimestamp);
+ // reused pre-upgrade manifests should still have null commit_timestamp_ms
+ assertThat(postManifests.get(1).commitTimestampMs()).isNull();
+ assertThat(postManifests.get(2).commitTimestampMs()).isNull();
+ } else {
+ assertThat(postManifests.get(0).commitTimestampMs()).isNull();
+ assertThat(postManifests.get(1).commitTimestampMs()).isNull();
+ assertThat(postManifests.get(2).commitTimestampMs()).isNull();
+ }
}
- @Test
+ @TestTemplate
public void testDeleteAssignmentAfterUpgrade(@TempDir File altLocation) {
// data manifests: [added(FILE_C)], [existing(FILE_A), deleted(FILE_B)]
testTableUpgrade(altLocation);
@@ -598,9 +709,19 @@ public void testDeleteAssignmentAfterUpgrade(@TempDir File altLocation) {
checkDataFileAssignment(upgradeTable, manifests.get(1), 0L);
// the existing manifests were reused without modification
assertThat(manifests.get(1).path()).isEqualTo(existingManifests.get(1).path());
+
+ if (formatVersion >= 4) {
+ long assignedTimestamp = assigned.timestampMillis();
+ assertThat(manifests.get(0).commitTimestampMs()).isEqualTo(assignedTimestamp);
+ // reused pre-upgrade manifest should have null commit_timestamp_ms
+ assertThat(manifests.get(1).commitTimestampMs()).isNull();
+ } else {
+ assertThat(manifests.get(0).commitTimestampMs()).isNull();
+ assertThat(manifests.get(1).commitTimestampMs()).isNull();
+ }
}
- @Test
+ @TestTemplate
public void testBranchAssignmentAfterUpgrade(@TempDir File altLocation) {
// data manifests: [added(FILE_C)], [existing(FILE_A), deleted(FILE_B)]
testTableUpgrade(altLocation);
@@ -648,9 +769,16 @@ public void testBranchAssignmentAfterUpgrade(@TempDir File altLocation) {
// the existing manifests were reused without modification
assertThat(branchManifests.get(0).path()).isEqualTo(existingManifests.get(0).path());
assertThat(branchManifests.get(1).path()).isEqualTo(existingManifests.get(1).path());
+
+ // main manifests are unmodified pre-upgrade manifests
+ assertThat(mainManifests.get(0).commitTimestampMs()).isNull();
+ assertThat(mainManifests.get(1).commitTimestampMs()).isNull();
+ // branch reuses pre-upgrade manifests, so commit_timestamp_ms stays null
+ assertThat(branchManifests.get(0).commitTimestampMs()).isNull();
+ assertThat(branchManifests.get(1).commitTimestampMs()).isNull();
}
- @Test
+ @TestTemplate
public void testOverwriteAssignmentAfterUpgrade(@TempDir File altLocation) {
// data manifests: [added(FILE_C)], [existing(FILE_A), deleted(FILE_B)]
testTableUpgrade(altLocation);
@@ -684,9 +812,21 @@ public void testOverwriteAssignmentAfterUpgrade(@TempDir File altLocation) {
checkDataFileAssignment(upgradeTable, manifests.get(2), FILE_B.recordCount());
// the last manifest is reused without modification
assertThat(manifests.get(2).path()).isEqualTo(existingManifests.get(1).path());
+
+ if (formatVersion >= 4) {
+ long assignedTimestamp = assigned.timestampMillis();
+ assertThat(manifests.get(0).commitTimestampMs()).isEqualTo(assignedTimestamp);
+ assertThat(manifests.get(1).commitTimestampMs()).isEqualTo(assignedTimestamp);
+ // reused pre-upgrade manifest should have null commit_timestamp_ms
+ assertThat(manifests.get(2).commitTimestampMs()).isNull();
+ } else {
+ assertThat(manifests.get(0).commitTimestampMs()).isNull();
+ assertThat(manifests.get(1).commitTimestampMs()).isNull();
+ assertThat(manifests.get(2).commitTimestampMs()).isNull();
+ }
}
- @Test
+ @TestTemplate
public void testRowDeltaAssignmentAfterUpgrade(@TempDir File altLocation) {
// data manifests: [added(FILE_C)], [existing(FILE_A), deleted(FILE_B)]
testTableUpgrade(altLocation);
@@ -720,9 +860,13 @@ public void testRowDeltaAssignmentAfterUpgrade(@TempDir File altLocation) {
// the existing manifests were reused without modification
assertThat(manifests.get(0).path()).isEqualTo(existingManifests.get(0).path());
assertThat(manifests.get(1).path()).isEqualTo(existingManifests.get(1).path());
+
+ // reused pre-upgrade manifests should have null commit_timestamp_ms
+ assertThat(manifests.get(0).commitTimestampMs()).isNull();
+ assertThat(manifests.get(1).commitTimestampMs()).isNull();
}
- @Test
+ @TestTemplate
public void testUpgradeAssignmentWithManifestCompaction(@TempDir File altLocation) {
// create a non-empty upgrade table with FILE_A
BaseTable upgradeTable =
@@ -737,7 +881,7 @@ public void testUpgradeAssignmentWithManifestCompaction(@TempDir File altLocatio
.as("v2 tables should always have next-row-id=0")
.isEqualTo(0L);
- TestTables.upgrade(altLocation, "test_upgrade", 3);
+ TestTables.upgrade(altLocation, "test_upgrade", formatVersion);
upgradeTable.refresh();
assertThat(upgradeTable.operations().current().nextRowId())
@@ -760,6 +904,10 @@ public void testUpgradeAssignmentWithManifestCompaction(@TempDir File altLocatio
checkDataFileAssignment(upgradeTable, preMergeManifests.get(0), (Long) null);
checkDataFileAssignment(upgradeTable, preMergeManifests.get(1), (Long) null);
+ // pre-upgrade manifests should have null commit_timestamp_ms
+ assertThat(preMergeManifests.get(0).commitTimestampMs()).isNull();
+ assertThat(preMergeManifests.get(1).commitTimestampMs()).isNull();
+
// add FILE_C and trigger metadata compaction
upgradeTable.newAppend().appendFile(FILE_C).commit();
// data manifests: [add(FILE_C), exist(FILE_B), exist(FILE_A)]
@@ -779,12 +927,62 @@ public void testUpgradeAssignmentWithManifestCompaction(@TempDir File altLocatio
0L,
FILE_C.recordCount(),
FILE_C.recordCount() + FILE_B.recordCount());
+
+ if (formatVersion >= 4) {
+ long mergedTimestamp = upgradeTable.currentSnapshot().timestampMillis();
+ assertThat(mergedManifests.get(0).commitTimestampMs()).isEqualTo(mergedTimestamp);
+ // FILE_C is ADDED with the compaction commit timestamp,
+ // FILE_B and FILE_A are EXISTING carried over from pre-upgrade manifests with null timestamps
+ checkDataFileCommitTimestamp(
+ upgradeTable, mergedManifests.get(0), mergedTimestamp, null, null);
+ } else {
+ assertThat(mergedManifests.get(0).commitTimestampMs()).isNull();
+ }
}
private static ManifestContent content(int ordinal) {
return ManifestContent.values()[ordinal];
}
+  /**
+   * Reads the manifest list at {@code in} and asserts that its data manifests, in list order,
+   * carry exactly the given commit timestamps.
+   *
+   * <p>Manifests whose content is not {@link ManifestContent#DATA} are skipped. The check fails
+   * when the number of data manifests differs from the number of expected timestamps, so a
+   * manifest list with too few data manifests cannot pass silently.
+   *
+   * @param in manifest list file to read
+   * @param expectedTimestamps expected {@code commit_timestamp_ms}, one per data manifest
+   */
+  private static void checkManifestListCommitTimestamp(InputFile in, long... expectedTimestamps) {
+    int index = 0;
+    for (ManifestFile manifest : ManifestLists.read(in)) {
+      if (manifest.content() != ManifestContent.DATA) {
+        // only data manifests are compared against the expected timestamps
+        continue;
+      }
+
+      if (index >= expectedTimestamps.length) {
+        fail("No expected commit_timestamp_ms for manifest: " + manifest);
+      }
+
+      assertThat(manifest.commitTimestampMs())
+          .as("commit_timestamp_ms for data manifest (%s) should match", index)
+          .isEqualTo(expectedTimestamps[index]);
+      index += 1;
+    }
+
+    // every expected timestamp must have been matched by a data manifest
+    assertThat(index)
+        .as("Number of data manifests should match the number of expected timestamps")
+        .isEqualTo(expectedTimestamps.length);
+  }
+
+  /**
+   * Reads {@code manifest} and asserts that the data files it yields, in iteration order, carry
+   * exactly the given commit timestamps.
+   *
+   * <p>Files whose content is not {@link FileContent#DATA} are ignored. The check fails when the
+   * number of data files differs from the number of expected timestamps, so a manifest with too
+   * few files cannot pass silently.
+   *
+   * @param table table providing the {@code FileIO} and partition specs used to read the manifest
+   * @param manifest manifest to read
+   * @param expectedTimestamps expected {@code commit_timestamp_ms} per data file; an element may
+   *     be null when a file is expected to have no commit timestamp
+   */
+  private static void checkDataFileCommitTimestamp(
+      Table table, ManifestFile manifest, Long... expectedTimestamps) {
+    int index = 0;
+    try (ManifestReader<DataFile> reader =
+        ManifestFiles.read(manifest, table.io(), table.specs())) {
+      for (DataFile file : reader) {
+        if (file.content() != FileContent.DATA) {
+          continue;
+        }
+
+        assertThat(index)
+            .as("More data files than expected timestamps")
+            .isLessThan(expectedTimestamps.length);
+        assertThat(file.commitTimestampMs())
+            .as("Data file (%s) commit_timestamp_ms should match", index)
+            .isEqualTo(expectedTimestamps[index]);
+        index += 1;
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+
+    // every expected timestamp must have been matched by a data file
+    assertThat(index)
+        .as("Number of data files should match the number of expected timestamps")
+        .isEqualTo(expectedTimestamps.length);
+  }
+
private static void checkManifestListAssignment(InputFile in, Long... firstRowIds) {
try (CloseableIterable reader =
InternalData.read(FileFormat.AVRO, in)
diff --git a/core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java b/core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java
index 4fd4f9d3614e..0408eb314d9b 100644
--- a/core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java
+++ b/core/src/test/java/org/apache/iceberg/TestRowLineageMetadata.java
@@ -22,9 +22,12 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
@@ -386,6 +389,109 @@ public void testMetadataRewrite() {
assertThat(table.ops().current().nextRowId()).isEqualTo(60);
}
+ @TestTemplate
+ public void testCommitTimestampOnAppend() {
+ TestTables.TestTable table =
+ TestTables.create(
+ tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion);
+
+ table.newAppend().appendFile(fileWithRows(30)).commit();
+ Snapshot snap = table.currentSnapshot();
+ ManifestFile manifest = Iterables.getOnlyElement(snap.dataManifests(table.io()));
+
+ if (formatVersion >= 4) {
+ assertThat(manifest.commitTimestampMs())
+ .as("V4 manifest should have commit_timestamp_ms matching snapshot timestamp")
+ .isEqualTo(snap.timestampMillis());
+ checkEntryCommitTimestamp(table, manifest, snap.timestampMillis());
+ } else {
+ assertThat(manifest.commitTimestampMs())
+ .as("Pre-V4 manifest should have null commit_timestamp_ms")
+ .isNull();
+ checkEntryCommitTimestamp(table, manifest, null);
+ }
+ }
+
+  /**
+   * Commits two appends and checks that each data manifest, and the files in it, carries the
+   * timestamp of the snapshot that added it (format version 4 and above) or null (earlier
+   * versions).
+   */
+  @TestTemplate
+  public void testCommitTimestampOnMultipleAppends() {
+    TestTables.TestTable table =
+        TestTables.create(
+            tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion);
+
+    table.newAppend().appendFile(fileWithRows(30)).commit();
+    long firstTimestamp = table.currentSnapshot().timestampMillis();
+
+    table.newAppend().appendFile(fileWithRows(20)).commit();
+    Snapshot second = table.currentSnapshot();
+    long secondTimestamp = second.timestampMillis();
+
+    // the assertions below expect the newest manifest first
+    List<ManifestFile> manifests = second.dataManifests(table.io());
+    assertThat(manifests).hasSize(2);
+
+    if (formatVersion >= 4) {
+      assertThat(secondTimestamp).isGreaterThan(firstTimestamp);
+      assertThat(manifests.get(0).commitTimestampMs()).isEqualTo(secondTimestamp);
+      assertThat(manifests.get(1).commitTimestampMs()).isEqualTo(firstTimestamp);
+      checkEntryCommitTimestamp(table, manifests.get(0), secondTimestamp);
+      checkEntryCommitTimestamp(table, manifests.get(1), firstTimestamp);
+    } else {
+      assertThat(manifests.get(0).commitTimestampMs())
+          .as("Pre-V4 manifests must not carry commit_timestamp_ms")
+          .isNull();
+      assertThat(manifests.get(1).commitTimestampMs())
+          .as("Pre-V4 manifests must not carry commit_timestamp_ms")
+          .isNull();
+      checkEntryCommitTimestamp(table, manifests.get(0), null);
+      checkEntryCommitTimestamp(table, manifests.get(1), null);
+    }
+  }
+
+  /**
+   * Appends two files, replaces them with one compacted file through a rewrite, and checks the
+   * commit timestamp on every data manifest of the rewrite snapshot.
+   */
+  @TestTemplate
+  public void testCommitTimestampOnRewrite() {
+    TestTables.TestTable table =
+        TestTables.create(
+            tableDir, "test", TEST_SCHEMA, PartitionSpec.unpartitioned(), formatVersion);
+
+    DataFile filePart1 = fileWithRows(30);
+    DataFile filePart2 = fileWithRows(30);
+    DataFile fileCompacted = fileWithRows(60);
+
+    table.newAppend().appendFile(filePart1).appendFile(filePart2).commit();
+    table.newRewrite().deleteFile(filePart1).deleteFile(filePart2).addFile(fileCompacted).commit();
+
+    Snapshot rewrite = table.currentSnapshot();
+    List<ManifestFile> manifests = rewrite.dataManifests(table.io());
+
+    if (formatVersion >= 4) {
+      // every data manifest in the rewrite snapshot must carry the rewrite's timestamp
+      for (ManifestFile manifest : manifests) {
+        assertThat(manifest.commitTimestampMs())
+            .as("Rewrite manifest should have commit_timestamp_ms")
+            .isEqualTo(rewrite.timestampMillis());
+      }
+    } else {
+      for (ManifestFile manifest : manifests) {
+        assertThat(manifest.commitTimestampMs())
+            .as("Pre-V4 rewrite manifest must not carry commit_timestamp_ms")
+            .isNull();
+        checkEntryCommitTimestamp(table, manifest, null);
+      }
+    }
+  }
+
+  /**
+   * Asserts that every data file yielded when reading {@code manifest} reports the expected
+   * commit timestamp. A manifest that yields no files passes trivially.
+   *
+   * @param table table providing the {@code FileIO} and partition specs used to read the manifest
+   * @param manifest manifest to read
+   * @param expectedTimestamp expected {@code commit_timestamp_ms}, or null when none is expected
+   */
+  private void checkEntryCommitTimestamp(
+      Table table, ManifestFile manifest, Long expectedTimestamp) {
+    try (ManifestReader<DataFile> reader =
+        ManifestFiles.read(manifest, table.io(), table.specs())) {
+      for (DataFile file : reader) {
+        assertThat(file.commitTimestampMs())
+            .as("Data file commit_timestamp_ms should match expected value")
+            .isEqualTo(expectedTimestamp);
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
private final AtomicInteger fileNum = new AtomicInteger(0);
private DataFile fileWithRows(long numRows) {
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java
index f16e4feca82b..e36e07bef1fa 100644
--- a/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotJson.java
@@ -241,6 +241,7 @@ private String createManifestListWithManifestFiles(long snapshotId, Long parentS
snapshotId,
parentSnapshotId,
0,
+ 0L,
0L)) {
writer.addAll(manifests);
}
diff --git a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
index ad25a30b56e7..1a095c3567ea 100644
--- a/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
+++ b/core/src/test/java/org/apache/iceberg/TestSnapshotProducer.java
@@ -21,8 +21,12 @@
import static org.apache.iceberg.SnapshotSummary.PUBLISHED_WAP_ID_PROP;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
import java.io.IOException;
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneOffset;
import java.util.List;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
@@ -158,4 +162,47 @@ public void testCommitValidationWithCustomSummaryProperties() throws IOException
// Verify the table wasn't updated
assertThat(table.snapshots()).hasSize(1);
}
+
+ @TestTemplate
+ public void testSnapshotTimestampsAreMonotonicallyIncreasing() {
+ assumeThat(formatVersion)
+ .isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_MONOTONIC_TIMESTAMPS);
+
+ table.newFastAppend().appendFile(FILE_A).commit();
+ Snapshot first = table.currentSnapshot();
+
+ table.newFastAppend().appendFile(FILE_B).commit();
+ Snapshot second = table.currentSnapshot();
+ assertThat(second.timestampMillis())
+ .as("V4 snapshot timestamps must be strictly increasing")
+ .isGreaterThan(first.timestampMillis());
+
+ table.newFastAppend().appendFile(FILE_C).commit();
+ Snapshot third = table.currentSnapshot();
+ assertThat(third.timestampMillis())
+ .as("V4 snapshot timestamps must be strictly increasing")
+ .isGreaterThan(second.timestampMillis());
+ }
+
+  /**
+   * Commits a second snapshot using a fixed clock set 10 seconds before the first snapshot's
+   * timestamp and checks that the second snapshot is stamped exactly one millisecond after the
+   * first. Skipped for format versions below
+   * {@code TableMetadata.MIN_FORMAT_VERSION_MONOTONIC_TIMESTAMPS}.
+   */
+  @TestTemplate
+  public void testV4LamportClockFastForwardsDriftedClock() {
+    assumeThat(formatVersion)
+        .isGreaterThanOrEqualTo(TableMetadata.MIN_FORMAT_VERSION_MONOTONIC_TIMESTAMPS);
+
+    table.newFastAppend().appendFile(FILE_A).commit();
+    long firstTs = table.currentSnapshot().timestampMillis();
+
+    // simulate clock drift: wall clock reports a time far in the past
+    long driftedTime = firstTs - 10_000;
+    Clock driftedClock = Clock.fixed(Instant.ofEpochMilli(driftedTime), ZoneOffset.UTC);
+    AppendFiles append = table.newFastAppend().appendFile(FILE_B);
+    // setClock is called on SnapshotProducer, so the AppendFiles reference is cast first
+    ((SnapshotProducer<?>) append).setClock(driftedClock);
+    append.commit();
+
+    long secondTs = table.currentSnapshot().timestampMillis();
+    assertThat(secondTs)
+        .as(
+            "Lamport clock should fast-forward past the drifted wall clock to the last snapshot timestamp + 1 ms")
+        .isEqualTo(firstTs + 1);
+  }
}
diff --git a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
index bddb87859377..c8b77f39fb3e 100644
--- a/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
+++ b/core/src/test/java/org/apache/iceberg/TestTableMetadata.java
@@ -1864,6 +1864,7 @@ private String createManifestListWithManifestFile(
snapshotId,
parentSnapshotId,
0,
+ 0L,
0L)) {
writer.addAll(
ImmutableList.of(
diff --git a/core/src/test/java/org/apache/iceberg/data/DataTestBase.java b/core/src/test/java/org/apache/iceberg/data/DataTestBase.java
index 5a433fe0be85..bba3c147edd1 100644
--- a/core/src/test/java/org/apache/iceberg/data/DataTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/data/DataTestBase.java
@@ -61,7 +61,9 @@ public abstract class DataTestBase {
MetadataColumns.ROW_ID.fieldId(),
FIRST_ROW_ID,
MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER.fieldId(),
- 34L);
+ 34L,
+ MetadataColumns.LAST_UPDATED_TIMESTAMP_MS.fieldId(),
+ 1710000000000L);
protected abstract void writeAndValidate(Schema schema) throws IOException;
@@ -623,7 +625,8 @@ public void testRowLineage() throws Exception {
required(1, "id", LongType.get()),
required(2, "data", Types.StringType.get()),
MetadataColumns.ROW_ID,
- MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER);
+ MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER,
+ MetadataColumns.LAST_UPDATED_TIMESTAMP_MS);
GenericRecord record = GenericRecord.create(schema);
@@ -641,7 +644,9 @@ public void testRowLineage() throws Exception {
"_row_id",
1_000L,
"_last_updated_sequence_number",
- 33L)),
+ 33L,
+ "_last_updated_timestamp_ms",
+ 1700000000000L)),
record.copy(Map.of("id", 4L, "data", "d", "_row_id", 1_001L)),
record.copy(Map.of("id", 5L, "data", "e"))));
}
diff --git a/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java b/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java
index f2e2b4e7fa34..4b9ef5615d20 100644
--- a/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java
+++ b/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java
@@ -67,6 +67,12 @@ public static void assertEquals(
expectedValue = idToConstant.get(id);
}
+ } else if (id == MetadataColumns.LAST_UPDATED_TIMESTAMP_MS.fieldId()) {
+ expectedValue = expected.getField(expectedField.name());
+ if (expectedValue == null && idToConstant != null) {
+ expectedValue = idToConstant.get(id);
+ }
+
} else {
expectedValue = expected.getField(expectedField.name());
}
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
index 8aa9aa4779d9..e5b58ffea0a3 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
@@ -183,6 +183,16 @@ public static ParquetValueReader lastUpdated(
}
}
+  /**
+   * Creates the reader used for the last-updated timestamp metadata column.
+   *
+   * <p>When both {@code baseRowId} and {@code commitTimestampMs} are non-null, the returned reader
+   * wraps {@code tsReader} and substitutes {@code commitTimestampMs} for rows where no value is
+   * read. Otherwise the {@link #nulls()} reader is returned.
+   *
+   * @param baseRowId base row ID constant, or null when it is not available
+   * @param commitTimestampMs commit timestamp in milliseconds to fall back to, or null if unknown
+   * @param tsReader reader for the stored timestamp values; may be null
+   * @return a reader producing the last-updated timestamp for each row
+   */
+  @SuppressWarnings("unchecked")
+  public static ParquetValueReader<Long> lastUpdatedTimestamp(
+      Long baseRowId, Long commitTimestampMs, ParquetValueReader<?> tsReader) {
+    if (commitTimestampMs != null && baseRowId != null) {
+      return new LastUpdatedTimestampReader(
+          commitTimestampMs, (ParquetValueReader<Long>) tsReader);
+    } else {
+      return ParquetValueReaders.nulls();
+    }
+  }
+
public static ParquetValueReader uuids(ColumnDescriptor desc) {
return new UUIDReader(desc);
}
@@ -215,6 +225,10 @@ public static ParquetValueReader> replaceWithMetadataReader(
Long baseRowId = (Long) idToConstant.get(MetadataColumns.ROW_ID.fieldId());
Long fileSeqNumber = (Long) idToConstant.get(id);
return ParquetValueReaders.lastUpdated(baseRowId, fileSeqNumber, reader);
+ } else if (id == MetadataColumns.LAST_UPDATED_TIMESTAMP_MS.fieldId()) {
+ Long baseRowId = (Long) idToConstant.get(MetadataColumns.ROW_ID.fieldId());
+ Long commitTimestampMs = (Long) idToConstant.get(id);
+ return ParquetValueReaders.lastUpdatedTimestamp(baseRowId, commitTimestampMs, reader);
} else if (idToConstant.containsKey(id)) {
// containsKey is used because the constant may be null
return ParquetValueReaders.constant(idToConstant.get(id), constantDL);
@@ -456,6 +470,41 @@ public void setPageSource(PageReadStore pageStore) {
}
}
+  /**
+   * Reader for last-updated timestamps that returns the value read for a row when one is present
+   * and falls back to the commit timestamp supplied at construction otherwise.
+   */
+  private static class LastUpdatedTimestampReader implements ParquetValueReader<Long> {
+    private final long commitTimestampMs;
+    private final ParquetValueReader<Long> tsReader;
+
+    private LastUpdatedTimestampReader(long commitTimestampMs, ParquetValueReader<Long> tsReader) {
+      this.commitTimestampMs = commitTimestampMs;
+      // a missing delegate is replaced by the nulls() reader so read() always uses the fallback
+      this.tsReader = tsReader != null ? tsReader : nulls();
+    }
+
+    /**
+     * Returns the delegate's value for the current row, or the commit timestamp when the delegate
+     * returns null. The {@code reuse} argument is not forwarded to the delegate.
+     */
+    @Override
+    public Long read(Long reuse) {
+      Long rowLastUpdatedTimestamp = tsReader.read(null);
+      if (rowLastUpdatedTimestamp != null) {
+        return rowLastUpdatedTimestamp;
+      }
+
+      return commitTimestampMs;
+    }
+
+    @Override
+    public TripleIterator<?> column() {
+      return tsReader.column();
+    }
+
+    @Override
+    public List<TripleIterator<?>> columns() {
+      return tsReader.columns();
+    }
+
+    @Override
+    public void setPageSource(PageReadStore pageStore) {
+      tsReader.setPageSource(pageStore);
+    }
+  }
+
public abstract static class PrimitiveReader implements ParquetValueReader {
private final ColumnDescriptor desc;
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java
index 8f50e231237c..c7a049907b07 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRowLevelOperationsWithLineage.java
@@ -78,15 +78,16 @@ record ->
Types.NestedField.required(1, "id", Types.IntegerType.get()),
Types.NestedField.required(2, "data", Types.StringType.get()),
MetadataColumns.ROW_ID,
- MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER));
+ MetadataColumns.LAST_UPDATED_SEQUENCE_NUMBER,
+ MetadataColumns.LAST_UPDATED_TIMESTAMP_MS));
static final List INITIAL_RECORDS =
ImmutableList.of(
- createRecord(SCHEMA, 100, "a", 0L, 1L),
- createRecord(SCHEMA, 101, "b", 1L, 1L),
- createRecord(SCHEMA, 102, "c", 2L, 1L),
- createRecord(SCHEMA, 103, "d", 3L, 1L),
- createRecord(SCHEMA, 104, "e", 4L, 1L));
+ createRecord(SCHEMA, 100, "a", 0L, 1L, 0L),
+ createRecord(SCHEMA, 101, "b", 1L, 1L, 0L),
+ createRecord(SCHEMA, 102, "c", 2L, 1L, 0L),
+ createRecord(SCHEMA, 103, "d", 3L, 1L, 0L),
+ createRecord(SCHEMA, 104, "e", 4L, 1L, 0L));
@Parameters(
name =
@@ -142,6 +143,18 @@ public static Object[][] parameters() {
DISTRIBUTED,
3
},
+ {
+ "testhadoop",
+ SparkCatalog.class.getName(),
+ ImmutableMap.of("type", "hadoop"),
+ FileFormat.PARQUET,
+ true,
+ WRITE_DISTRIBUTION_MODE_HASH,
+ true,
+ null,
+ LOCAL,
+ 4
+ },
};
}
@@ -306,6 +319,7 @@ public void testMergeIntoWithOnlyMatched()
createBranchIfNeeded();
Table table = loadIcebergTable(spark, tableName);
appendUnpartitionedRecords(table, INITIAL_RECORDS);
+ long appendTimestamp = latestSnapshot(table).timestampMillis();
createOrReplaceView(
"source",
"id INT, data string",
@@ -319,7 +333,8 @@ public void testMergeIntoWithOnlyMatched()
+ " UPDATE SET t.data = s.data ",
commitTarget());
- long updateSequenceNumber = latestSnapshot(table).sequenceNumber();
+ Snapshot updateSnapshot = latestSnapshot(table);
+ long updateSequenceNumber = updateSnapshot.sequenceNumber();
assertEquals(
"Rows which are carried over or updated should have expected lineage",
ImmutableList.of(
@@ -329,6 +344,17 @@ public void testMergeIntoWithOnlyMatched()
row(103, "d", 3L, 1L),
row(104, "e", 4L, 1L)),
rowsWithLineage());
+
+ if (formatVersion >= 4) {
+ long updateTimestamp = updateSnapshot.timestampMillis();
+ assertTimestamps(
+ ImmutableList.of(
+ row(0L, appendTimestamp),
+ row(1L, updateTimestamp),
+ row(2L, updateTimestamp),
+ row(3L, appendTimestamp),
+ row(4L, appendTimestamp)));
+ }
}
@TestTemplate
@@ -352,15 +378,76 @@ public void testMergeMatchedDelete() throws NoSuchTableException, ParseException
rowsWithLineage());
}
+  /**
+   * Runs a MERGE whose matched and not-matched-by-source clauses both update rows, then checks
+   * that every row keeps its row ID and reports the sequence number of the merge snapshot.
+   */
+  @TestTemplate
+  public void testMergeWhenNotMatchedBySource()
+      throws NoSuchTableException, ParseException, IOException {
+    createAndInitTable("id INT, data STRING", null);
+    createBranchIfNeeded();
+    Table table = loadIcebergTable(spark, tableName);
+    appendUnpartitionedRecords(table, INITIAL_RECORDS);
+
+    String sourceJson =
+        "{ \"id\": 101, \"data\": \"updated_b\" }\n " + "{ \"id\": 200, \"data\": \"f\" }\n";
+    createOrReplaceView("source", "id INT, data STRING", sourceJson);
+
+    String mergeStatement =
+        "MERGE INTO %s AS t USING source AS s ON t.id == s.id"
+            + " WHEN MATCHED THEN UPDATE set t.data = s.data "
+            + "WHEN NOT MATCHED BY SOURCE THEN UPDATE set data = 'not_matched_by_source'";
+    sql(mergeStatement, commitTarget());
+
+    long mergeSeq = latestSnapshot(table).sequenceNumber();
+
+    // only id 101 has a match in the source view; the other four rows hit the second clause
+    List<Object[]> expectedRows =
+        ImmutableList.of(
+            row(100, "not_matched_by_source", 0L, mergeSeq),
+            row(101, "updated_b", 1L, mergeSeq),
+            row(102, "not_matched_by_source", 2L, mergeSeq),
+            row(103, "not_matched_by_source", 3L, mergeSeq),
+            row(104, "not_matched_by_source", 4L, mergeSeq));
+    assertEquals(
+        "Rows which are carried over or updated should have expected lineage",
+        expectedRows,
+        rowsWithLineage());
+  }
+
+  /**
+   * Runs a MERGE that updates matched rows and deletes rows not matched by the source, then
+   * checks that the two surviving rows keep their row IDs and report the merge sequence number.
+   */
+  @TestTemplate
+  public void testMergeWhenNotMatchedBySourceDelete()
+      throws NoSuchTableException, ParseException, IOException {
+    createAndInitTable("id INT, data STRING", null);
+    createBranchIfNeeded();
+    Table table = loadIcebergTable(spark, tableName);
+    appendUnpartitionedRecords(table, INITIAL_RECORDS);
+
+    String sourceJson =
+        "{ \"id\": 101, \"data\": \"updated_b\" }\n "
+            + "{ \"id\": 102, \"data\": \"updated_c\" }\n";
+    createOrReplaceView("source", "id INT, data STRING", sourceJson);
+
+    String mergeStatement =
+        "MERGE INTO %s AS t USING source AS s ON t.id == s.id"
+            + " WHEN MATCHED THEN UPDATE set t.data = s.data "
+            + "WHEN NOT MATCHED BY SOURCE THEN DELETE";
+    sql(mergeStatement, commitTarget());
+
+    long mergeSeq = latestSnapshot(table).sequenceNumber();
+
+    // ids 101 and 102 are the only rows present in the source view
+    List<Object[]> expectedRows =
+        ImmutableList.of(
+            row(101, "updated_b", 1L, mergeSeq), row(102, "updated_c", 2L, mergeSeq));
+    assertEquals(
+        "Rows which are carried over or updated should have expected lineage",
+        expectedRows,
+        rowsWithLineage());
+  }
+
@TestTemplate
public void testUpdate() throws NoSuchTableException, ParseException, IOException {
createAndInitTable("id INT, data STRING", null);
createBranchIfNeeded();
Table table = loadIcebergTable(spark, tableName);
appendUnpartitionedRecords(table, INITIAL_RECORDS);
+ long appendTimestamp = latestSnapshot(table).timestampMillis();
sql("UPDATE %s AS t set data = 'updated_b' WHERE id = 101", commitTarget());
- long updateSequenceNumber = latestSnapshot(table).sequenceNumber();
+ Snapshot updateSnapshot = latestSnapshot(table);
+ long updateSequenceNumber = updateSnapshot.sequenceNumber();
assertEquals(
"Rows which are carried over or updated should have expected lineage",
@@ -371,6 +458,17 @@ public void testUpdate() throws NoSuchTableException, ParseException, IOExceptio
row(103, "d", 3L, 1L),
row(104, "e", 4L, 1L)),
rowsWithLineage());
+
+ if (formatVersion >= 4) {
+ long updateTimestamp = updateSnapshot.timestampMillis();
+ assertTimestamps(
+ ImmutableList.of(
+ row(0L, appendTimestamp),
+ row(1L, updateTimestamp),
+ row(2L, appendTimestamp),
+ row(3L, appendTimestamp),
+ row(4L, appendTimestamp)));
+ }
}
@TestTemplate
@@ -405,7 +503,7 @@ public void testMergeWithManyRecords() throws NoSuchTableException, ParseExcepti
List initialRecords = Lists.newArrayList();
int rowId = 0;
for (int id = 100; id < startingId + numRecords; id++) {
- initialRecords.add(createRecord(SCHEMA, id, "data_" + id, rowId++, 1L));
+ initialRecords.add(createRecord(SCHEMA, id, "data_" + id, rowId++, 1L, 0L));
}
appendUnpartitionedRecords(table, initialRecords);
@@ -484,6 +582,12 @@ private List