Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ambry-api/src/main/java/com/github/ambry/rest/RestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,12 @@ public static final class Headers {
* chunk upload should be a part of the same session.
*/
public static final String SESSION = "x-ambry-session";
/**
* The 1-based part number for a chunk in a stitched upload. When present on a chunk upload request,
* this value is captured in the signed chunk ID metadata so that downstream consumers can determine
* chunk ordering at upload time rather than waiting for the stitch request.
*/
public static final String PART_NUMBER = "x-ambry-part-number";

/**
* prefix for any header to be set as user metadata for the given blob
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,9 @@ public void stitchedUploadTest() throws Exception {
stitchBlobAndVerify(account, container, idsAndContent.getFirst(), idsAndContent.getSecond(), 217);
idsAndContent = uploadDataChunksAndVerify(account, container, FRONTEND_CONFIG.chunkUploadMaxChunkTtlSecs, 167);
stitchBlobAndVerify(account, container, idsAndContent.getFirst(), idsAndContent.getSecond(), 167);
// verify chunk uploads without part number header (backward compatibility)
idsAndContent = uploadDataChunksAndVerify(account, container, null, false, 50, 50, 17);
stitchBlobAndVerify(account, container, idsAndContent.getFirst(), idsAndContent.getSecond(), 117);
}

/**
Expand Down Expand Up @@ -755,6 +758,11 @@ private static VerifiableProperties buildFrontendVProps(File trustStoreFile, boo
*/
private Pair<List<String>, byte[]> uploadDataChunksAndVerify(Account account, Container container, Long chunkBlobTtl,
int... chunkSizes) throws Exception {
return uploadDataChunksAndVerify(account, container, chunkBlobTtl, true, chunkSizes);
}

private Pair<List<String>, byte[]> uploadDataChunksAndVerify(Account account, Container container, Long chunkBlobTtl,
boolean includePartNumbers, int... chunkSizes) throws Exception {
IdSigningService idSigningService = new AmbryIdSigningService();
HttpHeaders chunkUploadHeaders = new DefaultHttpHeaders();
chunkUploadHeaders.add(RestUtils.Headers.URL_TYPE, RestMethod.POST.name());
Expand All @@ -776,11 +784,17 @@ private Pair<List<String>, byte[]> uploadDataChunksAndVerify(Account account, Co
List<String> signedChunkIds = new ArrayList<>();
ByteArrayOutputStream fullContentStream = new ByteArrayOutputStream();
URI uri = new URI(signedPostUrl);
for (int chunkSize : chunkSizes) {
for (int i = 0; i < chunkSizes.length; i++) {
int chunkSize = chunkSizes[i];
byte[] contentArray = TestUtils.getRandomBytes(chunkSize);
ByteBuffer content = ByteBuffer.wrap(contentArray);
// Use signed URL to POST
httpRequest = buildRequest(HttpMethod.POST, uri.getPath() + "?" + uri.getQuery(), null, content);
// Use signed URL to POST, optionally with part number header (1-based)
HttpHeaders perChunkHeaders = null;
if (includePartNumbers) {
perChunkHeaders = new DefaultHttpHeaders();
perChunkHeaders.add(RestUtils.Headers.PART_NUMBER, i + 1);
}
httpRequest = buildRequest(HttpMethod.POST, uri.getPath() + "?" + uri.getQuery(), perChunkHeaders, content);
responseParts = nettyClient.sendRequest(httpRequest, null, null).get();
String signedId = verifyPostAndReturnBlobId(responseParts, chunkSize, false);
assertTrue("Blob ID for chunk upload must be signed", idSigningService.isIdSigned(signedId.substring(1)));
Expand All @@ -791,6 +805,13 @@ private Pair<List<String>, byte[]> uploadDataChunksAndVerify(Account account, Co
String blobSize = idAndMetadata.getSecond().get(RestUtils.Headers.BLOB_SIZE);
assertNotNull("x-ambry-blob-size should be present in signed ID", blobSize);
assertEquals("wrong size value in signed id", content.capacity(), Long.parseLong(blobSize));
String partNum = idAndMetadata.getSecond().get(RestUtils.Headers.PART_NUMBER);
if (includePartNumbers) {
assertNotNull("x-ambry-part-number should be present in signed ID", partNum);
assertEquals("wrong part number in signed id", i + 1, Integer.parseInt(partNum));
} else {
assertNull("x-ambry-part-number should not be present when not sent", partNum);
}
HttpHeaders expectedGetHeaders = new DefaultHttpHeaders().add(chunkUploadHeaders);
// Use signed ID and blob ID for GET request
expectedGetHeaders.add(RestUtils.Headers.BLOB_SIZE, content.capacity());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,10 +327,21 @@ private PutBlobOptions getPutBlobOptionsFromRequest() throws RestServiceExceptio
*/
private void setSignedIdMetadataAndBlobSize(BlobProperties blobProperties) throws RestServiceException {
if (RestUtils.isChunkUpload(restRequest.getArgs())) {
Map<String, String> metadata = new HashMap<>(2);
Map<String, String> metadata = new HashMap<>(6);
metadata.put(RestUtils.Headers.BLOB_SIZE, Long.toString(restRequest.getBlobBytesReceived()));
metadata.put(RestUtils.Headers.SESSION,
RestUtils.getHeader(restRequest.getArgs(), RestUtils.Headers.SESSION, true));
Integer partNumber =
RestUtils.getNumericalHeader(restRequest.getArgs(), RestUtils.Headers.PART_NUMBER, false,
Integer::parseInt);
if (partNumber != null) {
if (partNumber < 1) {
throw new RestServiceException(
"Invalid part number: " + partNumber + ". Part number must be >= 1",
RestServiceErrorCode.InvalidArgs);
}
metadata.put(RestUtils.Headers.PART_NUMBER, Integer.toString(partNumber));
}
metadata.put(EXPIRATION_TIME_MS_KEY,
Long.toString(Utils.addSecondsToEpochTime(time.milliseconds(), blobProperties.getTimeToLiveInSeconds())));
if (blobProperties.getReservedMetadataBlobId() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,28 +290,38 @@ public void chunkUploadTest() throws Exception {
idConverterFactory.translation = CONVERTED_ID;
// valid request arguments
doChunkUploadTest(1024, true, UUID.randomUUID().toString(), 1025, frontendConfig.chunkUploadInitialChunkTtlSecs,
null, 1);
null, null, 1);
doChunkUploadTest(1024, true, UUID.randomUUID().toString(), 1024, frontendConfig.chunkUploadInitialChunkTtlSecs,
null, 2);
null, null, 2);
// blob exceeds max blob size
doChunkUploadTest(1024, true, UUID.randomUUID().toString(), 1023, 7200,
doChunkUploadTest(1024, true, UUID.randomUUID().toString(), 1023, 7200, null,
routerExceptionChecker(RouterErrorCode.BlobTooLarge), 2);
// no session header
doChunkUploadTest(1024, true, null, 1025, 7200, restServiceExceptionChecker(RestServiceErrorCode.MissingArgs), 2);
doChunkUploadTest(1024, true, null, 1025, 7200, null,
restServiceExceptionChecker(RestServiceErrorCode.MissingArgs), 2);
// missing max blob size
doChunkUploadTest(1024, true, UUID.randomUUID().toString(), null, 7200,
doChunkUploadTest(1024, true, UUID.randomUUID().toString(), null, 7200, null,
restServiceExceptionChecker(RestServiceErrorCode.MissingArgs), 2);
// invalid TTL
doChunkUploadTest(1024, true, UUID.randomUUID().toString(), 1025, Utils.Infinite_Time,
doChunkUploadTest(1024, true, UUID.randomUUID().toString(), 1025, Utils.Infinite_Time, null,
restServiceExceptionChecker(RestServiceErrorCode.InvalidArgs), 2);
// TTL > default chunkUploadInitialChunkTtlSecs
doChunkUploadTest(1024, true, UUID.randomUUID().toString(), 1025, frontendConfig.chunkUploadInitialChunkTtlSecs + 1,
null, 3);
null, null, 3);
// TTL > chunkUploadMaxChunkTtlSecs
doChunkUploadTest(1024, true, UUID.randomUUID().toString(), 1025, frontendConfig.chunkUploadMaxChunkTtlSecs + 1,
restServiceExceptionChecker(RestServiceErrorCode.InvalidArgs), 3);
null, restServiceExceptionChecker(RestServiceErrorCode.InvalidArgs), 3);
// ensure that the chunk upload request requirements are not enforced for non chunk uploads.
doChunkUploadTest(1024, false, null, null, Utils.Infinite_Time, null, 4);
doChunkUploadTest(1024, false, null, null, Utils.Infinite_Time, null, null, 4);
// verify that part number is captured in signed ID metadata when present
doChunkUploadTest(1024, true, UUID.randomUUID().toString(), 1025, frontendConfig.chunkUploadInitialChunkTtlSecs,
3, null, 4);
// invalid part number: zero
doChunkUploadTest(1024, true, UUID.randomUUID().toString(), 1025, frontendConfig.chunkUploadInitialChunkTtlSecs,
0, restServiceExceptionChecker(RestServiceErrorCode.InvalidArgs), 4);
// invalid part number: negative
doChunkUploadTest(1024, true, UUID.randomUUID().toString(), 1025, frontendConfig.chunkUploadInitialChunkTtlSecs,
-1, restServiceExceptionChecker(RestServiceErrorCode.InvalidArgs), 4);
}

/**
Expand Down Expand Up @@ -486,13 +496,14 @@ private void verifySuccessResponseOnTtlEnforcement(FutureResult<Void> postFuture
* @param uploadSession the value for the "x-ambry-chunk-upload-session" request header, or null to not set it.
* @param maxUploadSize the value for the "x-ambry-max-upload-size" request header, or null to not set it.
* @param blobTtlSecs the blob TTL to use.
* @param partNumber the value for the "x-ambry-part-number" request header, or null to not set it.
* @param errorChecker if non-null, expect an exception to be thrown by the post flow and verify it using this
* {@link ThrowingConsumer}.
* @param callCount number of times this method is called.
* @throws Exception
*/
private void doChunkUploadTest(int contentLength, boolean chunkUpload, String uploadSession, Integer maxUploadSize,
long blobTtlSecs, ThrowingConsumer<ExecutionException> errorChecker, int callCount)
long blobTtlSecs, Integer partNumber, ThrowingConsumer<ExecutionException> errorChecker, int callCount)
throws Exception {
JSONObject headers = new JSONObject();
FrontendRestRequestServiceTest.setAmbryHeadersForPut(headers, blobTtlSecs, !REF_CONTAINER.isCacheable(), SERVICE_ID,
Expand All @@ -506,6 +517,9 @@ private void doChunkUploadTest(int contentLength, boolean chunkUpload, String up
if (maxUploadSize != null) {
headers.put(RestUtils.Headers.MAX_UPLOAD_SIZE, maxUploadSize);
}
if (partNumber != null) {
headers.put(RestUtils.Headers.PART_NUMBER, partNumber);
}
if (reservedMetadataId != null) {
headers.put(RestUtils.Headers.RESERVED_METADATA_ID, reservedMetadataId);
}
Expand All @@ -522,9 +536,12 @@ private void doChunkUploadTest(int contentLength, boolean chunkUpload, String up
assertEquals("Unexpected converted ID", CONVERTED_ID, restResponseChannel.getHeader(RestUtils.Headers.LOCATION));
Object metadata = request.getArgs().get(RestUtils.InternalKeys.SIGNED_ID_METADATA_KEY);
if (chunkUpload) {
Map<String, String> expectedMetadata = new HashMap<>(3);
Map<String, String> expectedMetadata = new HashMap<>(6);
expectedMetadata.put(RestUtils.Headers.BLOB_SIZE, Integer.toString(contentLength));
expectedMetadata.put(RestUtils.Headers.SESSION, uploadSession);
if (partNumber != null) {
expectedMetadata.put(RestUtils.Headers.PART_NUMBER, Integer.toString(partNumber));
}
expectedMetadata.put(PostBlobHandler.EXPIRATION_TIME_MS_KEY,
Long.toString(Utils.addSecondsToEpochTime(creationTimeMs, blobTtlSecs)));
if (reservedMetadataId != null) {
Expand Down Expand Up @@ -606,7 +623,7 @@ private List<ChunkInfo> uploadChunksViaRouter(long creationTimeMs, Container con
* @return the signed ID.
*/
private String getSignedId(ChunkInfo chunkInfo, String uploadSession) {
Map<String, String> metadata = new HashMap<>(3);
Map<String, String> metadata = new HashMap<>(5);
metadata.put(RestUtils.Headers.BLOB_SIZE, Long.toString(chunkInfo.getChunkSizeInBytes()));
metadata.put(RestUtils.Headers.SESSION, uploadSession);
metadata.put(PostBlobHandler.EXPIRATION_TIME_MS_KEY, Long.toString(chunkInfo.getExpirationTimeInMs()));
Expand Down
Loading