refactor: Pulled business-logic into class `ShardedChatHome`
authorKai Moritz <kai@juplo.de>
Sat, 14 Jan 2023 15:47:04 +0000 (16:47 +0100)
committerKai Moritz <kai@juplo.de>
Wed, 25 Jan 2023 21:00:58 +0000 (22:00 +0100)
- Pulled the logic that selects the appropriate shard from the class
  `ChatBackendController` into the newly introduced class
  `ShardedChatHome`.
- Simplified the configuration
  - `InMemoryServicesConfiguration` creates a `ChatHome` of type
    `SimpleChatHome`, if the `sharding-strategy` `none` is choosen.
  - In that case, the values for `num-shards` and `owned-shards` are
    ignored and set to `1` and `0`.
  - If the `sharding-strategy` is set to `kafkalike`, a `ChatHome` of
    type `ShardedChatHome` is instanciated and the configuration
    respects the configured sharding.
- Simplified the configuration of `ChatBackendControllerTest` accordingly.

19 files changed:
src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java
src/main/java/de/juplo/kafka/chat/backend/api/KafkaLikeShardingStrategy.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/api/ShardingStrategy.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/domain/ShardingStrategy.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java
src/main/java/de/juplo/kafka/chat/backend/persistence/KafkaLikeShardingStrategy.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java
src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesConfigurationIT.java
src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java
src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java
src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java
src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java
src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java

index 9be7fa1..36bec48 100644 (file)
@@ -17,8 +17,7 @@ import java.util.UUID;
 @RequiredArgsConstructor
 public class ChatBackendController
 {
-  private final ChatHome[] chatHomes;
-  private final ShardingStrategy selectionStrategy;
+  private final ChatHome chatHome;
   private final ChatRoomFactory factory;
   private final StorageStrategy storageStrategy;
 
@@ -29,23 +28,22 @@ public class ChatBackendController
     UUID chatRoomId = UUID.randomUUID();
     return factory
         .createChatRoom(chatRoomId, name)
-        .flatMap(chatRoom -> chatHomes[chatRoom.getShard()].putChatRoom(chatRoom))
+        .flatMap(chatRoom -> chatHome.putChatRoom(chatRoom))
         .map(ChatRoomTo::from);
   }
 
   @GetMapping("list")
   public Flux<ChatRoomTo> list()
   {
-    return Flux
-        .fromArray(chatHomes)
-        .flatMap(chatHome -> chatHome.getChatRooms())
+    return chatHome
+        .getChatRooms()
         .map(chatroom -> ChatRoomTo.from(chatroom));
   }
 
   @GetMapping("{chatroomId}/list")
   public Flux<MessageTo> list(@PathVariable UUID chatroomId)
   {
-    return chatHomes[selectShard(chatroomId)]
+    return chatHome
         .getChatRoom(chatroomId)
         .flatMapMany(chatroom -> chatroom
             .getMessages()
@@ -55,7 +53,7 @@ public class ChatBackendController
   @GetMapping("{chatroomId}")
   public Mono<ChatRoomTo> get(@PathVariable UUID chatroomId)
   {
-    return chatHomes[selectShard(chatroomId)]
+    return chatHome
         .getChatRoom(chatroomId)
         .map(chatroom -> ChatRoomTo.from(chatroom));
   }
@@ -68,7 +66,7 @@ public class ChatBackendController
       @RequestBody String text)
   {
     return
-        chatHomes[selectShard(chatroomId)]
+        chatHome
             .getChatRoom(chatroomId)
             .flatMap(chatroom -> put(chatroom, username, messageId, text));
   }
@@ -95,7 +93,7 @@ public class ChatBackendController
       @PathVariable Long messageId)
   {
     return
-        chatHomes[selectShard(chatroomId)]
+        chatHome
             .getChatRoom(chatroomId)
             .flatMap(chatroom -> get(chatroom, username, messageId));
   }
@@ -114,7 +112,7 @@ public class ChatBackendController
   @GetMapping(path = "{chatroomId}/listen")
   public Flux<ServerSentEvent<MessageTo>> listen(@PathVariable UUID chatroomId)
   {
-    return chatHomes[selectShard(chatroomId)]
+    return chatHome
         .getChatRoom(chatroomId)
         .flatMapMany(chatroom -> listen(chatroom));
   }
@@ -136,12 +134,6 @@ public class ChatBackendController
   @PostMapping("/store")
   public void store()
   {
-    for (int shard = 0; shard < chatHomes.length; shard++)
-      storageStrategy.write(chatHomes[shard].getChatRooms());
-  }
-
-  private int selectShard(UUID chatroomId)
-  {
-    return selectionStrategy.selectShard(chatroomId);
+    storageStrategy.write(chatHome.getChatRooms());
   }
 }
diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/KafkaLikeShardingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/api/KafkaLikeShardingStrategy.java
deleted file mode 100644 (file)
index d06ee58..0000000
+++ /dev/null
@@ -1,20 +0,0 @@
-package de.juplo.kafka.chat.backend.api;
-
-import lombok.RequiredArgsConstructor;
-import org.apache.kafka.common.utils.Utils;
-
-import java.util.UUID;
-
-
-@RequiredArgsConstructor
-public class KafkaLikeShardingStrategy implements ShardingStrategy
-{
-  private final int numPartitions;
-
-  @Override
-  public int selectShard(UUID chatRoomId)
-  {
-    byte[] serializedKey = chatRoomId.toString().getBytes();
-    return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ShardingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/api/ShardingStrategy.java
deleted file mode 100644 (file)
index 36f7e23..0000000
+++ /dev/null
@@ -1,9 +0,0 @@
-package de.juplo.kafka.chat.backend.api;
-
-import java.util.UUID;
-
-
-public interface ShardingStrategy
-{
-  int selectShard(UUID chatRoomId);
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java
new file mode 100644 (file)
index 0000000..ffa7860
--- /dev/null
@@ -0,0 +1,42 @@
+package de.juplo.kafka.chat.backend.domain;
+
+import lombok.RequiredArgsConstructor;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.UUID;
+
+
+@RequiredArgsConstructor
+public class ShardedChatHome implements ChatHome
+{
+  private final ChatHome[] chatHomes;
+  private final ShardingStrategy selectionStrategy;
+
+
+  @Override
+  public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
+  {
+    return chatHomes[selectShard(chatRoom.getId())].putChatRoom(chatRoom);
+  }
+
+  @Override
+  public Mono<ChatRoom> getChatRoom(UUID id)
+  {
+    return chatHomes[selectShard(id)].getChatRoom(id);
+  }
+
+  @Override
+  public Flux<ChatRoom> getChatRooms()
+  {
+    return Flux
+        .fromArray(chatHomes)
+        .flatMap(chatHome -> chatHome.getChatRooms());
+  }
+
+
+  private int selectShard(UUID chatroomId)
+  {
+    return selectionStrategy.selectShard(chatroomId);
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardingStrategy.java
new file mode 100644 (file)
index 0000000..dde0e54
--- /dev/null
@@ -0,0 +1,9 @@
+package de.juplo.kafka.chat.backend.domain;
+
+import java.util.UUID;
+
+
+public interface ShardingStrategy
+{
+  int selectShard(UUID chatRoomId);
+}
index daa710b..b15eab3 100644 (file)
@@ -15,6 +15,13 @@ public class SimpleChatHome implements ChatHome
   private final ChatHomeService service;
   private final int shard;
 
+
+  public SimpleChatHome(ChatHomeService service)
+  {
+    this(service, 0);
+  }
+
+
   @Override
   public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
   {
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/KafkaLikeShardingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/KafkaLikeShardingStrategy.java
new file mode 100644 (file)
index 0000000..41fd9cd
--- /dev/null
@@ -0,0 +1,21 @@
+package de.juplo.kafka.chat.backend.persistence;
+
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.UUID;
+
+
+@RequiredArgsConstructor
+public class KafkaLikeShardingStrategy implements ShardingStrategy
+{
+  private final int numPartitions;
+
+  @Override
+  public int selectShard(UUID chatRoomId)
+  {
+    byte[] serializedKey = chatRoomId.toString().getBytes();
+    return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
+  }
+}
index 50fa705..7fff359 100644 (file)
@@ -1,6 +1,6 @@
 package de.juplo.kafka.chat.backend.persistence.inmemory;
 
-import de.juplo.kafka.chat.backend.api.ShardingStrategy;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
 import de.juplo.kafka.chat.backend.domain.ChatRoomFactory;
 import de.juplo.kafka.chat.backend.domain.ChatRoomService;
index a0f37f0..96ef05c 100644 (file)
@@ -1,8 +1,10 @@
 package de.juplo.kafka.chat.backend.persistence.inmemory;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.api.KafkaLikeShardingStrategy;
-import de.juplo.kafka.chat.backend.api.ShardingStrategy;
+import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType;
+import de.juplo.kafka.chat.backend.domain.ShardedChatHome;
+import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.domain.ChatHome;
 import de.juplo.kafka.chat.backend.domain.SimpleChatHome;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
@@ -22,12 +24,28 @@ import java.time.Clock;
 public class InMemoryServicesConfiguration
 {
   @Bean
-  ChatHome[] chatHomes(
+  @ConditionalOnProperty(
+      prefix = "chat.backend.inmemory",
+      name = "sharding-strategy",
+      havingValue = "none",
+      matchIfMissing = true)
+  ChatHome noneShardingChatHome(InMemoryChatHomeService chatHomeService)
+  {
+    return new SimpleChatHome(chatHomeService);
+  }
+
+  @Bean
+  @ConditionalOnProperty(
+      prefix = "chat.backend.inmemory",
+      name = "sharding-strategy",
+      havingValue = "kafkalike")
+  ChatHome kafkalikeShardingChatHome(
       ChatBackendProperties properties,
       InMemoryChatHomeService chatHomeService,
       StorageStrategy storageStrategy)
   {
-    SimpleChatHome[] chatHomes = new SimpleChatHome[properties.getInmemory().getNumShards()];
+    int numShards = properties.getInmemory().getNumShards();
+    SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
     storageStrategy
         .read()
         .subscribe(chatRoom ->
@@ -36,7 +54,8 @@ public class InMemoryServicesConfiguration
           if (chatHomes[shard] == null)
             chatHomes[shard] = new SimpleChatHome(chatHomeService, shard);
         });
-    return chatHomes;
+    ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
+    return new ShardedChatHome(chatHomes, strategy);
   }
 
   @Bean
@@ -44,9 +63,17 @@ public class InMemoryServicesConfiguration
       ChatBackendProperties properties,
       StorageStrategy storageStrategy)
   {
+    ShardingStrategyType sharding =
+        properties.getInmemory().getShardingStrategy();
+    int numShards = sharding == ShardingStrategyType.none
+        ? 1
+        : properties.getInmemory().getNumShards();
+    int[] ownedShards = sharding == ShardingStrategyType.none
+        ? new int[] { 0 }
+        : properties.getInmemory().getOwnedShards();
     return new InMemoryChatHomeService(
-        properties.getInmemory().getNumShards(),
-        properties.getInmemory().getOwnedShards(),
+        numShards,
+        ownedShards,
         storageStrategy.read());
   }
 
index c7e5643..df730aa 100644 (file)
@@ -2,7 +2,7 @@ package de.juplo.kafka.chat.backend.persistence.storage.files;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.api.ShardingStrategy;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
index 24c6a01..1e3e5ee 100644 (file)
@@ -5,7 +5,7 @@ import com.fasterxml.jackson.databind.JavaType;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import de.juplo.kafka.chat.backend.api.ChatRoomTo;
 import de.juplo.kafka.chat.backend.api.MessageTo;
-import de.juplo.kafka.chat.backend.api.ShardingStrategy;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
 import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
index 862dac0..2b33eed 100644 (file)
@@ -1,7 +1,7 @@
 package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.api.ShardingStrategy;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
index ec90e94..d21fe2b 100644 (file)
@@ -1,6 +1,6 @@
 package de.juplo.kafka.chat.backend.persistence.storage.mongodb;
 
-import de.juplo.kafka.chat.backend.api.ShardingStrategy;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
 import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
 import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory;
index 9027c6f..151a833 100644 (file)
@@ -7,8 +7,6 @@ import org.springframework.boot.test.context.SpringBootTest;
                webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
                properties = {
                                "chat.backend.inmemory.sharding-strategy=none",
-                               "chat.backend.inmemory.num-shards=1",
-                               "chat.backend.inmemory.owned-shards=0",
                                "chat.backend.inmemory.storage-directory=target/test-classes/data/files" })
 class InMemoryWithFilesConfigurationIT extends AbstractConfigurationIT
 {
index b1c80a9..1b25a11 100644 (file)
@@ -1,6 +1,5 @@
 package de.juplo.kafka.chat.backend.api;
 
-import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.domain.*;
 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;
 import lombok.extern.slf4j.Slf4j;
@@ -9,16 +8,13 @@ import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
 import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.boot.test.mock.mockito.MockBean;
-import org.springframework.context.annotation.Bean;
 import org.springframework.http.MediaType;
 import org.springframework.test.web.reactive.server.WebTestClient;
 import reactor.core.publisher.Mono;
 
 import java.time.Clock;
 import java.time.LocalDateTime;
-import java.util.Arrays;
 import java.util.UUID;
 
 import static org.mockito.ArgumentMatchers.any;
@@ -27,7 +23,7 @@ import static org.mockito.Mockito.*;
 
 @SpringBootTest(properties = {
     "spring.main.allow-bean-definition-overriding=true",
-    "chat.backend.inmemory.owned-shards=0,1,2,3,4,5,6,7,8,9" })
+    "chat.backend.inmemory.sharding-strategy=none" })
 @AutoConfigureWebTestClient
 @Slf4j
 public class ChatBackendControllerTest
@@ -255,20 +251,4 @@ public class ChatBackendControllerTest
         .jsonPath("$.username").isEqualTo(user);
     verify(chatRoomService, never()).persistMessage(eq(key), any(LocalDateTime.class), any(String.class));
   }
-
-  @TestConfiguration
-  static class Config
-  {
-    @Bean
-    ChatHome[] chatHomes(
-        ChatBackendProperties properties,
-        InMemoryChatHomeService service)
-    {
-      SimpleChatHome[] chatHomes = new SimpleChatHome[properties.getInmemory().getNumShards()];
-      Arrays
-          .stream(properties.getInmemory().getOwnedShards())
-          .forEach(i -> chatHomes[i] = new SimpleChatHome(service, i));
-      return chatHomes;
-    }
-  }
 }
index a648dae..5b53607 100644 (file)
@@ -30,7 +30,7 @@ public class SimpleChatHomeTest
         mock(ChatRoomService.class),
         8);
     when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.just(chatRoom));
-    SimpleChatHome chatHome = new SimpleChatHome(chatHomeService, 0);
+    SimpleChatHome chatHome = new SimpleChatHome(chatHomeService);
 
     // When
     Mono<ChatRoom> mono = chatHome.getChatRoom(chatRoom.getId());
@@ -46,7 +46,7 @@ public class SimpleChatHomeTest
     // Given
     ChatHomeService chatHomeService = mock(ChatHomeService.class);
     when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty());
-    SimpleChatHome chatHome = new SimpleChatHome(chatHomeService, 0);
+    SimpleChatHome chatHome = new SimpleChatHome(chatHomeService);
 
     // When
     Mono<ChatRoom> mono = chatHome.getChatRoom(UUID.randomUUID());
index 58cfaf0..c934ff4 100644 (file)
@@ -24,7 +24,7 @@ public abstract class AbstractStorageStrategyIT
 
   protected void start()
   {
-    chathome = new SimpleChatHome(getChatHomeServiceSupplier().get(), 0);
+    chathome = new SimpleChatHome(getChatHomeServiceSupplier().get());
     chatRoomFactory = getChatRoomFactory();
   }
 
index 8972b04..fe7ecac 100644 (file)
@@ -3,7 +3,7 @@ package de.juplo.kafka.chat.backend.persistence;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
 import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import de.juplo.kafka.chat.backend.api.ShardingStrategy;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
 import de.juplo.kafka.chat.backend.domain.ChatRoomFactory;
 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory;
index 34e2126..9808aa3 100644 (file)
@@ -1,6 +1,6 @@
 package de.juplo.kafka.chat.backend.persistence;
 
-import de.juplo.kafka.chat.backend.api.ShardingStrategy;
+import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
 import de.juplo.kafka.chat.backend.domain.ChatRoomFactory;
 import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService;