WIP
authorKai Moritz <kai@juplo.de>
Sat, 2 Nov 2024 20:58:59 +0000 (21:58 +0100)
committerKai Moritz <kai@juplo.de>
Sat, 9 Nov 2024 15:49:53 +0000 (16:49 +0100)
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ExampleConsumer.java

index 3057dfa..1546f2d 100644 (file)
@@ -23,11 +23,11 @@ public class ApplicationConfiguration
   @Bean
   public ExampleConsumer exampleConsumer(
       Consumer<String, String> kafkaConsumer,
-      Producer<String, String> kafkaProducer,
       ApplicationProperties properties,
       Clock clock,
       ConfigurableApplicationContext applicationContext)
   {
+    Producer<String, String> kafkaProducer = kafkaProducer(properties);
     return
         new ExampleConsumer(
             properties.getClientId(),
@@ -65,8 +65,7 @@ public class ApplicationConfiguration
     return new KafkaConsumer<>(props);
   }
 
-  @Bean
-  public KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
+  KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
   {
     Properties props = new Properties();
     props.put("bootstrap.servers", properties.getBootstrapServer());
index a7e5bcb..b60ec37 100644 (file)
@@ -169,9 +169,10 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
       log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
     }
 
-    sendOffsets();
     log.info("{} - Final commit for transaction", id);
     producer.commitTransaction();
+    log.info("{} - Closing the producer", id);
+    producer.close();
   }
 
   private void commitIfNecessary()