WIP
authorKai Moritz <kai@juplo.de>
Fri, 24 Feb 2023 08:47:35 +0000 (09:47 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 24 Feb 2023 08:47:35 +0000 (09:47 +0100)
src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java
src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java
src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/domain/UnknownChatroomException.java
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java
src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java [deleted file]
src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java

index 6091c0c..955369d 100644 (file)
@@ -1,14 +1,43 @@
 package de.juplo.kafka.chat.backend.domain;
 
+import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-import java.util.UUID;
+import java.util.*;
 
 
-public interface ChatHome
+@Slf4j
+public class ChatHome implements ChatHome
 {
-  Mono<ChatRoom> getChatRoom(UUID id);
+  private final ChatHomeService service;
+  private final int shard;
 
-  Flux<ChatRoom> getChatRooms();
+
+  public ChatHome(ChatHomeService service, int shard)
+  {
+    log.info("Created SimpleChatHome for shard {}", shard);
+    this.service = service;
+    this.shard = shard;
+  }
+
+  public ChatHome(ChatHomeService service)
+  {
+    this(service, 0);
+  }
+
+
+  @Override
+  public Mono<ChatRoom> getChatRoom(UUID id)
+  {
+    return service
+        .getChatRoom(shard, id)
+        .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
+  }
+
+  @Override
+  public Flux<ChatRoom> getChatRooms()
+  {
+    return service.getChatRooms(shard);
+  }
 }
index 4b8c7f1..3dc6668 100644 (file)
@@ -5,6 +5,7 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
@@ -40,7 +41,24 @@ public class ShardedChatHome implements ChatHome
   @Override
   public Mono<ChatRoom> getChatRoom(UUID id)
   {
-    return chatHomes[selectShard(id)].getChatRoom(id);
+    int shard = selectShard(id);
+    if (ownedShards.contains(shard))
+    {
+      return chatHomes[shard].getChatRoom(id);
+    }
+    else
+    {
+      int[] ownedShards = new int[this.ownedShards.size()];
+      Iterator<Integer> iterator = this.ownedShards.iterator();
+      for (int i = 0; iterator.hasNext(); i++)
+      {
+        ownedShards[i] = iterator.next();
+      }
+      return Mono.error(new UnknownChatroomException(
+          id,
+          shard,
+          ownedShards));
+    }
   }
 
   @Override
diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java
deleted file mode 100644 (file)
index 11542ed..0000000
+++ /dev/null
@@ -1,43 +0,0 @@
-package de.juplo.kafka.chat.backend.domain;
-
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.util.*;
-
-
-@Slf4j
-public class SimpleChatHome implements ChatHome
-{
-  private final ChatHomeService service;
-  private final int shard;
-
-
-  public SimpleChatHome(ChatHomeService service, int shard)
-  {
-    log.info("Created SimpleChatHome for shard {}", shard);
-    this.service = service;
-    this.shard = shard;
-  }
-
-  public SimpleChatHome(ChatHomeService service)
-  {
-    this(service, 0);
-  }
-
-
-  @Override
-  public Mono<ChatRoom> getChatRoom(UUID id)
-  {
-    return service
-        .getChatRoom(shard, id)
-        .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
-  }
-
-  @Override
-  public Flux<ChatRoom> getChatRooms()
-  {
-    return service.getChatRooms(shard);
-  }
-}
index 1f70f11..20208e2 100644 (file)
@@ -2,17 +2,43 @@ package de.juplo.kafka.chat.backend.domain;
 
 import lombok.Getter;
 
+import java.util.Arrays;
+import java.util.Optional;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 
 public class UnknownChatroomException extends RuntimeException
 {
   @Getter
   private final UUID chatroomId;
+  @Getter
+  private final Optional<Integer> shard;
+  @Getter
+  private final Optional<int[]> ownedShards;
 
   public UnknownChatroomException(UUID chatroomId)
   {
     super("Chatroom does not exist: " + chatroomId);
     this.chatroomId = chatroomId;
+    this.shard = Optional.empty();
+    this.ownedShards = Optional.empty();
+  }
+
+  public UnknownChatroomException(UUID chatroomId, int shard, int[] ownedShards)
+  {
+    super(
+        "Chatroom does not exist (here): " +
+        chatroomId +
+        " shard=" +
+        shard +
+        ", owned=" +
+        Arrays
+            .stream(ownedShards)
+            .mapToObj(ownedShard -> Integer.toString(ownedShard))
+            .collect(Collectors.joining(",")));
+    this.chatroomId = chatroomId;
+    this.shard = Optional.of(shard);
+    this.ownedShards = Optional.of(ownedShards);
   }
 }
index 8f262a0..c33103c 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.persistence.inmemory;
 
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -13,6 +14,8 @@ import java.util.*;
 public class InMemoryChatHomeService implements ChatHomeService
 {
   private final Map<UUID, ChatRoom>[] chatrooms;
+  private final Set<Integer> ownedShards;
+  private final ShardingStrategy shardingStrategy;
 
 
   public InMemoryChatHomeService(
@@ -67,4 +70,10 @@ public class InMemoryChatHomeService implements ChatHomeService
   {
     return Flux.fromStream(chatrooms[shard].values().stream());
   }
+
+
+  private int selectShard(UUID chatroomId)
+  {
+    return shardingStrategy.selectShard(chatroomId);
+  }
 }
index de50448..d8b49b3 100644 (file)
@@ -6,7 +6,6 @@ import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.context.annotation.Bean;
@@ -31,7 +30,7 @@ public class InMemoryServicesConfiguration
       matchIfMissing = true)
   ChatHome noneShardingChatHome(InMemoryChatHomeService chatHomeService)
   {
-    return new SimpleChatHome(chatHomeService);
+    return new ChatHome(chatHomeService);
   }
 
   @Bean
@@ -45,14 +44,14 @@ public class InMemoryServicesConfiguration
       StorageStrategy storageStrategy)
   {
     int numShards = properties.getInmemory().getNumShards();
-    SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
+    ChatHome[] chatHomes = new ChatHome[numShards];
     storageStrategy
         .read()
         .subscribe(chatRoom ->
         {
           int shard = chatRoom.getShard();
           if (chatHomes[shard] == null)
-            chatHomes[shard] = new SimpleChatHome(chatHomeService, shard);
+            chatHomes[shard] = new ChatHome(chatHomeService, shard);
         });
     ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
     return new ShardedChatHome(chatHomes, strategy);
diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java
new file mode 100644 (file)
index 0000000..8ae19fd
--- /dev/null
@@ -0,0 +1,57 @@
+package de.juplo.kafka.chat.backend.domain;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
+
+import java.time.Clock;
+import java.util.UUID;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static pl.rzrz.assertj.reactor.Assertions.assertThat;
+
+
+public class ChatHomeTest
+{
+  @Test
+  @DisplayName("Assert chatroom is delivered, if it exists")
+  void testGetExistingChatroom()
+  {
+    // Given
+    ChatHomeService chatHomeService = mock(ChatHomeService.class);
+    ChatRoom chatRoom = new ChatRoom(
+        UUID.randomUUID(),
+        "Foo",
+        0,
+        Clock.systemDefaultZone(),
+        mock(ChatRoomService.class),
+        8);
+    when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.just(chatRoom));
+    ChatHome chatHome = new ChatHome(chatHomeService);
+
+    // When
+    Mono<ChatRoom> mono = chatHome.getChatRoom(chatRoom.getId());
+
+    // Then
+    assertThat(mono).emitsExactly(chatRoom);
+  }
+
+  @Test
+  @DisplayName("Assert UnknownChatroomException is thrown, if chatroom does not exist")
+  void testGetNonExistentChatroom()
+  {
+    // Given
+    ChatHomeService chatHomeService = mock(ChatHomeService.class);
+    when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty());
+    ChatHome chatHome = new ChatHome(chatHomeService);
+
+    // When
+    Mono<ChatRoom> mono = chatHome.getChatRoom(UUID.randomUUID());
+
+    // Then
+    assertThat(mono).sendsError();
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java
deleted file mode 100644 (file)
index 5b53607..0000000
+++ /dev/null
@@ -1,57 +0,0 @@
-package de.juplo.kafka.chat.backend.domain;
-
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import reactor.core.publisher.Mono;
-
-import java.time.Clock;
-import java.util.UUID;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static pl.rzrz.assertj.reactor.Assertions.assertThat;
-
-
-public class SimpleChatHomeTest
-{
-  @Test
-  @DisplayName("Assert chatroom is delivered, if it exists")
-  void testGetExistingChatroom()
-  {
-    // Given
-    ChatHomeService chatHomeService = mock(ChatHomeService.class);
-    ChatRoom chatRoom = new ChatRoom(
-        UUID.randomUUID(),
-        "Foo",
-        0,
-        Clock.systemDefaultZone(),
-        mock(ChatRoomService.class),
-        8);
-    when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.just(chatRoom));
-    SimpleChatHome chatHome = new SimpleChatHome(chatHomeService);
-
-    // When
-    Mono<ChatRoom> mono = chatHome.getChatRoom(chatRoom.getId());
-
-    // Then
-    assertThat(mono).emitsExactly(chatRoom);
-  }
-
-  @Test
-  @DisplayName("Assert UnknownChatroomException is thrown, if chatroom does not exist")
-  void testGetNonExistentChatroom()
-  {
-    // Given
-    ChatHomeService chatHomeService = mock(ChatHomeService.class);
-    when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty());
-    SimpleChatHome chatHome = new SimpleChatHome(chatHomeService);
-
-    // When
-    Mono<ChatRoom> mono = chatHome.getChatRoom(UUID.randomUUID());
-
-    // Then
-    assertThat(mono).sendsError();
-  }
-}
index 3ce527e..83e905b 100644 (file)
@@ -23,7 +23,7 @@ public abstract class AbstractStorageStrategyIT
   protected void start()
   {
     StorageStrategyITConfig config = getConfig();
-    chathome = new SimpleChatHome(config.getChatHomeService());
+    chathome = new ChatHome(config.getChatHomeService());
     chatRoomFactory = config.getChatRoomFactory();
   }