Limit maximum request size #15
- Send several requests instead of a single one if the messages exceed the AWS maximum request size
| @@ -21,11 +21,15 @@ object KinesisGraphStage { | |||
| val ProvisionedThroughputExceededExceptionCode = "ProvisionedThroughputExceededException" | |||
| private[KinesisGraphStage] val maxBufferSize = 500 // hard limit imposed by AWS | |||
We call both the number of bytes and the number of records "size". That can be confusing, but nothing better has come to mind yet.
| * The trick is that it then puts any failed items back into the buffer | ||
| */ | ||
| class KinesisGraphStage[A : ToPutRecordsRequest](putRecords: PutRecords, streamName: String) | ||
| class KinesisGraphStage[A : ToPutRecordsRequest](putRecords: PutRecords, streamName: String, sendingThreshold: Int) |
I made sendingThreshold a constructor parameter. Not sure whether I should expose a setter method for it as well.
| val zero = 0 -> List.empty[PutRecordsRequestEntry] -> List.empty[List[PutRecordsRequestEntry]] | ||
| entries.foldLeft(zero) { case (((total, newList), lists), entry) => | ||
| val size = entry.getData.limit() // an approximation, as in the request it will have different size | ||
| if (total + size > maxRequestSize) |
OK, so here is a caveat: the size of the string != the size of the record. We may add some metadata to the message later, so it can end up bigger. In that case we shouldn't pack up to maxRequestSize bytes, but rather to something like 90% of maxRequestSize. Should I hardcode that or make it configurable?
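A minimal sketch of the headroom idea, for illustration only. `Entry`, `effectiveLimit`, `estimatedSize` and the percentage parameter are hypothetical names, not part of this PR. `Entry` stands in for `PutRecordsRequestEntry` so the sketch has no SDK dependency. It also assumes the partition key counts toward the size, which matches the Kinesis `PutRecords` documentation as far as I know.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object RequestSizing {
  // Hypothetical stand-in for PutRecordsRequestEntry (assumption, not the SDK class).
  final case class Entry(data: ByteBuffer, partitionKey: String)

  // 5 MB per request, as stated in this PR; taken as 5 * 1024 * 1024 bytes here.
  val maxRequestSize: Int = 5 * 1024 * 1024

  // Portion of the limit we are willing to fill, as an integer percentage.
  // Integer arithmetic avoids floating-point rounding surprises.
  def effectiveLimit(percent: Int = 90): Int =
    (maxRequestSize.toLong * percent / 100).toInt

  // Estimated bytes for one entry: unread payload bytes plus the UTF-8 partition key.
  // Still an approximation of what the entry occupies in the final request.
  def estimatedSize(e: Entry): Int =
    e.data.remaining() + e.partitionKey.getBytes(StandardCharsets.UTF_8).length
}
```

Making the percentage a parameter with a default keeps the common case simple while leaving room to tune it if the estimate turns out to be too optimistic.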
| /** | ||
| * Split | ||
| */ | ||
| private def splitBySize(entries: List[PutRecordsRequestEntry]): List[List[PutRecordsRequestEntry]] = { |
That will work if and only if each record size < maxRequestSize. If not, our request will be rejected by AWS anyway.
Did you mean to say the request size < maxRequestSize?
| private def splitBySize(entries: List[PutRecordsRequestEntry]): List[List[PutRecordsRequestEntry]] = { | ||
| val zero = 0 -> List.empty[PutRecordsRequestEntry] -> List.empty[List[PutRecordsRequestEntry]] | ||
| entries.foldLeft(zero) { case (((total, newList), lists), entry) => | ||
| val size = entry.getData.limit() // an approximation, as in the request it will have different size |
We also need a check for the 1 MB per-record limit.
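One possible shape for such a check, as a sketch only. `RecordLimit`, `maxRecordSize` and `partitionBySize` are hypothetical names, and the size function is passed in so the example does not depend on the SDK. What to do with the oversized entries (fail the stage, drop them, log them) is left open here.

```scala
object RecordLimit {
  // 1 MB per record, taken as 1024 * 1024 bytes here (assumption about the exact unit).
  val maxRecordSize: Int = 1024 * 1024

  // Splits entries into (sendable, oversized) according to the per-record limit.
  // Relative order is preserved within each of the two lists.
  def partitionBySize[A](entries: List[A])(sizeOf: A => Int): (List[A], List[A]) =
    entries.partition(e => sizeOf(e) <= maxRecordSize)
}
```

With real entries, `sizeOf` could be something like `_.getData.limit()`, matching the approximation already used in `splitBySize`.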
| val zero = 0 -> List.empty[PutRecordsRequestEntry] -> List.empty[List[PutRecordsRequestEntry]] | ||
| entries.foldLeft(zero) { case (((total, newList), lists), entry) => | ||
| val size = entry.getData.limit() // an approximation, as in the request it will have different size | ||
| if (total + size > maxRequestSize) |
You can bind total + size to a variable, so there is no need to evaluate it twice.
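A sketch of how the fold might look with that suggestion applied. This is not the code from the PR: it is generic over the entry type, takes the limit and a size function as parameters, and uses a flat tuple as the accumulator, all of which are choices made for this illustration.

```scala
object SplitBySize {
  // Groups entries into consecutive batches whose summed size stays within maxRequestSize.
  // An entry that is larger than maxRequestSize on its own still gets a batch of its own.
  def splitBySize[A](entries: List[A], maxRequestSize: Int)(sizeOf: A => Int): List[List[A]] = {
    val zero = (0, List.empty[A], List.empty[List[A]])
    val (_, current, done) = entries.foldLeft(zero) { case ((total, current, done), entry) =>
      val size = sizeOf(entry)
      val newTotal = total + size // computed once, reused in the test and the result
      if (newTotal > maxRequestSize && current.nonEmpty)
        (size, List(entry), current.reverse :: done) // close the current batch, start a new one
      else
        (newTotal, entry :: current, done) // entry still fits (or is the first in its batch)
    }
    // Batches and their contents were built in reverse, so restore the original order.
    (if (current.nonEmpty) current.reverse :: done else done).reverse
  }
}
```

The `current.nonEmpty` guard avoids emitting an empty batch when the very first entry of a batch is already over the limit. Such a request would still be rejected by AWS, as noted in the other comments.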
We ran into an issue where a single request consisting of several records exceeded the request size limit imposed by AWS, which is 5 MB. To overcome this, we can divide the buffered records into several requests instead of sending a single one.
This approach won't help if a single record exceeds the AWS limits.
I am also making the buffer size configurable for the client.