WIP
authorKai Moritz <kai@juplo.de>
Sat, 10 Sep 2022 08:01:31 +0000 (10:01 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 10 Sep 2022 08:01:31 +0000 (10:01 +0200)
src/main/java/de/juplo/kafka/EndlessConsumer.java

index 9edc87b..dfb8349 100644 (file)
@@ -52,22 +52,22 @@ public class EndlessConsumer<K, V>
 
   public void start()
   {
-    if (registry.getListenerContainer(id).isChildRunning())
+    if (running())
       throw new IllegalStateException("Consumer instance " + id + " is already running!");
 
-    log.info("{} - Starting ListenerContainer", id);
+    log.info("{} - Starting - consumed {} messages before", id, consumed);
     errorHandler.clearState();
     registry.getListenerContainer(id).start();
   }
 
   public void stop()
   {
-    if (!registry.getListenerContainer(id).isChildRunning())
+    if (running())
       throw new IllegalStateException("Consumer instance " + id + " is not running!");
 
-    log.info("{} - Stopping ListenerContainer", id);
+    log.info("{} - Stopping", id);
     registry.getListenerContainer(id).stop();
-    log.info("{} - Stopped", id);
+    log.info("{} - Stopped - consumed {} messages so far", id, consumed);
   }
 
   public boolean running()