Fehler im Shutdown-Code korrigiert: Shutdown von `EndlessConsumer` zu spät
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
index 2a3445c..3d154c2 100644 (file)
@@ -35,7 +35,6 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
   private long consumed = 0;
 
   private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
-  private final Map<Integer, Long> offsets = new HashMap<>();
 
 
   @Override
@@ -45,13 +44,10 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
     {
       Integer partition = tp.partition();
       Long newOffset = consumer.position(tp);
-      Long oldOffset = offsets.remove(partition);
       log.info(
-          "{} - removing partition: {}, consumed {} records (offset {} -> {})",
+          "{} - removing partition: {}, offset of next message {})",
           id,
           partition,
-          newOffset - oldOffset,
-          oldOffset,
           newOffset);
       Map<String, Long> removed = seen.remove(partition);
       for (String key : removed.keySet())
@@ -247,22 +243,7 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
   public void destroy() throws ExecutionException, InterruptedException
   {
     log.info("{} - Destroy!", id);
-    try
-    {
-      stop();
-    }
-    catch (IllegalStateException e)
-    {
-      log.info("{} - Was already stopped", id);
-    }
-    catch (Exception e)
-    {
-      log.error("{} - Unexpected exception while trying to stop the consumer", id, e);
-    }
-    finally
-    {
-      log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
-    }
+    log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
   }
 
   public boolean running()