From 3227c2f222301e671b0366f99dda839ba43c09a3 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 2 Nov 2024 21:58:59 +0100 Subject: [PATCH] WIP --- src/main/java/de/juplo/kafka/ApplicationConfiguration.java | 5 ++--- src/main/java/de/juplo/kafka/ExampleConsumer.java | 3 ++- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 3057dfa..1546f2d 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -23,11 +23,11 @@ public class ApplicationConfiguration @Bean public ExampleConsumer exampleConsumer( Consumer kafkaConsumer, - Producer kafkaProducer, ApplicationProperties properties, Clock clock, ConfigurableApplicationContext applicationContext) { + Producer kafkaProducer = kafkaProducer(properties); return new ExampleConsumer( properties.getClientId(), @@ -65,8 +65,7 @@ public class ApplicationConfiguration return new KafkaConsumer<>(props); } - @Bean - public KafkaProducer kafkaProducer(ApplicationProperties properties) + KafkaProducer kafkaProducer(ApplicationProperties properties) { Properties props = new Properties(); props.put("bootstrap.servers", properties.getBootstrapServer()); diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index a7e5bcb..b60ec37 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -169,9 +169,10 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener log.info("{}: Consumed {} messages in total, exiting!", id, consumed); } - sendOffsets(); log.info("{} - Final commit for transaction", id); producer.commitTransaction(); + log.info("{} - Closing the producer", id); + producer.close(); } private void commitIfNecessary() -- 2.20.1