Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
* Copyright (c) 2026 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -29,7 +29,7 @@
import lombok.RequiredArgsConstructor;

/**
* {@code KafkaConsumerApplication} without any additional configuration options.
* {@link KafkaConsumerApplication} without any additional configuration options.
*
* @param <T> type of {@link ConsumerApp} created by this application
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
* Copyright (c) 2026 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -94,10 +94,9 @@ public void clean() {
}

/**
* Clear all state stores and consumer group offsets associated with the Kafka ConsumerProducer application.
* Reset consumer group offsets associated with the Kafka ConsumerProducer application.
*/
@Command(description = "Clear all state stores, consumer group offsets, and internal topics associated with the "
+ "Kafka ConsumerProducer application.")
@Command(description = "Reset consumer group offsets associated with the Kafka ConsumerProducer application.")
public void reset() {
this.prepareClean();
try (final CleanableApp<ConsumerProducerCleanUpRunner> app = this.createCleanableApp()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2024 bakdata
* Copyright (c) 2026 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -29,7 +29,7 @@
import lombok.RequiredArgsConstructor;

/**
* {@code KafkaConsumerProducerApplication} without any additional configuration options.
* {@link KafkaConsumerProducerApplication} without any additional configuration options.
*
* @param <T> type of {@link ConsumerProducerApp} created by this application
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
* Copyright (c) 2026 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -265,7 +265,7 @@ void shouldExitWithSuccessCodeOnShutdown() {
@Override
public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder builder) {
return (consumerConfig, producerConfig) -> {
try (final Producer<String, String> producer = builder.producerBuilder()
try (final Producer<String, String> producer = builder.getProducerBuilder()
.createProducer()) {
final ProducerRecord<String, String> producerRecord =
new ProducerRecord<>(output, "foo", "bar");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
* Copyright (c) 2026 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -60,13 +60,13 @@ public ConsumerProducerApp createApp() {

@Override
public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder builder) {
final Producer<String, String> producer = builder.producerBuilder().createProducer();
final Consumer<String, String> consumer = builder.consumerBuilder().createConsumer();
builder.consumerBuilder().subscribeToAllTopics(consumer);
final Producer<String, String> producer = builder.getProducerBuilder().createProducer();
final Consumer<String, String> consumer = builder.getConsumerBuilder().createConsumer();
builder.getConsumerBuilder().subscribeToAllTopics(consumer);
final ConsumerRunnable consumerRunnable =
builder.consumerBuilder().createDefaultConsumerRunnable(consumer, records ->
builder.getConsumerBuilder().createDefaultConsumerRunnable(consumer, records ->
records.forEach(consumerRecord ->
producer.send(new ProducerRecord<>(builder.topics().getOutputTopic(),
producer.send(new ProducerRecord<>(builder.getTopics().getOutputTopic(),
consumerRecord.key(), consumerRecord.value()))));
return new DefaultConsumerProducerRunnable<>(producer, consumerRunnable);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
* Copyright (c) 2026 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -43,13 +43,13 @@ public class Mirror implements ConsumerProducerApp {

@Override
public ConsumerProducerRunnable buildRunnable(final ConsumerProducerBuilder builder) {
final Producer<String, String> producer = builder.producerBuilder().createProducer();
final Consumer<String, String> consumer = builder.consumerBuilder().createConsumer();
builder.consumerBuilder().subscribeToAllTopics(consumer);
final Producer<String, String> producer = builder.getProducerBuilder().createProducer();
final Consumer<String, String> consumer = builder.getConsumerBuilder().createConsumer();
builder.getConsumerBuilder().subscribeToAllTopics(consumer);
final ConsumerRunnable
consumerRunnable = builder.consumerBuilder().createDefaultConsumerRunnable(consumer, records ->
consumerRunnable = builder.getConsumerBuilder().createDefaultConsumerRunnable(consumer, records ->
records.forEach(consumerRecord ->
producer.send(new ProducerRecord<>(builder.topics().getOutputTopic(),
producer.send(new ProducerRecord<>(builder.getTopics().getOutputTopic(),
consumerRecord.key(), consumerRecord.value()))));
return new DefaultConsumerProducerRunnable<>(producer, consumerRunnable);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.AccessLevel;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
Expand All @@ -39,7 +40,11 @@
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListGroupsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
Expand Down Expand Up @@ -160,6 +165,42 @@ public void deleteIfExists() {
}
}

/**
* Reset consumer group offset.
*
* @param offsetSpec specification where offsets should be reset to
*/
public void reset(final OffsetSpec offsetSpec) {
final Optional<ConsumerGroupDescription> groupDescription = this.describe();
if (groupDescription.isEmpty()) {
return;
}
if (groupDescription.get().groupState() != GroupState.EMPTY) {
throw new KafkaAdminException(
"Failed to reset offsets for consumer group %s: consumer group is not empty".formatted(
this.groupName));
}

final Map<TopicPartition, OffsetAndMetadata> groupOffsets = this.listOffsets();

final Map<TopicPartition, OffsetSpec> request = groupOffsets.keySet().stream()
.collect(Collectors.toMap(tp -> tp, tp -> offsetSpec));
final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> offsetsFuture =
ConsumerGroupsClient.this.adminClient.listOffsets(request).all();
final Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets =
ConsumerGroupsClient.this.timeout.get(offsetsFuture,
() -> "Failed to reset offsets for consumer group %s: could not find offsets for spec %s".formatted(
this.groupName, offsetSpec));

final Map<TopicPartition, OffsetAndMetadata> resetOffsets = offsets.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> new OffsetAndMetadata(e.getValue().offset())));
final KafkaFuture<Void> alterOffsetResult =
ConsumerGroupsClient.this.adminClient.alterConsumerGroupOffsets(this.groupName, resetOffsets).all();
ConsumerGroupsClient.this.timeout.get(alterOffsetResult,
() -> "Failed to reset offsets for consumer group %s: could not alter offsets".formatted(
this.groupName));
}

/**
* Create a client for the configuration of this consumer group.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
* Copyright (c) 2026 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -32,4 +32,8 @@ public class KafkaAdminException extends RuntimeException {
KafkaAdminException(final String message, final Throwable cause) {
super(message, cause);
}

KafkaAdminException(final String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.common.IsolationLevel;
Expand All @@ -41,9 +43,13 @@
*
* @param <T> type of {@link ConsumerApp}
*/
public record ConfiguredConsumerApp<T extends ConsumerApp>(@NonNull T app,
@NonNull ConsumerAppConfiguration configuration)
implements ConfiguredApp<ExecutableConsumerApp<T>> {
@RequiredArgsConstructor
public class ConfiguredConsumerApp<T extends ConsumerApp> implements ConfiguredApp<ExecutableConsumerApp<T>> {
@Getter
private final @NonNull T app;
private final @NonNull ConsumerAppConfiguration configuration;

//TODO javadoc
public static Map<String, Object> createBaseConfig() {
final Map<String, Object> kafkaConfig = new HashMap<>();

Expand Down Expand Up @@ -96,8 +102,8 @@ public Map<String, Object> getKafkaProperties(final RuntimeConfiguration runtime
* Get unique group identifier of {@link ConsumerApp}
*
* @return unique group identifier
* @throws IllegalArgumentException if unique group identifier of {@link ConsumerApp} is different from
* provided group identifier in {@link ConsumerAppConfiguration}
* @throws IllegalArgumentException if unique group identifier of {@link ConsumerApp} is different from provided
* group identifier in {@link ConsumerAppConfiguration}
* @see ConsumerApp#getUniqueGroupId(ConsumerAppConfiguration)
*/
public String getUniqueGroupId() {
Expand All @@ -111,9 +117,9 @@ public String getUniqueGroupId() {
}

/**
* Create an {@code ExecutableConsumerApp} using the provided {@link RuntimeConfiguration}
* Create an {@link ExecutableConsumerApp} using the provided {@link RuntimeConfiguration}
*
* @return {@code ExecutableConsumerApp}
* @return {@link ExecutableConsumerApp}
*/
@Override
public ExecutableConsumerApp<T> withRuntimeConfiguration(final RuntimeConfiguration runtimeConfiguration) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
* Copyright (c) 2026 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand Down Expand Up @@ -37,12 +37,12 @@ public interface ConsumerApp extends App<ConsumerTopicConfig, ConsumerCleanUpCon
* Create a runnable that consumes Kafka messages
*
* @param builder provides all runtime application configurations
* @return {@code ConsumerRunnable}
* @return {@link ConsumerRunnable}
*/
ConsumerRunnable buildRunnable(ConsumerBuilder builder);

/**
* @return {@code ConsumerCleanUpConfiguration}
* @return {@link ConsumerCleanUpConfiguration}
* @see ConsumerCleanUpRunner
*/
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
* Copyright (c) 2026 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand All @@ -25,7 +25,7 @@
package com.bakdata.kafka.consumer;

/**
* Exception thrown if running conumser application was unsuccessful
* Exception thrown if running consumer application was unsuccessful
*/
public class ConsumerApplicationException extends RuntimeException {
public ConsumerApplicationException(final String message) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
* Copyright (c) 2026 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
Expand All @@ -28,6 +28,8 @@
import com.bakdata.kafka.Configurator;
import java.util.Map;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
Expand All @@ -39,15 +41,23 @@
*
* @see ConsumerApp#buildRunnable(ConsumerBuilder)
*/
public record ConsumerBuilder(@NonNull ConsumerTopicConfig topics, @NonNull Map<String, Object> kafkaProperties,
@NonNull ConsumerExecutionOptions executionOptions) {
@RequiredArgsConstructor
@Value
public class ConsumerBuilder {

@NonNull
ConsumerTopicConfig topics;
@NonNull
Map<String, Object> kafkaProperties;
@NonNull
ConsumerExecutionOptions executionOptions;

/**
* Create a new {@code Consumer} using {@link #kafkaProperties}
* Create a new {@link Consumer} using {@link #kafkaProperties}
*
* @param <K> type of keys
* @param <V> type of values
* @return {@code Consumer}
* @return {@link Consumer}
* @see KafkaConsumer#KafkaConsumer(Map)
*/
public <K, V> Consumer<K, V> createConsumer() {
Expand Down
Loading
Loading