WIP
authorKai Moritz <kai@juplo.de>
Sat, 2 Sep 2023 17:34:29 +0000 (19:34 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 2 Sep 2023 17:34:29 +0000 (19:34 +0200)
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java

diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java
deleted file mode 100644 (file)
index 29f1312..0000000
+++ /dev/null
@@ -1,76 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.inmemory;
-
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.util.*;
-import java.util.stream.IntStream;
-
-
-@Slf4j
-public class InMemoryChatHomeService
-{
-  private final Map<UUID, ChatRoom>[] chatrooms;
-
-
-  public InMemoryChatHomeService(
-      int numShards,
-      int[] ownedShards,
-      Flux<ChatRoom> chatroomFlux)
-  {
-    log.debug("Creating InMemoryChatHomeService");
-    this.chatrooms = new Map[numShards];
-    Set<Integer> owned = Arrays
-        .stream(ownedShards)
-        .collect(
-            () -> new HashSet<>(),
-            (set, i) -> set.add(i),
-            (a, b) -> a.addAll(b));
-    for (int shard = 0; shard < numShards; shard++)
-    {
-      chatrooms[shard] = owned.contains(shard)
-          ? new HashMap<>()
-          : null;
-    }
-    chatroomFlux
-        .filter(chatRoom ->
-        {
-          if (owned.contains(chatRoom.getShard()))
-          {
-            return true;
-          }
-          else
-          {
-            log.info("Ignoring not owned chat-room {}", chatRoom);
-            return false;
-          }
-        })
-        .toStream()
-        .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
-  }
-
-  public void putChatRoom(ChatRoom chatRoom)
-  {
-    chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
-  }
-
-  public Mono<ChatRoom> getChatRoom(int shard, UUID id)
-  {
-    return Mono.justOrEmpty(chatrooms[shard].get(id));
-  }
-
-  public int[] getOwnedShards()
-  {
-    return IntStream
-        .range(0, chatrooms.length)
-        .filter(i -> chatrooms[i] != null)
-        .toArray();
-  }
-
-  public Flux<ChatRoom> getChatRooms(int shard)
-  {
-    return Flux.fromStream(chatrooms[shard].values().stream());
-  }
-}
index 375ed72..00e0c6f 100644 (file)
@@ -26,9 +26,9 @@ public class InMemoryServicesConfiguration
       name = "sharding-strategy",
       havingValue = "none",
       matchIfMissing = true)
-  ChatHome noneShardingChatHome(InMemoryChatHomeService chatHomeService)
+  ChatHome noneShardingChatHome()
   {
-    return new SimpleChatHome(chatHomeService);
+    return new SimpleChatHome();
   }
 
   @Bean
index 3048aa5..2987dd4 100644 (file)
@@ -13,13 +13,17 @@ import java.util.*;
 @Slf4j
 public class SimpleChatHome implements ChatHome
 {
-  private final int shard;
   private final Map<UUID, ChatRoom> chatrooms;
 
 
 
+  public SimpleChatHome(Flux<ChatRoom> chatroomFlux)
+  {
+    this(chatroomFlux, null);
+  }
+
   public SimpleChatHome(
-      int shard,
+      Integer shard,
       Flux<ChatRoom> chatroomFlux)
   {
     log.info("Created SimpleChatHome for shard {}", shard);
@@ -28,7 +32,7 @@ public class SimpleChatHome implements ChatHome
     chatroomFlux
         .filter(chatRoom ->
         {
-          if (shard > -1 && chatRoom.getShard() == shard)
+          if (shard == null && chatRoom.getShard() == shard)
           {
             return true;
           }
@@ -43,7 +47,6 @@ public class SimpleChatHome implements ChatHome
         })
         .toStream()
         .forEach(chatroom -> chatrooms.put(chatroom.getId(), chatroom));
-    this.shard = shard;
   }