diff --git a/kafka/src/main/java/com/netflix/conductor/kafka/status/events/services/KafkaEventServiceImpl.java b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/services/KafkaEventServiceImpl.java index 2ffeff2e59..ffd3e66e7c 100644 --- a/kafka/src/main/java/com/netflix/conductor/kafka/status/events/services/KafkaEventServiceImpl.java +++ b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/services/KafkaEventServiceImpl.java @@ -28,6 +28,33 @@ public class KafkaEventServiceImpl implements KafkaEventService { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaEventServiceImpl.class); + private static KafkaProducer kafkaProducer; + + private static synchronized KafkaProducer getKafkaProducer( + KafkaProperties kafkaProperties) throws Exception { + if (kafkaProducer == null) { + Properties producerConfig = new Properties(); + producerConfig.put( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers()); + producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); + producerConfig.put( + ProducerConfig.CLIENT_ID_CONFIG, InetAddress.getLocalHost().getHostName()); + producerConfig.put( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + + KafkaJsonSerializer jsonSerializer = new KafkaJsonSerializer<>(); + try { + KafkaProducer producer = + new KafkaProducer<>(producerConfig, new StringSerializer(), jsonSerializer); + return producer; + } catch (Exception e) { + LOGGER.error("Failed to create producer.", e); + throw e; + } + } + + return kafkaProducer; + } @Autowired private final KafkaProperties kafkaProperties; @@ -37,42 +64,20 @@ public KafkaEventServiceImpl(KafkaProperties kafkaProperties) { @Override public void produce(String key, V message, String topic) throws Exception { + KafkaProducer producer = getKafkaProducer(kafkaProperties); + final ProducerRecord record = new ProducerRecord<>(topic, key, message); - Properties producerConfig = new Properties(); - producerConfig.put( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers()); - producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); - producerConfig.put( - ProducerConfig.CLIENT_ID_CONFIG, InetAddress.getLocalHost().getHostName()); - producerConfig.put( - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - - KafkaJsonSerializer jsonSerializer = new KafkaJsonSerializer<>(); - - try (KafkaProducer producer = - new KafkaProducer<>(producerConfig, new StringSerializer(), jsonSerializer)) { - - final ProducerRecord record = new ProducerRecord<>(topic, key, message); - - producer.send( - record, - (metadata, exception) -> { - if (exception != null) { - LOGGER.error( - "Failed to send message to Kafka topic: {}", topic, exception); - } else { - LOGGER.info( - "Message sent to topic: {} with offset: {}", - topic, - metadata.offset()); - } - }); - - producer.flush(); - - } catch (Exception e) { - LOGGER.error("Failed to create producer.", e); - throw e; - } + producer.send( + record, + (metadata, exception) -> { + if (exception != null) { + LOGGER.error("Failed to send message to Kafka topic: {}", topic, exception); + } else { + LOGGER.info( + "Message sent to topic: {} with offset: {}", + topic, + metadata.offset()); + } + }); } }