diff --git a/ambry-api/src/main/java/com/github/ambry/rest/RestUtils.java b/ambry-api/src/main/java/com/github/ambry/rest/RestUtils.java index 60d904b042..7571a80ff5 100644 --- a/ambry-api/src/main/java/com/github/ambry/rest/RestUtils.java +++ b/ambry-api/src/main/java/com/github/ambry/rest/RestUtils.java @@ -329,6 +329,12 @@ public static final class Headers { * chunk upload should be a part of the same session. */ public static final String SESSION = "x-ambry-session"; + /** + * The 1-based part number for a chunk in a stitched upload. When present on a chunk upload request, + * this value is captured in the signed chunk ID metadata so that downstream consumers can determine + * chunk ordering at upload time rather than waiting for the stitch request. + */ + public static final String PART_NUMBER = "x-ambry-part-number"; /** * prefix for any header to be set as user metadata for the given blob diff --git a/ambry-frontend/src/integration-test/java/com/github/ambry/frontend/FrontendIntegrationTest.java b/ambry-frontend/src/integration-test/java/com/github/ambry/frontend/FrontendIntegrationTest.java index 1d45c83f7b..47cb5b0fdc 100644 --- a/ambry-frontend/src/integration-test/java/com/github/ambry/frontend/FrontendIntegrationTest.java +++ b/ambry-frontend/src/integration-test/java/com/github/ambry/frontend/FrontendIntegrationTest.java @@ -596,6 +596,9 @@ public void stitchedUploadTest() throws Exception { stitchBlobAndVerify(account, container, idsAndContent.getFirst(), idsAndContent.getSecond(), 217); idsAndContent = uploadDataChunksAndVerify(account, container, FRONTEND_CONFIG.chunkUploadMaxChunkTtlSecs, 167); stitchBlobAndVerify(account, container, idsAndContent.getFirst(), idsAndContent.getSecond(), 167); + // verify chunk uploads without part number header (backward compatibility) + idsAndContent = uploadDataChunksAndVerify(account, container, null, false, 50, 50, 17); + stitchBlobAndVerify(account, container, idsAndContent.getFirst(), 
idsAndContent.getSecond(), 117); } /** @@ -755,6 +758,11 @@ private static VerifiableProperties buildFrontendVProps(File trustStoreFile, boo */ private Pair, byte[]> uploadDataChunksAndVerify(Account account, Container container, Long chunkBlobTtl, int... chunkSizes) throws Exception { + return uploadDataChunksAndVerify(account, container, chunkBlobTtl, true, chunkSizes); + } + + private Pair, byte[]> uploadDataChunksAndVerify(Account account, Container container, Long chunkBlobTtl, + boolean includePartNumbers, int... chunkSizes) throws Exception { IdSigningService idSigningService = new AmbryIdSigningService(); HttpHeaders chunkUploadHeaders = new DefaultHttpHeaders(); chunkUploadHeaders.add(RestUtils.Headers.URL_TYPE, RestMethod.POST.name()); @@ -776,11 +784,17 @@ private Pair, byte[]> uploadDataChunksAndVerify(Account account, Co List signedChunkIds = new ArrayList<>(); ByteArrayOutputStream fullContentStream = new ByteArrayOutputStream(); URI uri = new URI(signedPostUrl); - for (int chunkSize : chunkSizes) { + for (int i = 0; i < chunkSizes.length; i++) { + int chunkSize = chunkSizes[i]; byte[] contentArray = TestUtils.getRandomBytes(chunkSize); ByteBuffer content = ByteBuffer.wrap(contentArray); - // Use signed URL to POST - httpRequest = buildRequest(HttpMethod.POST, uri.getPath() + "?" + uri.getQuery(), null, content); + // Use signed URL to POST, optionally with part number header (1-based) + HttpHeaders perChunkHeaders = null; + if (includePartNumbers) { + perChunkHeaders = new DefaultHttpHeaders(); + perChunkHeaders.add(RestUtils.Headers.PART_NUMBER, i + 1); + } + httpRequest = buildRequest(HttpMethod.POST, uri.getPath() + "?" 
+ uri.getQuery(), perChunkHeaders, content); responseParts = nettyClient.sendRequest(httpRequest, null, null).get(); String signedId = verifyPostAndReturnBlobId(responseParts, chunkSize, false); assertTrue("Blob ID for chunk upload must be signed", idSigningService.isIdSigned(signedId.substring(1))); @@ -791,6 +805,13 @@ private Pair, byte[]> uploadDataChunksAndVerify(Account account, Co String blobSize = idAndMetadata.getSecond().get(RestUtils.Headers.BLOB_SIZE); assertNotNull("x-ambry-blob-size should be present in signed ID", blobSize); assertEquals("wrong size value in signed id", content.capacity(), Long.parseLong(blobSize)); + String partNum = idAndMetadata.getSecond().get(RestUtils.Headers.PART_NUMBER); + if (includePartNumbers) { + assertNotNull("x-ambry-part-number should be present in signed ID", partNum); + assertEquals("wrong part number in signed id", i + 1, Integer.parseInt(partNum)); + } else { + assertNull("x-ambry-part-number should not be present when not sent", partNum); + } HttpHeaders expectedGetHeaders = new DefaultHttpHeaders().add(chunkUploadHeaders); // Use signed ID and blob ID for GET request expectedGetHeaders.add(RestUtils.Headers.BLOB_SIZE, content.capacity()); diff --git a/ambry-frontend/src/main/java/com/github/ambry/frontend/PostBlobHandler.java b/ambry-frontend/src/main/java/com/github/ambry/frontend/PostBlobHandler.java index e6445a2740..08a8e4f1f1 100644 --- a/ambry-frontend/src/main/java/com/github/ambry/frontend/PostBlobHandler.java +++ b/ambry-frontend/src/main/java/com/github/ambry/frontend/PostBlobHandler.java @@ -327,10 +327,21 @@ private PutBlobOptions getPutBlobOptionsFromRequest() throws RestServiceExceptio */ private void setSignedIdMetadataAndBlobSize(BlobProperties blobProperties) throws RestServiceException { if (RestUtils.isChunkUpload(restRequest.getArgs())) { - Map metadata = new HashMap<>(2); + Map metadata = new HashMap<>(6); metadata.put(RestUtils.Headers.BLOB_SIZE, 
Long.toString(restRequest.getBlobBytesReceived())); metadata.put(RestUtils.Headers.SESSION, RestUtils.getHeader(restRequest.getArgs(), RestUtils.Headers.SESSION, true)); + Integer partNumber = + RestUtils.getNumericalHeader(restRequest.getArgs(), RestUtils.Headers.PART_NUMBER, false, + Integer::parseInt); + if (partNumber != null) { + if (partNumber < 1) { + throw new RestServiceException( + "Invalid part number: " + partNumber + ". Part number must be >= 1", + RestServiceErrorCode.InvalidArgs); + } + metadata.put(RestUtils.Headers.PART_NUMBER, Integer.toString(partNumber)); + } metadata.put(EXPIRATION_TIME_MS_KEY, Long.toString(Utils.addSecondsToEpochTime(time.milliseconds(), blobProperties.getTimeToLiveInSeconds()))); if (blobProperties.getReservedMetadataBlobId() != null) { diff --git a/ambry-frontend/src/test/java/com/github/ambry/frontend/PostBlobHandlerTest.java b/ambry-frontend/src/test/java/com/github/ambry/frontend/PostBlobHandlerTest.java index 8e72a88f38..bbbbc00d1b 100644 --- a/ambry-frontend/src/test/java/com/github/ambry/frontend/PostBlobHandlerTest.java +++ b/ambry-frontend/src/test/java/com/github/ambry/frontend/PostBlobHandlerTest.java @@ -290,28 +290,38 @@ public void chunkUploadTest() throws Exception { idConverterFactory.translation = CONVERTED_ID; // valid request arguments doChunkUploadTest(1024, true, UUID.randomUUID().toString(), 1025, frontendConfig.chunkUploadInitialChunkTtlSecs, - null, 1); + null, null, 1); doChunkUploadTest(1024, true, UUID.randomUUID().toString(), 1024, frontendConfig.chunkUploadInitialChunkTtlSecs, - null, 2); + null, null, 2); // blob exceeds max blob size - doChunkUploadTest(1024, true, UUID.randomUUID().toString(), 1023, 7200, + doChunkUploadTest(1024, true, UUID.randomUUID().toString(), 1023, 7200, null, routerExceptionChecker(RouterErrorCode.BlobTooLarge), 2); // no session header - doChunkUploadTest(1024, true, null, 1025, 7200, restServiceExceptionChecker(RestServiceErrorCode.MissingArgs), 2); + 
doChunkUploadTest(1024, true, null, 1025, 7200, null, + restServiceExceptionChecker(RestServiceErrorCode.MissingArgs), 2); // missing max blob size - doChunkUploadTest(1024, true, UUID.randomUUID().toString(), null, 7200, + doChunkUploadTest(1024, true, UUID.randomUUID().toString(), null, 7200, null, restServiceExceptionChecker(RestServiceErrorCode.MissingArgs), 2); // invalid TTL - doChunkUploadTest(1024, true, UUID.randomUUID().toString(), 1025, Utils.Infinite_Time, + doChunkUploadTest(1024, true, UUID.randomUUID().toString(), 1025, Utils.Infinite_Time, null, restServiceExceptionChecker(RestServiceErrorCode.InvalidArgs), 2); // TTL > default chunkUploadInitialChunkTtlSecs doChunkUploadTest(1024, true, UUID.randomUUID().toString(), 1025, frontendConfig.chunkUploadInitialChunkTtlSecs + 1, - null, 3); + null, null, 3); // TTL > chunkUploadMaxChunkTtlSecs doChunkUploadTest(1024, true, UUID.randomUUID().toString(), 1025, frontendConfig.chunkUploadMaxChunkTtlSecs + 1, - restServiceExceptionChecker(RestServiceErrorCode.InvalidArgs), 3); + null, restServiceExceptionChecker(RestServiceErrorCode.InvalidArgs), 3); // ensure that the chunk upload request requirements are not enforced for non chunk uploads. 
- doChunkUploadTest(1024, false, null, null, Utils.Infinite_Time, null, 4); + doChunkUploadTest(1024, false, null, null, Utils.Infinite_Time, null, null, 4); + // verify that part number is captured in signed ID metadata when present + doChunkUploadTest(1024, true, UUID.randomUUID().toString(), 1025, frontendConfig.chunkUploadInitialChunkTtlSecs, + 3, null, 5); + // invalid part number: zero + doChunkUploadTest(1024, true, UUID.randomUUID().toString(), 1025, frontendConfig.chunkUploadInitialChunkTtlSecs, + 0, restServiceExceptionChecker(RestServiceErrorCode.InvalidArgs), 5); + // invalid part number: negative + doChunkUploadTest(1024, true, UUID.randomUUID().toString(), 1025, frontendConfig.chunkUploadInitialChunkTtlSecs, + -1, restServiceExceptionChecker(RestServiceErrorCode.InvalidArgs), 5); } /** @@ -486,13 +496,14 @@ private void verifySuccessResponseOnTtlEnforcement(FutureResult postFuture * @param uploadSession the value for the "x-ambry-chunk-upload-session" request header, or null to not set it. * @param maxUploadSize the value for the "x-ambry-max-upload-size" request header, or null to not set it. * @param blobTtlSecs the blob TTL to use. + * @param partNumber the value for the "x-ambry-part-number" request header, or null to not set it. * @param errorChecker if non-null, expect an exception to be thrown by the post flow and verify it using this * {@link ThrowingConsumer}. * @param callCount number of times this method is called. 
* @throws Exception */ private void doChunkUploadTest(int contentLength, boolean chunkUpload, String uploadSession, Integer maxUploadSize, - long blobTtlSecs, ThrowingConsumer errorChecker, int callCount) + long blobTtlSecs, Integer partNumber, ThrowingConsumer errorChecker, int callCount) throws Exception { JSONObject headers = new JSONObject(); FrontendRestRequestServiceTest.setAmbryHeadersForPut(headers, blobTtlSecs, !REF_CONTAINER.isCacheable(), SERVICE_ID, @@ -506,6 +517,9 @@ private void doChunkUploadTest(int contentLength, boolean chunkUpload, String up if (maxUploadSize != null) { headers.put(RestUtils.Headers.MAX_UPLOAD_SIZE, maxUploadSize); } + if (partNumber != null) { + headers.put(RestUtils.Headers.PART_NUMBER, partNumber); + } if (reservedMetadataId != null) { headers.put(RestUtils.Headers.RESERVED_METADATA_ID, reservedMetadataId); } @@ -522,9 +536,12 @@ private void doChunkUploadTest(int contentLength, boolean chunkUpload, String up assertEquals("Unexpected converted ID", CONVERTED_ID, restResponseChannel.getHeader(RestUtils.Headers.LOCATION)); Object metadata = request.getArgs().get(RestUtils.InternalKeys.SIGNED_ID_METADATA_KEY); if (chunkUpload) { - Map expectedMetadata = new HashMap<>(3); + Map expectedMetadata = new HashMap<>(6); expectedMetadata.put(RestUtils.Headers.BLOB_SIZE, Integer.toString(contentLength)); expectedMetadata.put(RestUtils.Headers.SESSION, uploadSession); + if (partNumber != null) { + expectedMetadata.put(RestUtils.Headers.PART_NUMBER, Integer.toString(partNumber)); + } expectedMetadata.put(PostBlobHandler.EXPIRATION_TIME_MS_KEY, Long.toString(Utils.addSecondsToEpochTime(creationTimeMs, blobTtlSecs))); if (reservedMetadataId != null) { @@ -606,7 +623,7 @@ private List uploadChunksViaRouter(long creationTimeMs, Container con * @return the signed ID. 
*/ private String getSignedId(ChunkInfo chunkInfo, String uploadSession) { - Map metadata = new HashMap<>(3); + Map metadata = new HashMap<>(5); metadata.put(RestUtils.Headers.BLOB_SIZE, Long.toString(chunkInfo.getChunkSizeInBytes())); metadata.put(RestUtils.Headers.SESSION, uploadSession); metadata.put(PostBlobHandler.EXPIRATION_TIME_MS_KEY, Long.toString(chunkInfo.getExpirationTimeInMs()));