]> juplo.de Git - demos/kafka/training/commitdiff
Fix: `close()` muss noch vom `ExampleConsumer` aufgerufen werden
authorKai Moritz <kai@juplo.de>
Sat, 2 Nov 2024 21:23:16 +0000 (22:23 +0100)
committerKai Moritz <kai@juplo.de>
Sat, 9 Nov 2024 18:20:30 +0000 (19:20 +0100)
* Der Aufruf von `close()` löst die Abmeldung der Instanz bei dem
  GroupCoordinator aus.
* Dieser Vorgang sollte noch unter der Kontrolle des Anwendungscodes
  erfolgen!
* Wenn die Methode erst von Spring aufgerufen wird, werden dann ggf. noch
  Seiteneffekte ausgelöst, die dann noch im Kontext der Instanz laufen,
  obwohl diese eigentlich schon beendet wurde!

src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ExampleConsumer.java

index 116d63d3c56bbdd523106ba55a4c75c13ba54415..0046f8f98f01a64a1709cdda5cbb298ca2816b60 100644 (file)
@@ -27,7 +27,7 @@ public class ApplicationConfiguration
             kafkaConsumer);
   }
 
-  @Bean
+  @Bean(destroyMethod = "")
   public KafkaConsumer<String, String> kafkaConsumer(ApplicationProperties properties)
   {
     Properties props = new Properties();
index 13431d32ddbd0fc0c485841b20cd124103286ff8..320606778e0aad238df60441a0516b41f911e45d 100644 (file)
@@ -73,6 +73,8 @@ public class ExampleConsumer implements Runnable
     }
     finally
     {
+      log.info("{} - Closing the KafkaConsumer", id);
+      consumer.close();
       log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
     }
   }