refactor: `KafkaChatHomeServiceTest` reuses regular startup-logic
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / ChatRoomChannel.java
index 7f7d28e..d94bc65 100644 (file)
@@ -292,7 +292,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
         chatRoomId,
         partition,
         bufferSize);
-    KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId);
+    KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId);
     ChatRoomData chatRoomData = new ChatRoomData(
         clock,
         service,
@@ -309,7 +309,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
   {
     UUID id = chatRoomInfo.getId();
     log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
-    KafkaChatRoomService service = new KafkaChatRoomService(this, id);
+    KafkaChatMessageService service = new KafkaChatMessageService(this, id);
     ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
     putChatRoom(
         chatRoomInfo.getId(),
@@ -329,8 +329,8 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
 
     ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId);
-    KafkaChatRoomService kafkaChatRoomService =
-        (KafkaChatRoomService) chatRoomData.getChatRoomService();
+    KafkaChatMessageService kafkaChatRoomService =
+        (KafkaChatMessageService) chatRoomData.getChatRoomService();
 
     kafkaChatRoomService.persistMessage(message);
   }
@@ -425,12 +425,4 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
 
     return Mono.justOrEmpty(chatRoomInfo[shard].get(id));
   }
-
-  Flux<ChatRoomData> getChatRoomData()
-  {
-    return Flux
-        .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
-        .filter(shard -> isShardOwned[shard])
-        .flatMap(shard -> Flux.fromIterable(chatRoomData[shard].values()));
-  }
 }