NEU
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / ChatRoomChannel.java
index f9568e7..97ee988 100644 (file)
@@ -9,7 +9,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.errors.WakeupException;
 import reactor.core.publisher.Mono;
 
@@ -38,7 +37,7 @@ public class ChatRoomChannel implements Runnable
       String name)
   {
     int shard = this.shardingStrategy.selectShard(chatRoomId);
-    ChatRoomTo chatRoomTo = ChatRoomTo.of(chatRoomId, name, shard);
+    ChatRoomTo chatRoomTo = ChatRoomTo.of(chatRoomId.toString(), name, shard);
     return Mono.create(sink ->
     {
       ProducerRecord<Integer, ChatRoomTo> record =
@@ -93,8 +92,11 @@ public class ChatRoomChannel implements Runnable
         running = false;
       }
     }
+
+    log.info("Exiting normally");
   }
 
+
   void createChatRoom(ChatRoomInfo chatRoomInfo)
   {
     UUID id = chatRoomInfo.getId();