Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions api/src/main/java/org/apache/iceberg/ContentFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,18 @@ default Long fileSequenceNumber() {
return null;
}

/**
* Returns the commit timestamp in milliseconds for this file.
*
* <p>The commit timestamp represents the snapshot timestamp of the commit that added the file. It
* is used for inheriting {@code _last_updated_timestamp_ms} in V4 tables.
*
* <p>This method can return null if the commit timestamp is unknown.
*/
default Long commitTimestampMs() {
return null;
}

/**
* Returns the starting row ID to assign to new rows in the data file (with _row_id set to null).
*/
Expand Down
16 changes: 14 additions & 2 deletions api/src/main/java/org/apache/iceberg/ManifestFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,13 @@ public interface ManifestFile {
"first_row_id",
Types.LongType.get(),
"Starting row ID to assign to new rows in ADDED data files");
// next ID to assign: 521
Types.NestedField COMMIT_TIMESTAMP_MS =
optional(
521,
"commit_timestamp_ms",
Types.LongType.get(),
"Snapshot timestamp when the manifest was committed");
// next ID to assign: 522

Schema SCHEMA =
new Schema(
Expand All @@ -114,7 +120,8 @@ public interface ManifestFile {
DELETED_ROWS_COUNT,
PARTITION_SUMMARIES,
KEY_METADATA,
FIRST_ROW_ID);
FIRST_ROW_ID,
COMMIT_TIMESTAMP_MS);

static Schema schema() {
return SCHEMA;
Expand Down Expand Up @@ -210,6 +217,11 @@ default Long firstRowId() {
return null;
}

/** Returns the snapshot timestamp in milliseconds when the manifest was committed. */
default Long commitTimestampMs() {
return null;
}

/**
* Copies this {@link ManifestFile manifest file}. Readers can reuse manifest file instances; use
* this method to make defensive copies.
Expand Down
2 changes: 2 additions & 0 deletions api/src/test/java/org/apache/iceberg/TestHelpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ private TestHelpers() {}
IntStream.rangeClosed(2, MAX_FORMAT_VERSION).boxed().collect(Collectors.toUnmodifiableList());
public static final List<Integer> V3_AND_ABOVE =
IntStream.rangeClosed(3, MAX_FORMAT_VERSION).boxed().collect(Collectors.toUnmodifiableList());
public static final List<Integer> V3_AND_BELOW =
IntStream.rangeClosed(1, 3).boxed().collect(Collectors.toUnmodifiableList());

/** Wait in a tight check loop until system clock is past {@code timestampMillis} */
public static long waitUntilAfter(long timestampMillis) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,15 @@ public static VectorizedArrowReader lastUpdated(
}
}

public static VectorizedArrowReader lastUpdatedTimestamp(
Long baseRowId, Long commitTimestampMs, VectorizedArrowReader tsReader) {
if (commitTimestampMs != null && baseRowId != null) {
return new LastUpdatedTimestampVectorReader(commitTimestampMs, tsReader);
} else {
return nulls();
}
}

public static VectorizedReader<?> replaceWithMetadataReader(
Types.NestedField icebergField,
VectorizedReader<?> reader,
Expand All @@ -434,6 +443,11 @@ public static VectorizedReader<?> replaceWithMetadataReader(
Long fileSeqNumber = (Long) idToConstant.get(id);
return VectorizedArrowReader.lastUpdated(
baseRowId, fileSeqNumber, (VectorizedArrowReader) reader);
} else if (id == MetadataColumns.LAST_UPDATED_TIMESTAMP_MS.fieldId()) {
Long baseRowId = (Long) idToConstant.get(MetadataColumns.ROW_ID.fieldId());
Long commitTimestampMs = (Long) idToConstant.get(id);
return VectorizedArrowReader.lastUpdatedTimestamp(
baseRowId, commitTimestampMs, (VectorizedArrowReader) reader);
} else if (idToConstant.containsKey(id)) {
// containsKey is used because the constant may be null
return new ConstantVectorReader<>(icebergField, idToConstant.get(id));
Expand Down Expand Up @@ -863,6 +877,72 @@ public void close() {
}
}

private static final class LastUpdatedTimestampVectorReader extends VectorizedArrowReader {
private static final Field LAST_UPDATED_TS =
ArrowSchemaUtil.convert(MetadataColumns.LAST_UPDATED_TIMESTAMP_MS);

private final long commitTimestampMs;
private final VectorizedReader<VectorHolder> tsReader;
private NullabilityHolder nulls;

private LastUpdatedTimestampVectorReader(
long commitTimestampMs, VectorizedReader<VectorHolder> tsReader) {
this.commitTimestampMs = commitTimestampMs;
this.tsReader = tsReader == null ? nulls() : tsReader;
}

@Override
public VectorHolder read(VectorHolder reuse, int numValsToRead) {
FieldVector timestamps = null;
try {
VectorHolder timestampsHolder = tsReader.read(null, numValsToRead);
timestamps = timestampsHolder.vector();
ArrowVectorAccessor<?, String, ?, ?> tsAccessor =
timestamps == null ? null : ArrowVectorAccessors.getVectorAccessor(timestampsHolder);

BigIntVector lastUpdatedTimestamps = allocateBigIntVector(LAST_UPDATED_TS, numValsToRead);
ArrowBuf dataBuffer = lastUpdatedTimestamps.getDataBuffer();
for (int i = 0; i < numValsToRead; i += 1) {
long bufferOffset = (long) i * Long.BYTES;
if (tsAccessor == null || isNull(timestampsHolder, i)) {
dataBuffer.setLong(bufferOffset, commitTimestampMs);
} else {
long materializedTimestamp = tsAccessor.getLong(i);
dataBuffer.setLong(bufferOffset, materializedTimestamp);
}
}

lastUpdatedTimestamps.setValueCount(numValsToRead);
return VectorHolder.vectorHolder(
lastUpdatedTimestamps, MetadataColumns.LAST_UPDATED_TIMESTAMP_MS, nulls);
} finally {
if (timestamps != null) {
timestamps.close();
}
}
}

@Override
public void setRowGroupInfo(
PageReadStore source, Map<ColumnPath, ColumnChunkMetaData> metadata) {
tsReader.setRowGroupInfo(source, metadata);
}

@Override
public void setBatchSize(int batchSize) {
if (nulls == null || nulls.size() < batchSize) {
this.nulls = newNullabilityHolder(batchSize);
}

tsReader.setBatchSize(batchSize);
}

@Override
public void close() {
// don't close result vectors as they are not owned by readers
}
}

private static boolean isNull(VectorHolder holder, int index) {
return holder.nullabilityHolder().isNullAt(index) == 1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public void before() {
0,
1L,
0,
0L,
0L)) {
for (int i = 0; i < NUM_FILES; i++) {
OutputFile manifestFile =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public void writeManifestFile(BenchmarkState state) throws IOException {
0,
1L,
0,
0L,
0L)) {
for (int i = 0; i < NUM_FILES; i++) {
OutputFile manifestFile =
Expand Down
12 changes: 12 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public PartitionData copy() {
private long fileSizeInBytes = -1L;
private Long dataSequenceNumber = null;
private Long fileSequenceNumber = null;
private Long commitTimestampMs = null;

// optional fields
private Map<Integer, Long> columnSizes = null;
Expand Down Expand Up @@ -246,6 +247,7 @@ public PartitionData copy() {
this.sortOrderId = toCopy.sortOrderId;
this.dataSequenceNumber = toCopy.dataSequenceNumber;
this.fileSequenceNumber = toCopy.fileSequenceNumber;
this.commitTimestampMs = toCopy.commitTimestampMs;
this.firstRowId = toCopy.firstRowId;
this.referencedDataFile = toCopy.referencedDataFile;
this.contentOffset = toCopy.contentOffset;
Expand Down Expand Up @@ -288,6 +290,15 @@ public void setFileSequenceNumber(Long fileSequenceNumber) {
this.fileSequenceNumber = fileSequenceNumber;
}

@Override
public Long commitTimestampMs() {
return commitTimestampMs;
}

public void setCommitTimestampMs(Long commitTimestampMs) {
this.commitTimestampMs = commitTimestampMs;
}

@Override
public Long firstRowId() {
return firstRowId;
Expand Down Expand Up @@ -626,6 +637,7 @@ public String toString() {
.add("sort_order_id", sortOrderId)
.add("data_sequence_number", dataSequenceNumber == null ? "null" : dataSequenceNumber)
.add("file_sequence_number", fileSequenceNumber == null ? "null" : fileSequenceNumber)
.add("commit_timestamp_ms", commitTimestampMs == null ? "null" : commitTimestampMs)
.add("first_row_id", firstRowId == null ? "null" : firstRowId)
.add("referenced_data_file", referencedDataFile == null ? "null" : referencedDataFile)
.add("content_offset", contentOffset == null ? "null" : contentOffset)
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/Delegates.java
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,11 @@ public Long fileSequenceNumber() {
return wrapped.fileSequenceNumber();
}

@Override
public Long commitTimestampMs() {
return wrapped.commitTimestampMs();
}

@Override
public Long firstRowId() {
return wrapped.firstRowId();
Expand Down
54 changes: 48 additions & 6 deletions core/src/main/java/org/apache/iceberg/GenericManifestEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class GenericManifestEntry<F extends ContentFile<F>>
private Long snapshotId = null;
private Long dataSequenceNumber = null;
private Long fileSequenceNumber = null;
private Long commitTimestampMs = null;
private F file = null;

GenericManifestEntry(org.apache.avro.Schema schema) {
Expand All @@ -49,48 +50,73 @@ private GenericManifestEntry(GenericManifestEntry<F> toCopy, boolean fullCopy) {
this.snapshotId = toCopy.snapshotId;
this.dataSequenceNumber = toCopy.dataSequenceNumber;
this.fileSequenceNumber = toCopy.fileSequenceNumber;
this.commitTimestampMs = toCopy.commitTimestampMs;
this.file = toCopy.file().copy(fullCopy);
}

ManifestEntry<F> wrapExisting(ManifestEntry<F> entry) {
return wrapExisting(
entry.snapshotId(), entry.dataSequenceNumber(), entry.fileSequenceNumber(), entry.file());
entry.snapshotId(),
entry.dataSequenceNumber(),
entry.fileSequenceNumber(),
entry.commitTimestampMs(),
entry.file());
}

ManifestEntry<F> wrapExisting(
Long newSnapshotId, Long newDataSequenceNumber, Long newFileSequenceNumber, F newFile) {
Long newSnapshotId,
Long newDataSequenceNumber,
Long newFileSequenceNumber,
Long newCommitTimestampMs,
F newFile) {
this.status = Status.EXISTING;
this.snapshotId = newSnapshotId;
this.dataSequenceNumber = newDataSequenceNumber;
this.fileSequenceNumber = newFileSequenceNumber;
this.commitTimestampMs = newCommitTimestampMs;
this.file = newFile;
return this;
}

ManifestEntry<F> wrapAppend(Long newSnapshotId, F newFile) {
return wrapAppend(newSnapshotId, null, newFile);
return wrapAppend(newSnapshotId, null, null, newFile);
}

ManifestEntry<F> wrapAppend(Long newSnapshotId, Long newDataSequenceNumber, F newFile) {
return wrapAppend(newSnapshotId, newDataSequenceNumber, null, newFile);
}

ManifestEntry<F> wrapAppend(
Long newSnapshotId, Long newDataSequenceNumber, Long newCommitTimestampMs, F newFile) {
this.status = Status.ADDED;
this.snapshotId = newSnapshotId;
this.dataSequenceNumber = newDataSequenceNumber;
this.fileSequenceNumber = null;
this.commitTimestampMs = newCommitTimestampMs;
this.file = Delegates.suppressFirstRowId(newFile);
return this;
}

ManifestEntry<F> wrapDelete(Long newSnapshotId, ManifestEntry<F> entry) {
return wrapDelete(
newSnapshotId, entry.dataSequenceNumber(), entry.fileSequenceNumber(), entry.file());
newSnapshotId,
entry.dataSequenceNumber(),
entry.fileSequenceNumber(),
entry.commitTimestampMs(),
entry.file());
}

ManifestEntry<F> wrapDelete(
Long newSnapshotId, Long newDataSequenceNumber, Long newFileSequenceNumber, F newFile) {
Long newSnapshotId,
Long newDataSequenceNumber,
Long newFileSequenceNumber,
Long newCommitTimestampMs,
F newFile) {
this.status = Status.DELETED;
this.snapshotId = newSnapshotId;
this.dataSequenceNumber = newDataSequenceNumber;
this.fileSequenceNumber = newFileSequenceNumber;
this.commitTimestampMs = newCommitTimestampMs;
this.file = newFile;
return this;
}
Expand Down Expand Up @@ -121,6 +147,11 @@ public Long fileSequenceNumber() {
return fileSequenceNumber;
}

@Override
public Long commitTimestampMs() {
return commitTimestampMs;
}

/**
* @return a file
*/
Expand Down Expand Up @@ -154,6 +185,11 @@ public void setFileSequenceNumber(long newFileSequenceNumber) {
this.fileSequenceNumber = newFileSequenceNumber;
}

@Override
public void setCommitTimestampMs(long newCommitTimestampMs) {
this.commitTimestampMs = newCommitTimestampMs;
}

@Override
@SuppressWarnings("unchecked")
public void put(int i, Object v) {
Expand All @@ -173,6 +209,9 @@ public void put(int i, Object v) {
case 4:
this.file = (F) v;
return;
case 5:
this.commitTimestampMs = (Long) v;
return;
default:
// ignore the object, it must be from a newer version of the format
}
Expand All @@ -196,6 +235,8 @@ public Object get(int i) {
return fileSequenceNumber;
case 4:
return file;
case 5:
return commitTimestampMs;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + i);
}
Expand All @@ -213,7 +254,7 @@ public org.apache.avro.Schema getSchema() {

@Override
public int size() {
return 5;
return 6;
}

@Override
Expand All @@ -223,6 +264,7 @@ public String toString() {
.add("snapshot_id", snapshotId)
.add("data_sequence_number", dataSequenceNumber)
.add("file_sequence_number", fileSequenceNumber)
.add("commit_timestamp_ms", commitTimestampMs)
.add("file", file)
.toString();
}
Expand Down
Loading