From: Kai Moritz Date: Sat, 2 Nov 2024 20:58:59 +0000 (+0100) Subject: WIP X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=3227c2f222301e671b0366f99dda839ba43c09a3;p=demos%2Fkafka%2Ftraining WIP --- diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 3057dfa..1546f2d 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -23,11 +23,11 @@ public class ApplicationConfiguration @Bean public ExampleConsumer exampleConsumer( Consumer kafkaConsumer, - Producer kafkaProducer, ApplicationProperties properties, Clock clock, ConfigurableApplicationContext applicationContext) { + Producer kafkaProducer = kafkaProducer(properties); return new ExampleConsumer( properties.getClientId(), @@ -65,8 +65,7 @@ public class ApplicationConfiguration return new KafkaConsumer<>(props); } - @Bean - public KafkaProducer kafkaProducer(ApplicationProperties properties) + KafkaProducer kafkaProducer(ApplicationProperties properties) { Properties props = new Properties(); props.put("bootstrap.servers", properties.getBootstrapServer()); diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index a7e5bcb..b60ec37 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -169,9 +169,10 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener log.info("{}: Consumed {} messages in total, exiting!", id, consumed); } - sendOffsets(); log.info("{} - Final commit for transaction", id); producer.commitTransaction(); + log.info("{} - Closing the producer", id); + producer.close(); } private void commitIfNecessary()