NEU
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatRoomChannel.java
index ac44f99..2a34c81 100644 (file)
@@ -9,7 +9,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.errors.WakeupException;
 import reactor.core.publisher.Mono;
 
@@ -23,8 +22,8 @@ import java.util.UUID;
 public class ChatRoomChannel implements Runnable
 {
   private final String topic;
-  private final Consumer<Integer, ChatRoomTo> consumer;
   private final Producer<Integer, ChatRoomTo> producer;
+  private final Consumer<Integer, ChatRoomTo> consumer;
   private final ShardingStrategy shardingStrategy;
   private final ChatMessageChannel chatMessageChannel;
   private final Clock clock;
@@ -93,8 +92,11 @@ public class ChatRoomChannel implements Runnable
         running = false;
       }
     }
+
+    log.info("Exiting normally");
   }
 
+
   void createChatRoom(ChatRoomInfo chatRoomInfo)
   {
     UUID id = chatRoomInfo.getId();