diff --git a/ambry-named-mysql/src/integration-test/java/com/github/ambry/named/MySqlNamedBlobDbIntegrationTest.java b/ambry-named-mysql/src/integration-test/java/com/github/ambry/named/MySqlNamedBlobDbIntegrationTest.java index 007920bbd3..f177aabb10 100644 --- a/ambry-named-mysql/src/integration-test/java/com/github/ambry/named/MySqlNamedBlobDbIntegrationTest.java +++ b/ambry-named-mysql/src/integration-test/java/com/github/ambry/named/MySqlNamedBlobDbIntegrationTest.java @@ -49,6 +49,8 @@ import static java.lang.Thread.*; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.*; +import static org.powermock.api.mockito.PowerMockito.*; /** @@ -281,9 +283,9 @@ record = new NamedBlobRecord(account.getName(), container.getName(), blobName, b namedBlobDb.get(account.getName(), container.getName(), blobName).get()); } - // fails @Test public void testDeleteWithMultipleVersions() throws Exception { + setupMockForMinStaleCount(); Account account = accountService.getAllAccounts().iterator().next(); Container container = account.getAllContainers().iterator().next(); String blobName = "testDeleteWithMultipleVersions-" + TestUtils.getRandomKey(10); @@ -368,6 +370,7 @@ public void testDeleteWithMultipleVersions() throws Exception { */ @Test public void testCleanupBlobsPipeline() throws Exception { + setupMockForMinStaleCount(); Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")); calendar.add(Calendar.DATE, -config.staleDataRetentionDays); long staleCutoffTime = calendar.getTimeInMillis(); @@ -449,7 +452,7 @@ public void testCleanupBlobsPipeline() throws Exception { */ @Test public void testCleanupBlobStaleCase1() throws Exception { - + setupMockForMinStaleCount(); Account account = accountService.getAllAccounts().iterator().next(); Container container = account.getAllContainers().iterator().next(); String blobId = getBlobId(account, container); @@ -474,6 +477,7 @@ public void testCleanupBlobStaleCase1() throws Exception { */ @Test public void testCleanupBlobStaleCase2() throws Exception { + setupMockForMinStaleCount(); Account account = accountService.getAllAccounts().iterator().next(); Container container = account.getAllContainers().iterator().next(); String blobId = getBlobId(account, container); @@ -504,6 +508,7 @@ public void testCleanupBlobStaleCase2() throws Exception { */ @Test public void testCleanupBlobGoodCase1() throws Exception { + setupMockForMinStaleCount(); Account account = accountService.getAllAccounts().iterator().next(); Container container = account.getAllContainers().iterator().next(); String blobId = getBlobId(account, container); @@ -522,6 +527,7 @@ public void testCleanupBlobGoodCase1() throws Exception { */ @Test public void testCleanupBlobGoodCase2() throws Exception { + setupMockForMinStaleCount(); Account account = accountService.getAllAccounts().iterator().next(); Container container = account.getAllContainers().iterator().next(); String blobId = getBlobId(account, container); @@ -546,7 +552,7 @@ public void testCleanupBlobGoodCase2() throws Exception { */ @Test public void testCleanupBlobGoodCase3() throws Exception { - + setupMockForMinStaleCount(); Account account = accountService.getAllAccounts().iterator().next(); Container container = account.getAllContainers().iterator().next(); String blobId = getBlobId(account, container); @@ -567,6 +573,7 @@ public void testCleanupBlobGoodCase3() throws Exception { */ @Test public void testCleanupBlobGoodCase4() throws Exception { + setupMockForMinStaleCount(); Account account = accountService.getAllAccounts().iterator().next(); Container container = account.getAllContainers().iterator().next(); String blobId = getBlobId(account, container); @@ -594,6 +601,7 @@ public void testCleanupBlobGoodCase4() throws Exception { */ @Test public void testCleanupBlobGoodCase5() throws Exception { + setupMockForMinStaleCount(); Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")); calendar.add(Calendar.DATE, -config.staleDataRetentionDays); long staleCutoffTime = calendar.getTimeInMillis(); @@ -624,6 +632,7 @@ public void testCleanupBlobGoodCase5() throws Exception { */ @Test public void testCleanupBlobGoodCase6() throws Exception { + setupMockForMinStaleCount(); Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC")); calendar.add(Calendar.DATE, -config.staleDataRetentionDays); long staleCutoffTime = calendar.getTimeInMillis(); @@ -670,6 +679,7 @@ public void testCleanupBlobGoodCase6() throws Exception { @Test public void testRemovesOneOlderStaleInProgressBlob() throws Exception { + setupMockForMinStaleCount(); // Arrange: create an IN_PROGRESS blob and mark it stale by updating modified_ts NamedBlobRecord record1 = createAndPutNamedBlob(getBlobIdFromService(), NamedBlobState.IN_PROGRESS, "new_cleaner"); updateModifiedTimestampByBlobName("new_cleaner", 20); @@ -686,6 +696,7 @@ public void testRemovesOneOlderStaleInProgressBlob() throws Exception { @Test public void testRemovesTwoStaleInProgressBlobs() throws Exception { + setupMockForMinStaleCount(); // Arrange: create two IN_PROGRESS blob records with different blob IDs NamedBlobRecord record1 = createAndPutNamedBlob("blob-id1", NamedBlobState.IN_PROGRESS, "new_cleaner"); time.sleep(5); @@ -702,6 +713,7 @@ public void testRemovesTwoStaleInProgressBlobs() throws Exception { @Test public void testRemovesStaleInProgressBlobsAddedBeforeAndAfter() throws Exception { + setupMockForMinStaleCount(); // Arrange: put one IN_PROGRESS record, then update its timestamp to stale (20 days ago) NamedBlobRecord record1 = createAndPutNamedBlob("blob-id1", NamedBlobState.IN_PROGRESS, "new_cleaner"); updateModifiedTimestampByBlobName("new_cleaner", 20); @@ -720,6 +732,7 @@ public void testRemovesStaleInProgressBlobsAddedBeforeAndAfter() throws Exceptio @Test public void testIgnoresReadyBlobsWhenRemovingStale() throws Exception { + setupMockForMinStaleCount(); NamedBlobRecord record1 = createAndPutNamedBlob("blob-id1", NamedBlobState.IN_PROGRESS, "new_cleaner"); updateModifiedTimestampByBlobName("new_cleaner", 20); time.sleep(5); @@ -737,6 +750,7 @@ public void testIgnoresReadyBlobsWhenRemovingStale() throws Exception { @Test public void testRemovesOneStaleBlobWithSameBlobIdDifferentStates() throws Exception { + setupMockForMinStaleCount(); String blobId = getBlobIdFromService(); NamedBlobRecord record1 = createAndPutNamedBlob(blobId, NamedBlobState.IN_PROGRESS, "new_cleaner"); time.sleep(5); @@ -752,6 +766,7 @@ public void testRemovesOneStaleBlobWithSameBlobIdDifferentStates() throws Except @Test public void testRemovesReadyAndInProgressStaleBlobs() throws Exception { + setupMockForMinStaleCount(); // Arrange: three blobs with mixed states (READY and IN_PROGRESS) NamedBlobRecord record1 = createAndPutNamedBlob("blob-id1", NamedBlobState.READY, "new_cleaner"); time.sleep(5); @@ -771,6 +786,7 @@ public void testRemovesReadyAndInProgressStaleBlobs() throws Exception { */ @Test public void testMultiBlobNamesCase1NoPagination() throws Exception { + setupMockForMinStaleCount(); String[] trackedBlobIds = new String[3]; for (int i = 0; i < 6; i++) { String blobId = "blob-id" + (i + 1); @@ -798,6 +814,7 @@ public void testMultiBlobNamesCase1NoPagination() throws Exception { */ @Test public void testMultiBlobNamesCase1PaginationEven() throws Exception { + setupMockForMinStaleCount(); for (int i = 0; i < 2486; i++) { String blobId = "blob-id" + (i + 1); NamedBlobState state = (i % 2 == 0) ? NamedBlobState.IN_PROGRESS : NamedBlobState.READY; @@ -817,6 +834,7 @@ public void testMultiBlobNamesCase1PaginationEven() throws Exception { */ @Test public void testMultiBlobNamesCase1PaginationOdd() throws Exception { + setupMockForMinStaleCount(); for (int i = 0; i < 2487; i++) { String blobId = "blob-id" + (i + 1); NamedBlobState state = (i % 2 == 0) ? NamedBlobState.IN_PROGRESS : NamedBlobState.READY; @@ -836,6 +854,7 @@ public void testMultiBlobNamesCase1PaginationOdd() throws Exception { */ @Test public void testMultiBlobNamesCase2NoPagination() throws Exception { + setupMockForMinStaleCount(); for (int i = 0; i < 6; i++) { String blobId = "blob-id" + (i + 1); String cleaner = "new_cleaner" + i; @@ -854,6 +873,7 @@ public void testMultiBlobNamesCase2NoPagination() throws Exception { */ @Test public void testMultiBlobNamesCase2Pagination() throws Exception { + setupMockForMinStaleCount(); for (int i = 0; i < 2344; i++) { String blobId = "blob-id" + (i + 1); String cleaner = "new_cleaner" + i; @@ -873,6 +893,7 @@ public void testMultiBlobNamesCase2Pagination() throws Exception { */ @Test public void testMultiBlobNamesCase3() throws Exception { + setupMockForMinStaleCount(); for (int i = 0; i < 1004; i++) { String blobId = "blob-id" + (i + 1); String cleaner = "new_cleaner" + i; @@ -999,6 +1020,7 @@ public static String base64BlobIdToHex(String base64BlobId) { * Helper method to run the for loop across containers to retrieve stale blobs */ public List getStaleBlobList() throws Exception { + setupMockForMinStaleCount(); List staleNamedBlobsList = new ArrayList<>(); Set containers = accountService.getContainersByStatus(Container.ContainerStatus.ACTIVE); @@ -1019,4 +1041,10 @@ public List getStaleBlobList() throws Exception { return staleNamedBlobsList; } + + private void setupMockForMinStaleCount() throws SQLException { + MySqlNamedBlobDb mockDb = mock(MySqlNamedBlobDb.class); + when(mockDb.checkIfValidContainerForCleaning(any(Connection.class), any(Container.class))) + .thenReturn(false); + } } diff --git a/ambry-named-mysql/src/main/java/com/github/ambry/named/MySqlNamedBlobDb.java b/ambry-named-mysql/src/main/java/com/github/ambry/named/MySqlNamedBlobDb.java index 0aef077d7c..afade4e5dc 100644 --- a/ambry-named-mysql/src/main/java/com/github/ambry/named/MySqlNamedBlobDb.java +++ b/ambry-named-mysql/src/main/java/com/github/ambry/named/MySqlNamedBlobDb.java @@ -70,6 +70,7 @@ public class MySqlNamedBlobDb implements NamedBlobDb { private static final Logger logger = LoggerFactory.getLogger(MySqlNamedBlobDb.class); private static final int MAX_NUMBER_OF_VERSIONS_IN_DELETE = 1000; private static final int VERSION_BASE = 100000; + private static final int MIN_STALE_BLOBS_TO_ACTIVATE_CLEANER = 10000; private final Time time; private static final String MULTI_VERSION_PLACE_HOLDER = "MULTI_VERSION_PLACE_HOLDER"; @@ -178,6 +179,20 @@ public class MySqlNamedBlobDb implements NamedBlobDb { + "ORDER BY %s ASC, %s DESC " + "LIMIT ?", ACCOUNT_ID, CONTAINER_ID, BLOB_NAME, BLOB_ID, VERSION, BLOB_STATE, MODIFIED_TS, DELETED_TS, NAMED_BLOBS_V2, BLOB_NAME, VERSION); + private static final String GET_MINIMUM_STALE_BLOB_COUNT = String.format( + "SELECT COUNT(*) AS single_occurrence_ready_count " + + "FROM ( " + + " SELECT %s " + + " FROM %s " + + " WHERE container_id = ? " + + " AND account_id = ? " + + " AND blob_state = ? " + + " GROUP BY %s, %s, %s " + + " HAVING COUNT(*) = 1 " + + ") AS sub", + BLOB_NAME, NAMED_BLOBS_V2, BLOB_NAME, CONTAINER_ID, ACCOUNT_ID + ); + private final AccountService accountService; private final String localDatacenter; private final List remoteDatacenters; @@ -384,6 +399,10 @@ public CompletableFuture pullStaleBlobs(Container return executeGenericTransactionAsync(true, (connection) -> { long startTime = this.time.milliseconds(); StaleBlobsWithLatestBlobName staleBlobsWithLatestBlobName = null; + Boolean res = checkIfValidContainerForCleaning(connection, container); + if (!(res)) { + return new StaleBlobsWithLatestBlobName(new ArrayList<>(), null); + } List potentialStaleNamedBlobResults = getAllBlobsForContainer(connection, container, blobName); int resultSize = potentialStaleNamedBlobResults.size(); if (resultSize == 0) { @@ -957,6 +976,28 @@ private List getAllBlobsForContainer(Connection connection, Cont return resultList; } + public boolean checkIfValidContainerForCleaning(Connection connection, Container container) throws SQLException { + int minStaleCount = 0; + try (PreparedStatement statement = connection.prepareStatement(GET_MINIMUM_STALE_BLOB_COUNT)) { + statement.setInt(1, container.getId()); + statement.setInt(2, container.getParentAccountId()); + statement.setInt(3, NamedBlobState.READY.ordinal()); + + logger.info("Determining the minimum number of stale blobs: Query {}", statement.toString()); + + try (ResultSet resultSet = statement.executeQuery()) { + if (resultSet.next()) { + minStaleCount = resultSet.getInt(1); // directly read the number returned by the query + } + } + } catch (SQLException e) { + logger.error("Error executing query: {}", e.getMessage()); + throw e; + } + + return minStaleCount >= MIN_STALE_BLOBS_TO_ACTIVATE_CLEANER; + } + /** * Performs a batch soft delete on the provided list of stale blobs. *