NEU
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatMessageChannel.java
index 69947a9..1925cc8 100644 (file)
@@ -17,10 +17,7 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.time.*;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
 import java.util.stream.IntStream;
 
 
@@ -157,7 +154,7 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
   @Override
   public void run()
   {
-    consumer.subscribe(List.of(topic));
+    consumer.subscribe(List.of(topic), this);
 
     running = true;
 
@@ -194,6 +191,8 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
         running = false;
       }
     }
+
+    log.info("Exiting normally");
   }
 
   void loadMessages(ConsumerRecords<String, MessageTo> records)