Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,24 @@ private enum Priority {
this.priority = priority;
}
}

private enum TransactionOperation {
SEND("send"),
BEGIN_TRANSACTION("beginTransaction"),
PREPARE_TRANSACTION("prepareTransaction"),
SEND_OFFSETS_TO_TRANSACTION("sendOffsetsToTransaction");

final String displayName;

TransactionOperation(String displayName) {
this.displayName = displayName;
}

@Override
public String toString() {
return displayName;
}
}

public TransactionManager(final LogContext logContext,
final String transactionalId,
Expand Down Expand Up @@ -331,7 +349,7 @@ synchronized TransactionalRequestResult initializeTransactions(

public synchronized void beginTransaction() {
ensureTransactional();
throwIfPendingState("beginTransaction");
throwIfPendingState(TransactionOperation.BEGIN_TRANSACTION);
maybeFailWithError();
transitionTo(State.IN_TRANSACTION);
}
Expand All @@ -343,7 +361,7 @@ public synchronized void beginTransaction() {
*/
public synchronized void prepareTransaction() {
ensureTransactional();
throwIfPendingState("prepareTransaction");
throwIfPendingState(TransactionOperation.PREPARE_TRANSACTION);
maybeFailWithError();
transitionTo(State.PREPARED_TRANSACTION);
this.preparedTxnState = new ProducerIdAndEpoch(
Expand Down Expand Up @@ -406,7 +424,7 @@ private TransactionalRequestResult beginCompletingTransaction(TransactionResult
public synchronized TransactionalRequestResult sendOffsetsToTransaction(final Map<TopicPartition, OffsetAndMetadata> offsets,
final ConsumerGroupMetadata groupMetadata) {
ensureTransactional();
throwIfPendingState("sendOffsetsToTransaction");
throwIfPendingState(TransactionOperation.SEND_OFFSETS_TO_TRANSACTION);
maybeFailWithError();

if (currentState != State.IN_TRANSACTION) {
Expand Down Expand Up @@ -438,7 +456,7 @@ public synchronized TransactionalRequestResult sendOffsetsToTransaction(final Ma

public synchronized void maybeAddPartition(TopicPartition topicPartition) {
maybeFailWithError();
throwIfPendingState("send");
throwIfPendingState(TransactionOperation.SEND);

if (isTransactional()) {
if (!hasProducerId()) {
Expand Down Expand Up @@ -1248,7 +1266,7 @@ private TxnOffsetCommitHandler txnOffsetCommitHandler(TransactionalRequestResult
return new TxnOffsetCommitHandler(result, builder);
}

private void throwIfPendingState(String operation) {
private void throwIfPendingState(TransactionOperation operation) {
if (pendingTransition != null) {
if (pendingTransition.result.isAcked()) {
pendingTransition = null;
Expand Down