Skip to content

Commit 95a8ab0

Browse files
committed
MINOR: Lazily evaluate retention size during remote log cleanup
1 parent 7ff4835 commit 95a8ab0

File tree

2 files changed

+96
-7
lines changed

2 files changed

+96
-7
lines changed

storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1303,7 +1303,7 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionE
13031303
long logStartOffset = log.logStartOffset();
13041304
long logEndOffset = log.logEndOffset();
13051305
Optional<RetentionSizeData> retentionSizeData = buildRetentionSizeData(log.config().retentionSize,
1306-
log.onlyLocalLogSegmentsSize(), logEndOffset, epochWithOffsets);
1306+
log.onlyLocalLogSegmentsSize(), logEndOffset, epochWithOffsets, remoteLogSizeBytes);
13071307
Optional<RetentionTimeData> retentionTimeData = buildRetentionTimeData(log.config().retentionMs);
13081308

13091309
RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(retentionSizeData, retentionTimeData);
@@ -1438,11 +1438,12 @@ private Optional<RetentionTimeData> buildRetentionTimeData(long retentionMs) {
14381438
: Optional.empty();
14391439
}
14401440

1441-
private Optional<RetentionSizeData> buildRetentionSizeData(long retentionSize,
1442-
long onlyLocalLogSegmentsSize,
1443-
long logEndOffset,
1444-
NavigableMap<Integer, Long> epochEntries) throws RemoteStorageException {
1445-
if (retentionSize > -1) {
1441+
Optional<RetentionSizeData> buildRetentionSizeData(long retentionSize,
1442+
long onlyLocalLogSegmentsSize,
1443+
long logEndOffset,
1444+
NavigableMap<Integer, Long> epochEntries,
1445+
long fullRemoteLogSizeBytes) throws RemoteStorageException {
1446+
if (retentionSize > -1 && (fullRemoteLogSizeBytes + onlyLocalLogSegmentsSize > retentionSize)) {
14461447
long startTimeMs = time.milliseconds();
14471448
long remoteLogSizeBytes = 0L;
14481449
Set<RemoteLogSegmentId> visitedSegmentIds = new HashSet<>();
@@ -2185,6 +2186,14 @@ public RetentionSizeData(long retentionSize, long remainingBreachedSize) {
21852186
this.retentionSize = retentionSize;
21862187
this.remainingBreachedSize = remainingBreachedSize;
21872188
}
2189+
2190+
long retentionSize() {
2191+
return retentionSize;
2192+
}
2193+
2194+
long remainingBreachedSize() {
2195+
return remainingBreachedSize;
2196+
}
21882197
}
21892198

21902199
// Visible for testing

storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1139,7 +1139,7 @@ void testRemoteLogManagerRemoteMetrics() throws Exception {
11391139
when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
11401140
when(mockLog.lastStableOffset()).thenReturn(250L);
11411141
Map<String, Long> logProps = new HashMap<>();
1142-
logProps.put("retention.bytes", 1000000L);
1142+
logProps.put("retention.bytes", 5000L);
11431143
logProps.put("retention.ms", -1L);
11441144
LogConfig logConfig = new LogConfig(logProps);
11451145
when(mockLog.config()).thenReturn(logConfig);
@@ -2177,6 +2177,86 @@ public void testRemoteSizeData() {
21772177
}
21782178
}
21792179

2180+
@Test
2181+
public void testLazyBuildRetentionSizeData() throws RemoteStorageException {
2182+
RemoteLogManager.RLMExpirationTask task = remoteLogManager.new RLMExpirationTask(leaderTopicIdPartition);
2183+
2184+
NavigableMap<Integer, Long> epochEntries = new TreeMap<>();
2185+
epochEntries.put(0, 0L);
2186+
epochEntries.put(1, 100L);
2187+
2188+
long retentionSize = 2000L;
2189+
long onlyLocalLogSegmentsSize = 1000L;
2190+
long logEndOffset = 200L;
2191+
2192+
assertFalse(task.buildRetentionSizeData(-1L, onlyLocalLogSegmentsSize, logEndOffset, epochEntries, 1000L).isPresent());
2193+
assertFalse(task.buildRetentionSizeData(retentionSize, onlyLocalLogSegmentsSize, logEndOffset, epochEntries, 500L).isPresent());
2194+
assertFalse(task.buildRetentionSizeData(retentionSize, onlyLocalLogSegmentsSize, logEndOffset, epochEntries, 1000L).isPresent());
2195+
verify(remoteLogMetadataManager, times(0)).listRemoteLogSegments(eq(leaderTopicIdPartition), anyInt());
2196+
}
2197+
2198+
@Test
2199+
public void testBuildRetentionSizeData() throws RemoteStorageException {
2200+
RemoteLogManager.RLMExpirationTask task = remoteLogManager.new RLMExpirationTask(leaderTopicIdPartition);
2201+
2202+
NavigableMap<Integer, Long> epochEntries = new TreeMap<>();
2203+
epochEntries.put(0, 0L);
2204+
epochEntries.put(1, 100L);
2205+
2206+
long retentionSize = 2000L;
2207+
long onlyLocalLogSegmentsSize = 1000L;
2208+
long logEndOffset = 200L;
2209+
2210+
// 1. Total size (fullRemoteLogSizeBytes + onlyLocalLogSegmentsSize) <= retentionSize
2211+
// 1000 + 1000 <= 2000
2212+
assertFalse(task.buildRetentionSizeData(retentionSize, onlyLocalLogSegmentsSize, logEndOffset, epochEntries, 1000L).isPresent());
2213+
2214+
// 2. Total size > retentionSize
2215+
// fullRemoteLogSizeBytes + onlyLocalLogSegmentsSize = 2000 + 1000 = 3000 > 2000
2216+
// We will mock segments so that the calculated remoteLogSizeBytes is 1500.
2217+
// totalSize = 1000 + 1500 = 2500.
2218+
// remainingBreachedSize = 2500 - 2000 = 500.
2219+
2220+
RemoteLogSegmentId segmentId1 = new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid());
2221+
RemoteLogSegmentMetadata segmentMetadata1 = new RemoteLogSegmentMetadata(segmentId1, 0L, 99L,
2222+
time.milliseconds(), brokerId, time.milliseconds(), 500, Optional.empty(),
2223+
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, new TreeMap<>(Collections.singletonMap(0, 0L)));
2224+
2225+
RemoteLogSegmentId segmentId2 = new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid());
2226+
RemoteLogSegmentMetadata segmentMetadata2 = new RemoteLogSegmentMetadata(segmentId2, 100L, 199L,
2227+
time.milliseconds(), brokerId, time.milliseconds(), 1000, Optional.empty(),
2228+
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, new TreeMap<>(Collections.singletonMap(1, 100L)));
2229+
2230+
// Segment in START state should be ignored
2231+
RemoteLogSegmentId segmentId3 = new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid());
2232+
RemoteLogSegmentMetadata segmentMetadata3 = new RemoteLogSegmentMetadata(segmentId3, 200L, 299L,
2233+
time.milliseconds(), brokerId, time.milliseconds(), 500, Optional.empty(),
2234+
RemoteLogSegmentState.COPY_SEGMENT_STARTED, new TreeMap<>(Collections.singletonMap(1, 200L)));
2235+
2236+
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
2237+
.thenReturn(Collections.singletonList(segmentMetadata1).iterator());
2238+
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 1))
2239+
.thenReturn(Arrays.asList(segmentMetadata2, segmentMetadata3).iterator());
2240+
2241+
Optional<RemoteLogManager.RetentionSizeData> result = task.buildRetentionSizeData(retentionSize, onlyLocalLogSegmentsSize, logEndOffset, epochEntries, 2000L);
2242+
verify(remoteLogMetadataManager).listRemoteLogSegments(leaderTopicIdPartition, 0);
2243+
verify(remoteLogMetadataManager).listRemoteLogSegments(leaderTopicIdPartition, 1);
2244+
assertTrue(result.isPresent());
2245+
assertEquals(retentionSize, result.get().retentionSize());
2246+
assertEquals(500L, result.get().remainingBreachedSize());
2247+
2248+
// 3. Test deduplication: same segmentId1 is in different epochs
2249+
// (Though technically this shouldn't happen with correct metadata, the code handles it)
2250+
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 0))
2251+
.thenReturn(Collections.singletonList(segmentMetadata1).iterator());
2252+
when(remoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition, 1))
2253+
.thenReturn(Arrays.asList(segmentMetadata1, segmentMetadata2, segmentMetadata3).iterator());
2254+
2255+
result = task.buildRetentionSizeData(retentionSize, onlyLocalLogSegmentsSize, logEndOffset, epochEntries, 1100L);
2256+
assertTrue(result.isPresent());
2257+
assertEquals(500L, result.get().remainingBreachedSize()); // Still 500 because segmentMetadata1 is only counted once
2258+
}
2259+
21802260
@SuppressWarnings("unchecked")
21812261
@Test
21822262
public void testRemoteSizeTime() {

0 commit comments

Comments
 (0)