WIP
authorKai Moritz <kai@juplo.de>
Mon, 23 Jan 2023 16:20:35 +0000 (17:20 +0100)
committerKai Moritz <kai@juplo.de>
Wed, 25 Jan 2023 17:52:12 +0000 (18:52 +0100)
src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java

index bb2e158..46802c6 100644 (file)
@@ -1,6 +1,5 @@
 package de.juplo.kafka.chat.backend.domain;
 
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
index 0f39fba..fd54d34 100644 (file)
@@ -20,7 +20,7 @@ public class InMemoryChatHomeService implements ChatHomeService
       int[] ownedShards,
       Flux<ChatRoom> chatroomFlux)
   {
-    log.debug("Creating ChatHomeService");
+    log.debug("Creating InMemoryChatHomeService");
     this.chatrooms = new Map[numShards];
     Set<Integer> owned = Arrays
         .stream(ownedShards)
index 24cd021..0e18f68 100644 (file)
@@ -2,29 +2,71 @@ package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-import java.util.UUID;
+import java.util.*;
 
 
+@Slf4j
 public class KafkaChatHomeService implements ChatHomeService
 {
+  private final Map<UUID, ChatRoom>[] chatrooms;
+
+
+  public KafkaChatHomeService(
+    int numShards,
+    int[] ownedShards,
+    Flux<ChatRoom> chatroomFlux)
+  {
+    log.debug("Creating ChatHomeService");
+    this.chatrooms = new Map[numShards];
+    Set<Integer> owned = Arrays
+      .stream(ownedShards)
+      .collect(
+        () -> new HashSet<>(),
+        (set, i) -> set.add(i),
+        (a, b) -> a.addAll(b));
+    for (int shard = 0; shard < numShards; shard++)
+    {
+      chatrooms[shard] = owned.contains(shard)
+        ? new HashMap<>()
+        : null;
+    }
+    chatroomFlux
+      .filter(chatRoom ->
+      {
+        if (owned.contains(chatRoom.getShard()))
+        {
+          return true;
+        }
+        else
+        {
+          log.info("Ignoring not owned chat-room {}", chatRoom);
+          return false;
+        }
+      })
+      .toStream()
+      .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
+  }
+
   @Override
   public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
   {
-    return null;
+    chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
+    return Mono.just(chatRoom);
   }
 
   @Override
   public Mono<ChatRoom> getChatRoom(int shard, UUID id)
   {
-    return null;
+    return Mono.justOrEmpty(chatrooms[shard].get(id));
   }
 
   @Override
   public Flux<ChatRoom> getChatRooms(int shard)
   {
-    return null;
+    return Flux.fromStream(chatrooms[shard].values().stream());
   }
 }
index 5ab23fe..024f9aa 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
 import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -9,25 +10,46 @@ import java.time.LocalDateTime;
 import java.util.LinkedHashMap;
 
 
+@Slf4j
 public class KafkaChatRoomService implements ChatRoomService
 {
-  private final LinkedHashMap<Message.MessageKey, Message> messages = null;
+  private final LinkedHashMap<Message.MessageKey, Message> messages;
+
+
+  public KafkaChatRoomService(Flux<Message> messageFlux)
+  {
+    log.debug("Creating KafkaChatRoomService");
+    messages = new LinkedHashMap<>();
+    messageFlux.subscribe(message -> messages.put(message.getKey(), message));
+  }
 
   @Override
-  public Message persistMessage(Message.MessageKey key, LocalDateTime timestamp, String text)
+  public Message persistMessage(
+    Message.MessageKey key,
+    LocalDateTime timestamp,
+    String text)
   {
-    return null;
+    Message message = new Message(key, (long)messages.size(), timestamp, text);
+    messages.put(message.getKey(), message);
+    return message;
   }
 
   @Override
   public Mono<Message> getMessage(Message.MessageKey key)
   {
-    return null;
+    return Mono.fromSupplier(() -> messages.get(key));
   }
 
   @Override
   public Flux<Message> getMessages(long first, long last)
   {
-    return null;
+    return Flux.fromStream(messages
+      .values()
+      .stream()
+      .filter(message ->
+      {
+        long serial = message.getSerialNumber();
+        return serial >= first && serial <= last;
+      }));
   }
 }