Skip to content

Commit 2b8b85d

Browse files
Feat/main codaxy/kafka status producer (#5)
* Make KafkaProducer static to avoid instantiating it for every message. * Make the getKafkaProducer method synchronous and type it more strongly. * apply gradlew. :conductor-kafka:spotlessApply * Do not use try with resource for the producer * Run spotlessApply, again 🙄 --------- Co-authored-by: ipejakovic <igor.pejakovic@codaxy.com>
1 parent 6341e9d commit 2b8b85d

1 file changed

Lines changed: 41 additions & 36 deletions

File tree

kafka/src/main/java/com/netflix/conductor/kafka/status/events/services/KafkaEventServiceImpl.java

Lines changed: 41 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,33 @@
2828
public class KafkaEventServiceImpl implements KafkaEventService {
2929

3030
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaEventServiceImpl.class);
31+
private static KafkaProducer kafkaProducer;
32+
33+
private static synchronized <V> KafkaProducer<String, V> getKafkaProducer(
34+
KafkaProperties kafkaProperties) throws Exception {
35+
if (kafkaProducer == null) {
36+
Properties producerConfig = new Properties();
37+
producerConfig.put(
38+
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
39+
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
40+
producerConfig.put(
41+
ProducerConfig.CLIENT_ID_CONFIG, InetAddress.getLocalHost().getHostName());
42+
producerConfig.put(
43+
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
44+
45+
KafkaJsonSerializer<V> jsonSerializer = new KafkaJsonSerializer<>();
46+
try {
47+
KafkaProducer<String, V> producer =
48+
new KafkaProducer<>(producerConfig, new StringSerializer(), jsonSerializer);
49+
return producer;
50+
} catch (Exception e) {
51+
LOGGER.error("Failed to create producer.", e);
52+
throw e;
53+
}
54+
}
55+
56+
return kafkaProducer;
57+
}
3158

3259
@Autowired private final KafkaProperties kafkaProperties;
3360

@@ -37,42 +64,20 @@ public KafkaEventServiceImpl(KafkaProperties kafkaProperties) {
3764

3865
@Override
3966
public <V> void produce(String key, V message, String topic) throws Exception {
67+
KafkaProducer<String, V> producer = getKafkaProducer(kafkaProperties);
68+
final ProducerRecord<String, V> record = new ProducerRecord<>(topic, key, message);
4069

41-
Properties producerConfig = new Properties();
42-
producerConfig.put(
43-
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
44-
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
45-
producerConfig.put(
46-
ProducerConfig.CLIENT_ID_CONFIG, InetAddress.getLocalHost().getHostName());
47-
producerConfig.put(
48-
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
49-
50-
KafkaJsonSerializer<V> jsonSerializer = new KafkaJsonSerializer<>();
51-
52-
try (KafkaProducer<String, V> producer =
53-
new KafkaProducer<>(producerConfig, new StringSerializer(), jsonSerializer)) {
54-
55-
final ProducerRecord<String, V> record = new ProducerRecord<>(topic, key, message);
56-
57-
producer.send(
58-
record,
59-
(metadata, exception) -> {
60-
if (exception != null) {
61-
LOGGER.error(
62-
"Failed to send message to Kafka topic: {}", topic, exception);
63-
} else {
64-
LOGGER.info(
65-
"Message sent to topic: {} with offset: {}",
66-
topic,
67-
metadata.offset());
68-
}
69-
});
70-
71-
producer.flush();
72-
73-
} catch (Exception e) {
74-
LOGGER.error("Failed to create producer.", e);
75-
throw e;
76-
}
70+
producer.send(
71+
record,
72+
(metadata, exception) -> {
73+
if (exception != null) {
74+
LOGGER.error("Failed to send message to Kafka topic: {}", topic, exception);
75+
} else {
76+
LOGGER.info(
77+
"Message sent to topic: {} with offset: {}",
78+
topic,
79+
metadata.offset());
80+
}
81+
});
7782
}
7883
}

0 commit comments

Comments
 (0)