WIP
authorKai Moritz <kai@juplo.de>
Fri, 22 Oct 2021 12:13:42 +0000 (14:13 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 22 Oct 2021 12:13:42 +0000 (14:13 +0200)
src/main/java/de/juplo/kafka/wordcount/recorder/SplitterStreamProcessor.java

index fca26d5..dd00aca 100644 (file)
@@ -63,7 +63,7 @@ public class SplitterStreamProcessor implements ApplicationRunner
 
     try
     {
-      log.info("C - Subscribing to topic test");
+      log.info("Subscribing to topic {}", inputTopic);
       consumer.subscribe(
           Arrays.asList(inputTopic),
           new TransactionalConsumerRebalanceListener());
@@ -131,7 +131,9 @@ public class SplitterStreamProcessor implements ApplicationRunner
     {
       log.info("Closing consumer");
       consumer.close();
-      log.info("C - DONE!");
+      log.info("Closing producer");
+      producer.close();
+      log.info("Exiting!");
     }
   }
 
@@ -145,6 +147,9 @@ public class SplitterStreamProcessor implements ApplicationRunner
   private void commitTransaction()
   {
     log.info("Committing transaction");
+    producer.sendOffsetsToTransaction(
+        consumer.po,
+        consumer.groupMetadata());
     producer.commitTransaction();
   }