refactor: Introduced `ConsumerTaskExecutor` -- Aligned code
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / implementation / kafka / ChatRoomChannel.java
index 7e95c64..d94bc65 100644 (file)
@@ -1,11 +1,11 @@
-package de.juplo.kafka.chat.backend.persistence.kafka;
+package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.domain.*;
 import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
 import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
-import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
-import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
-import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -292,7 +292,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
         chatRoomId,
         partition,
         bufferSize);
-    KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId);
+    KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId);
     ChatRoomData chatRoomData = new ChatRoomData(
         clock,
         service,
@@ -309,7 +309,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
   {
     UUID id = chatRoomInfo.getId();
     log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
-    KafkaChatRoomService service = new KafkaChatRoomService(this, id);
+    KafkaChatMessageService service = new KafkaChatMessageService(this, id);
     ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
     putChatRoom(
         chatRoomInfo.getId(),
@@ -329,8 +329,8 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
 
     ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId);
-    KafkaChatRoomService kafkaChatRoomService =
-        (KafkaChatRoomService) chatRoomData.getChatRoomService();
+    KafkaChatMessageService kafkaChatRoomService =
+        (KafkaChatMessageService) chatRoomData.getChatRoomService();
 
     kafkaChatRoomService.persistMessage(message);
   }
@@ -425,12 +425,4 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
 
     return Mono.justOrEmpty(chatRoomInfo[shard].get(id));
   }
-
-  Flux<ChatRoomData> getChatRoomData()
-  {
-    return Flux
-        .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
-        .filter(shard -> isShardOwned[shard])
-        .flatMap(shard -> Flux.fromIterable(chatRoomData[shard].values()));
-  }
 }