refactor: `ChatHome` is a service - Moved classes
authorKai Moritz <kai@juplo.de>
Wed, 6 Sep 2023 21:41:27 +0000 (23:41 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 27 Jan 2024 14:16:22 +0000 (15:16 +0100)
18 files changed:
src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHome.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeService.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeService.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceWithShardsTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java [deleted file]
src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTest.java [deleted file]
src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeServiceTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java [deleted file]
src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeServiceTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeTest.java [deleted file]
src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeServiceTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeTest.java [deleted file]

diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java
deleted file mode 100644 (file)
index 2ff59cb..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-package de.juplo.kafka.chat.backend.domain;
-
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.util.UUID;
-
-
-public interface ChatHome
-{
-  Mono<ChatRoomInfo> createChatRoom(UUID id, String name);
-
-  Mono<ChatRoomInfo> getChatRoomInfo(UUID id);
-
-  Flux<ChatRoomInfo> getChatRoomInfo();
-
-  Mono<ChatRoomData> getChatRoomData(UUID id);
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java
new file mode 100644 (file)
index 0000000..2ff59cb
--- /dev/null
@@ -0,0 +1,18 @@
+package de.juplo.kafka.chat.backend.domain;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.UUID;
+
+
+public interface ChatHome
+{
+  Mono<ChatRoomInfo> createChatRoom(UUID id, String name);
+
+  Mono<ChatRoomInfo> getChatRoomInfo(UUID id);
+
+  Flux<ChatRoomInfo> getChatRoomInfo();
+
+  Mono<ChatRoomData> getChatRoomData(UUID id);
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHome.java
deleted file mode 100644 (file)
index 4c8b2fa..0000000
+++ /dev/null
@@ -1,106 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.inmemory;
-
-import de.juplo.kafka.chat.backend.domain.*;
-import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
-import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
-import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.UUID;
-import java.util.stream.Collectors;
-
-
-@Slf4j
-public class ShardedChatHome implements ChatHome
-{
-  private final SimpleChatHome[] chatHomes;
-  private final Set<Integer> ownedShards;
-  private final ShardingStrategy shardingStrategy;
-
-
-  public  ShardedChatHome(
-      SimpleChatHome[] chatHomes,
-      ShardingStrategy shardingStrategy)
-  {
-    this.chatHomes = chatHomes;
-    this.shardingStrategy = shardingStrategy;
-    this.ownedShards = new HashSet<>();
-    for (int shard = 0; shard < chatHomes.length; shard++)
-      if(chatHomes[shard] != null)
-        this.ownedShards.add(shard);
-    log.info(
-        "Created ShardedChatHome for shards: {}",
-        ownedShards
-            .stream()
-            .map(String::valueOf)
-            .collect(Collectors.joining(", ")));
-  }
-
-
-  @Override
-  public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
-  {
-    int shard = shardingStrategy.selectShard(id);
-    return chatHomes[shard] == null
-        ? Mono.error(new ShardNotOwnedException(shard))
-        : chatHomes[shard].createChatRoom(id, name);
-  }
-
-  @Override
-  public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
-  {
-    int shard = selectShard(id);
-    return chatHomes[shard] == null
-        ? Mono.error(new ShardNotOwnedException(shard))
-        : chatHomes[shard]
-            .getChatRoomInfo(id)
-            .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
-            ? new UnknownChatroomException(
-                id,
-                shard,
-                ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
-            : throwable);
-  }
-
-  @Override
-  public Flux<ChatRoomInfo> getChatRoomInfo()
-  {
-    return Flux
-        .fromIterable(ownedShards)
-        .flatMap(shard -> chatHomes[shard].getChatRoomInfo());
-  }
-
-  @Override
-  public Mono<ChatRoomData> getChatRoomData(UUID id)
-  {
-    int shard = selectShard(id);
-    return chatHomes[shard] == null
-        ? Mono.error(new ShardNotOwnedException(shard))
-        : chatHomes[shard]
-            .getChatRoomData(id)
-            .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
-                ? new UnknownChatroomException(
-                id,
-                shard,
-                ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
-                : throwable);
-  }
-
-  public Flux<ChatRoomData> getChatRoomData()
-  {
-    return Flux
-        .fromIterable(ownedShards)
-        .flatMap(shard -> chatHomes[shard].getChatRoomData());
-  }
-
-
-
-  private int selectShard(UUID chatroomId)
-  {
-    return shardingStrategy.selectShard(chatroomId);
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeService.java
new file mode 100644 (file)
index 0000000..4c8b2fa
--- /dev/null
@@ -0,0 +1,106 @@
+package de.juplo.kafka.chat.backend.persistence.inmemory;
+
+import de.juplo.kafka.chat.backend.domain.*;
+import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
+import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
+import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+
+@Slf4j
+public class ShardedChatHome implements ChatHome
+{
+  private final SimpleChatHome[] chatHomes;
+  private final Set<Integer> ownedShards;
+  private final ShardingStrategy shardingStrategy;
+
+
+  public  ShardedChatHome(
+      SimpleChatHome[] chatHomes,
+      ShardingStrategy shardingStrategy)
+  {
+    this.chatHomes = chatHomes;
+    this.shardingStrategy = shardingStrategy;
+    this.ownedShards = new HashSet<>();
+    for (int shard = 0; shard < chatHomes.length; shard++)
+      if(chatHomes[shard] != null)
+        this.ownedShards.add(shard);
+    log.info(
+        "Created ShardedChatHome for shards: {}",
+        ownedShards
+            .stream()
+            .map(String::valueOf)
+            .collect(Collectors.joining(", ")));
+  }
+
+
+  @Override
+  public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
+  {
+    int shard = shardingStrategy.selectShard(id);
+    return chatHomes[shard] == null
+        ? Mono.error(new ShardNotOwnedException(shard))
+        : chatHomes[shard].createChatRoom(id, name);
+  }
+
+  @Override
+  public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
+  {
+    int shard = selectShard(id);
+    return chatHomes[shard] == null
+        ? Mono.error(new ShardNotOwnedException(shard))
+        : chatHomes[shard]
+            .getChatRoomInfo(id)
+            .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
+            ? new UnknownChatroomException(
+                id,
+                shard,
+                ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
+            : throwable);
+  }
+
+  @Override
+  public Flux<ChatRoomInfo> getChatRoomInfo()
+  {
+    return Flux
+        .fromIterable(ownedShards)
+        .flatMap(shard -> chatHomes[shard].getChatRoomInfo());
+  }
+
+  @Override
+  public Mono<ChatRoomData> getChatRoomData(UUID id)
+  {
+    int shard = selectShard(id);
+    return chatHomes[shard] == null
+        ? Mono.error(new ShardNotOwnedException(shard))
+        : chatHomes[shard]
+            .getChatRoomData(id)
+            .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
+                ? new UnknownChatroomException(
+                id,
+                shard,
+                ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
+                : throwable);
+  }
+
+  public Flux<ChatRoomData> getChatRoomData()
+  {
+    return Flux
+        .fromIterable(ownedShards)
+        .flatMap(shard -> chatHomes[shard].getChatRoomData());
+  }
+
+
+
+  private int selectShard(UUID chatroomId)
+  {
+    return shardingStrategy.selectShard(chatroomId);
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java
deleted file mode 100644 (file)
index 868c01e..0000000
+++ /dev/null
@@ -1,122 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.inmemory;
-
-import de.juplo.kafka.chat.backend.domain.*;
-import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
-import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.time.Clock;
-import java.util.*;
-
-
-@Slf4j
-public class SimpleChatHome implements ChatHome
-{
-  private final Integer shard;
-  private final Map<UUID, ChatRoomInfo> chatRoomInfo;
-  private final Map<UUID, ChatRoomData> chatRoomData;
-  private final Clock clock;
-  private final int bufferSize;
-
-
-
-  public SimpleChatHome(
-      StorageStrategy storageStrategy,
-      Clock clock,
-      int bufferSize)
-  {
-    this(
-        null,
-        storageStrategy,
-        clock,
-        bufferSize);
-  }
-
-  public SimpleChatHome(
-      Integer shard,
-      StorageStrategy storageStrategy,
-      Clock clock,
-      int bufferSize)
-  {
-    log.info("Created SimpleChatHome for shard {}", shard);
-;
-    this.shard = shard;
-    this.chatRoomInfo = new HashMap<>();
-    this.chatRoomData = new HashMap<>();
-    storageStrategy
-        .readChatRoomInfo()
-        .filter(info ->
-        {
-          if (shard == null || info.getShard() == shard)
-          {
-            return true;
-          }
-          else
-          {
-            log.info(
-                "SimpleChatHome for shard {} ignores not owned chat-room {}",
-                shard,
-                info);
-            return false;
-          }
-        })
-        .toStream()
-        .forEach(info ->
-        {
-          UUID chatRoomId = info.getId();
-          chatRoomInfo.put(chatRoomId, info);
-          Flux<Message> messageFlux =
-              storageStrategy.readChatRoomData(chatRoomId);
-          chatRoomData.put(
-              info.getId(),
-              new ChatRoomData(
-                  clock,
-                  new InMemoryChatRoomService(messageFlux),
-                  bufferSize));
-        });
-    this.clock = clock;
-    this.bufferSize = bufferSize;
-  }
-
-
-  @Override
-  public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
-  {
-    log.info("Creating ChatRoom with buffer-size {}", bufferSize);
-    ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
-    ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
-    this.chatRoomInfo.put(id, chatRoomInfo);
-    ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
-    this.chatRoomData.put(id, chatRoomData);
-    return Mono.just(chatRoomInfo);
-  }
-
-  @Override
-  public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
-  {
-    return Mono
-        .justOrEmpty(chatRoomInfo.get(id))
-        .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
-  }
-
-  @Override
-  public Flux<ChatRoomInfo> getChatRoomInfo()
-  {
-    return Flux.fromIterable(chatRoomInfo.values());
-  }
-
-  @Override
-  public Mono<ChatRoomData> getChatRoomData(UUID id)
-  {
-    return Mono
-        .justOrEmpty(chatRoomData.get(id))
-        .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
-  }
-
-  public Flux<ChatRoomData> getChatRoomData()
-  {
-    return Flux.fromIterable(chatRoomData.values());
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeService.java
new file mode 100644 (file)
index 0000000..868c01e
--- /dev/null
@@ -0,0 +1,122 @@
+package de.juplo.kafka.chat.backend.persistence.inmemory;
+
+import de.juplo.kafka.chat.backend.domain.*;
+import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.Clock;
+import java.util.*;
+
+
+@Slf4j
+public class SimpleChatHome implements ChatHome
+{
+  private final Integer shard;
+  private final Map<UUID, ChatRoomInfo> chatRoomInfo;
+  private final Map<UUID, ChatRoomData> chatRoomData;
+  private final Clock clock;
+  private final int bufferSize;
+
+
+
+  public SimpleChatHome(
+      StorageStrategy storageStrategy,
+      Clock clock,
+      int bufferSize)
+  {
+    this(
+        null,
+        storageStrategy,
+        clock,
+        bufferSize);
+  }
+
+  public SimpleChatHome(
+      Integer shard,
+      StorageStrategy storageStrategy,
+      Clock clock,
+      int bufferSize)
+  {
+    log.info("Created SimpleChatHome for shard {}", shard);
+;
+    this.shard = shard;
+    this.chatRoomInfo = new HashMap<>();
+    this.chatRoomData = new HashMap<>();
+    storageStrategy
+        .readChatRoomInfo()
+        .filter(info ->
+        {
+          if (shard == null || info.getShard() == shard)
+          {
+            return true;
+          }
+          else
+          {
+            log.info(
+                "SimpleChatHome for shard {} ignores not owned chat-room {}",
+                shard,
+                info);
+            return false;
+          }
+        })
+        .toStream()
+        .forEach(info ->
+        {
+          UUID chatRoomId = info.getId();
+          chatRoomInfo.put(chatRoomId, info);
+          Flux<Message> messageFlux =
+              storageStrategy.readChatRoomData(chatRoomId);
+          chatRoomData.put(
+              info.getId(),
+              new ChatRoomData(
+                  clock,
+                  new InMemoryChatRoomService(messageFlux),
+                  bufferSize));
+        });
+    this.clock = clock;
+    this.bufferSize = bufferSize;
+  }
+
+
+  @Override
+  public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
+  {
+    log.info("Creating ChatRoom with buffer-size {}", bufferSize);
+    ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
+    ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
+    this.chatRoomInfo.put(id, chatRoomInfo);
+    ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
+    this.chatRoomData.put(id, chatRoomData);
+    return Mono.just(chatRoomInfo);
+  }
+
+  @Override
+  public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
+  {
+    return Mono
+        .justOrEmpty(chatRoomInfo.get(id))
+        .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
+  }
+
+  @Override
+  public Flux<ChatRoomInfo> getChatRoomInfo()
+  {
+    return Flux.fromIterable(chatRoomInfo.values());
+  }
+
+  @Override
+  public Mono<ChatRoomData> getChatRoomData(UUID id)
+  {
+    return Mono
+        .justOrEmpty(chatRoomData.get(id))
+        .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
+  }
+
+  public Flux<ChatRoomData> getChatRoomData()
+  {
+    return Flux.fromIterable(chatRoomData.values());
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java
deleted file mode 100644 (file)
index 86b3270..0000000
+++ /dev/null
@@ -1,72 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.ChatHome;
-import de.juplo.kafka.chat.backend.domain.ChatRoomData;
-import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.utils.Utils;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.util.*;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class KafkaChatHome implements ChatHome
-{
-  private final int numPartitions;
-  private final ChatRoomChannel chatRoomChannel;
-
-
-
-  @Override
-  public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
-  {
-    log.info("Sending create-command for chat rooom: id={}, name={}");
-    return chatRoomChannel.sendCreateChatRoomRequest(id, name);
-  }
-
-  @Override
-  public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
-  {
-    int shard = selectShard(id);
-    return chatRoomChannel
-        .getChatRoomInfo(shard, id)
-        .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
-            id,
-            shard,
-            chatRoomChannel.getOwnedShards())));
-  }
-
-  @Override
-  public Flux<ChatRoomInfo> getChatRoomInfo()
-  {
-    return chatRoomChannel.getChatRoomInfo();
-  }
-
-  @Override
-  public Mono<ChatRoomData> getChatRoomData(UUID id)
-  {
-    int shard = selectShard(id);
-    return chatRoomChannel
-        .getChatRoomData(shard, id)
-        .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
-            id,
-            shard,
-            chatRoomChannel.getOwnedShards())));
-  }
-
-  public Flux<ChatRoomData> getChatRoomData()
-  {
-      return chatRoomChannel.getChatRoomData();
-  }
-
-  int selectShard(UUID chatRoomId)
-  {
-    byte[] serializedKey = chatRoomId.toString().getBytes();
-    return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java
new file mode 100644 (file)
index 0000000..86b3270
--- /dev/null
@@ -0,0 +1,72 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatHome;
+import de.juplo.kafka.chat.backend.domain.ChatRoomData;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.utils.Utils;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.*;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class KafkaChatHome implements ChatHome
+{
+  private final int numPartitions;
+  private final ChatRoomChannel chatRoomChannel;
+
+
+
+  @Override
+  public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
+  {
+    log.info("Sending create-command for chat rooom: id={}, name={}");
+    return chatRoomChannel.sendCreateChatRoomRequest(id, name);
+  }
+
+  @Override
+  public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
+  {
+    int shard = selectShard(id);
+    return chatRoomChannel
+        .getChatRoomInfo(shard, id)
+        .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
+            id,
+            shard,
+            chatRoomChannel.getOwnedShards())));
+  }
+
+  @Override
+  public Flux<ChatRoomInfo> getChatRoomInfo()
+  {
+    return chatRoomChannel.getChatRoomInfo();
+  }
+
+  @Override
+  public Mono<ChatRoomData> getChatRoomData(UUID id)
+  {
+    int shard = selectShard(id);
+    return chatRoomChannel
+        .getChatRoomData(shard, id)
+        .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
+            id,
+            shard,
+            chatRoomChannel.getOwnedShards())));
+  }
+
+  public Flux<ChatRoomData> getChatRoomData()
+  {
+      return chatRoomChannel.getChatRoomData();
+  }
+
+  int selectShard(UUID chatRoomId)
+  {
+    byte[] serializedKey = chatRoomId.toString().getBytes();
+    return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceTest.java
new file mode 100644 (file)
index 0000000..6282643
--- /dev/null
@@ -0,0 +1,68 @@
+package de.juplo.kafka.chat.backend.domain;
+
+import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
+import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
+
+import java.time.Duration;
+import java.util.UUID;
+
+import static pl.rzrz.assertj.reactor.Assertions.assertThat;
+
+
+@ExtendWith(SpringExtension.class)
+public abstract class ChatHomeTest
+{
+  @Autowired
+  ChatHome chatHome;
+
+
+  @Test
+  @DisplayName("Assert chatroom is delivered, if it exists")
+  void testGetExistingChatroom()
+  {
+    // Given
+    UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
+
+    // When
+    Mono<ChatRoomData> mono = Mono
+        .defer(() -> chatHome.getChatRoomData(chatRoomId))
+        .log("testGetExistingChatroom")
+        .retryWhen(Retry
+            .backoff(5, Duration.ofSeconds(1))
+            .filter(throwable -> throwable instanceof LoadInProgressException));
+
+    // Then
+    assertThat(mono).emitsCount(1);
+  }
+
+  @Test
+  @DisplayName("Assert UnknownChatroomException is thrown, if chatroom does not exist")
+  void testGetNonExistentChatroom()
+  {
+    // Given
+    UUID chatRoomId = UUID.fromString("7f59ec77-832e-4a17-8d22-55ef46242c17");
+
+    // When
+    Mono<ChatRoomData> mono = Mono
+        .defer(() -> chatHome.getChatRoomData(chatRoomId))
+        .log("testGetNonExistentChatroom")
+        .retryWhen(Retry
+            .backoff(5, Duration.ofSeconds(1))
+            .filter(throwable -> throwable instanceof LoadInProgressException));
+
+    // Then
+    assertThat(mono).sendsError(e ->
+    {
+      assertThat(e).isInstanceOf(UnknownChatroomException.class);
+      UnknownChatroomException unknownChatroomException = (UnknownChatroomException) e;
+      assertThat(unknownChatroomException.getChatroomId()).isEqualTo(chatRoomId);
+    });
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceWithShardsTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeServiceWithShardsTest.java
new file mode 100644 (file)
index 0000000..65a67b9
--- /dev/null
@@ -0,0 +1,46 @@
+package de.juplo.kafka.chat.backend.domain;
+
+import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
+import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
+
+import java.time.Duration;
+import java.util.UUID;
+
+import static pl.rzrz.assertj.reactor.Assertions.assertThat;
+
+
+public abstract class ChatHomeWithShardsTest extends ChatHomeTest
+{
+  public static final int NUM_SHARDS = 10;
+  public static final int OWNED_SHARD = 2;
+  public static final int NOT_OWNED_SHARD = 0;
+
+
+  @Test
+  @DisplayName("Assert ShardNotOwnedException is thrown, if the shard for the chatroom is not owned")
+  void testGetChatroomForNotOwnedShard()
+  {
+    // Given
+    UUID chatRoomId = UUID.fromString("4e7246a6-29ae-43ea-b56f-669c3481ac19");
+
+    // When
+    Mono<ChatRoomData> mono = Mono
+        .defer(() -> chatHome.getChatRoomData(chatRoomId))
+        .log("testGetChatroomForNotOwnedShard")
+        .retryWhen(Retry
+            .backoff(5, Duration.ofSeconds(1))
+            .filter(throwable -> throwable instanceof LoadInProgressException));
+
+    // Then
+    assertThat(mono).sendsError(e ->
+    {
+      assertThat(e).isInstanceOf(ShardNotOwnedException.class);
+      ShardNotOwnedException shardNotOwnedException = (ShardNotOwnedException) e;
+      assertThat(shardNotOwnedException.getShard()).isEqualTo(NOT_OWNED_SHARD);
+    });
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java
deleted file mode 100644 (file)
index 6282643..0000000
+++ /dev/null
@@ -1,68 +0,0 @@
-package de.juplo.kafka.chat.backend.domain;
-
-import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
-import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.ExtendWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.test.context.junit.jupiter.SpringExtension;
-import reactor.core.publisher.Mono;
-import reactor.util.retry.Retry;
-
-import java.time.Duration;
-import java.util.UUID;
-
-import static pl.rzrz.assertj.reactor.Assertions.assertThat;
-
-
-@ExtendWith(SpringExtension.class)
-public abstract class ChatHomeTest
-{
-  @Autowired
-  ChatHome chatHome;
-
-
-  @Test
-  @DisplayName("Assert chatroom is delivered, if it exists")
-  void testGetExistingChatroom()
-  {
-    // Given
-    UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
-
-    // When
-    Mono<ChatRoomData> mono = Mono
-        .defer(() -> chatHome.getChatRoomData(chatRoomId))
-        .log("testGetExistingChatroom")
-        .retryWhen(Retry
-            .backoff(5, Duration.ofSeconds(1))
-            .filter(throwable -> throwable instanceof LoadInProgressException));
-
-    // Then
-    assertThat(mono).emitsCount(1);
-  }
-
-  @Test
-  @DisplayName("Assert UnknownChatroomException is thrown, if chatroom does not exist")
-  void testGetNonExistentChatroom()
-  {
-    // Given
-    UUID chatRoomId = UUID.fromString("7f59ec77-832e-4a17-8d22-55ef46242c17");
-
-    // When
-    Mono<ChatRoomData> mono = Mono
-        .defer(() -> chatHome.getChatRoomData(chatRoomId))
-        .log("testGetNonExistentChatroom")
-        .retryWhen(Retry
-            .backoff(5, Duration.ofSeconds(1))
-            .filter(throwable -> throwable instanceof LoadInProgressException));
-
-    // Then
-    assertThat(mono).sendsError(e ->
-    {
-      assertThat(e).isInstanceOf(UnknownChatroomException.class);
-      UnknownChatroomException unknownChatroomException = (UnknownChatroomException) e;
-      assertThat(unknownChatroomException.getChatroomId()).isEqualTo(chatRoomId);
-    });
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTest.java
deleted file mode 100644 (file)
index 65a67b9..0000000
+++ /dev/null
@@ -1,46 +0,0 @@
-package de.juplo.kafka.chat.backend.domain;
-
-import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
-import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import reactor.core.publisher.Mono;
-import reactor.util.retry.Retry;
-
-import java.time.Duration;
-import java.util.UUID;
-
-import static pl.rzrz.assertj.reactor.Assertions.assertThat;
-
-
-public abstract class ChatHomeWithShardsTest extends ChatHomeTest
-{
-  public static final int NUM_SHARDS = 10;
-  public static final int OWNED_SHARD = 2;
-  public static final int NOT_OWNED_SHARD = 0;
-
-
-  @Test
-  @DisplayName("Assert ShardNotOwnedException is thrown, if the shard for the chatroom is not owned")
-  void testGetChatroomForNotOwnedShard()
-  {
-    // Given
-    UUID chatRoomId = UUID.fromString("4e7246a6-29ae-43ea-b56f-669c3481ac19");
-
-    // When
-    Mono<ChatRoomData> mono = Mono
-        .defer(() -> chatHome.getChatRoomData(chatRoomId))
-        .log("testGetChatroomForNotOwnedShard")
-        .retryWhen(Retry
-            .backoff(5, Duration.ofSeconds(1))
-            .filter(throwable -> throwable instanceof LoadInProgressException));
-
-    // Then
-    assertThat(mono).sendsError(e ->
-    {
-      assertThat(e).isInstanceOf(ShardNotOwnedException.class);
-      ShardNotOwnedException shardNotOwnedException = (ShardNotOwnedException) e;
-      assertThat(shardNotOwnedException.getShard()).isEqualTo(NOT_OWNED_SHARD);
-    });
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeServiceTest.java
new file mode 100644 (file)
index 0000000..8d9036f
--- /dev/null
@@ -0,0 +1,65 @@
+package de.juplo.kafka.chat.backend.persistence.inmemory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTest;
+import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+
+import java.nio.file.Paths;
+import java.time.Clock;
+import java.util.stream.IntStream;
+
+public class ShardedChatHomeTest extends ChatHomeWithShardsTest
+{
+  @TestConfiguration
+  static class Configuration
+  {
+    @Bean
+    ShardedChatHome chatHome(
+        StorageStrategy storageStrategy,
+        Clock clock)
+    {
+      SimpleChatHome[] chatHomes = new SimpleChatHome[NUM_SHARDS];
+
+      IntStream
+          .of(ownedShards())
+          .forEach(shard -> chatHomes[shard] = new SimpleChatHome(
+              shard,
+              storageStrategy,
+              clock,
+              bufferSize()));
+
+      ShardingStrategy strategy = new KafkaLikeShardingStrategy(NUM_SHARDS);
+
+      return new ShardedChatHome(chatHomes, strategy);
+    }
+
+    @Bean
+    public FilesStorageStrategy storageStrategy(Clock clock)
+    {
+      return new FilesStorageStrategy(
+          Paths.get("target", "test-classes", "data", "files"),
+          new KafkaLikeShardingStrategy(NUM_SHARDS),
+          new ObjectMapper());
+    }
+
+    @Bean
+    Clock clock()
+    {
+      return Clock.systemDefaultZone();
+    }
+
+    int[] ownedShards()
+    {
+      return new int[] { OWNED_SHARD };
+    }
+
+    int bufferSize()
+    {
+      return 8;
+    }
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java
deleted file mode 100644 (file)
index 8d9036f..0000000
+++ /dev/null
@@ -1,65 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.inmemory;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTest;
-import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
-import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-
-import java.nio.file.Paths;
-import java.time.Clock;
-import java.util.stream.IntStream;
-
-public class ShardedChatHomeTest extends ChatHomeWithShardsTest
-{
-  @TestConfiguration
-  static class Configuration
-  {
-    @Bean
-    ShardedChatHome chatHome(
-        StorageStrategy storageStrategy,
-        Clock clock)
-    {
-      SimpleChatHome[] chatHomes = new SimpleChatHome[NUM_SHARDS];
-
-      IntStream
-          .of(ownedShards())
-          .forEach(shard -> chatHomes[shard] = new SimpleChatHome(
-              shard,
-              storageStrategy,
-              clock,
-              bufferSize()));
-
-      ShardingStrategy strategy = new KafkaLikeShardingStrategy(NUM_SHARDS);
-
-      return new ShardedChatHome(chatHomes, strategy);
-    }
-
-    @Bean
-    public FilesStorageStrategy storageStrategy(Clock clock)
-    {
-      return new FilesStorageStrategy(
-          Paths.get("target", "test-classes", "data", "files"),
-          new KafkaLikeShardingStrategy(NUM_SHARDS),
-          new ObjectMapper());
-    }
-
-    @Bean
-    Clock clock()
-    {
-      return Clock.systemDefaultZone();
-    }
-
-    int[] ownedShards()
-    {
-      return new int[] { OWNED_SHARD };
-    }
-
-    int bufferSize()
-    {
-      return 8;
-    }
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeServiceTest.java
new file mode 100644 (file)
index 0000000..8be3173
--- /dev/null
@@ -0,0 +1,50 @@
+package de.juplo.kafka.chat.backend.persistence.inmemory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.chat.backend.domain.ChatHomeTest;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+
+import java.nio.file.Paths;
+import java.time.Clock;
+
+
+public class SimpleChatHomeTest extends ChatHomeTest
+{
+  @TestConfiguration
+  static class Configuration
+  {
+    @Bean
+    SimpleChatHome chatHome(
+        StorageStrategy storageStrategy,
+        Clock clock)
+    {
+      return new SimpleChatHome(
+          storageStrategy,
+          clock,
+          bufferSize());
+    }
+
+    @Bean
+    public FilesStorageStrategy storageStrategy(Clock clock)
+    {
+      return new FilesStorageStrategy(
+          Paths.get("target", "test-classes", "data", "files"),
+          chatRoomId -> 0,
+          new ObjectMapper());
+    }
+
+    @Bean
+    Clock clock()
+    {
+      return Clock.systemDefaultZone();
+    }
+
+    int bufferSize()
+    {
+      return 8;
+    }
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeTest.java
deleted file mode 100644 (file)
index 8be3173..0000000
+++ /dev/null
@@ -1,50 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.inmemory;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.chat.backend.domain.ChatHomeTest;
-import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-
-import java.nio.file.Paths;
-import java.time.Clock;
-
-
-public class SimpleChatHomeTest extends ChatHomeTest
-{
-  @TestConfiguration
-  static class Configuration
-  {
-    @Bean
-    SimpleChatHome chatHome(
-        StorageStrategy storageStrategy,
-        Clock clock)
-    {
-      return new SimpleChatHome(
-          storageStrategy,
-          clock,
-          bufferSize());
-    }
-
-    @Bean
-    public FilesStorageStrategy storageStrategy(Clock clock)
-    {
-      return new FilesStorageStrategy(
-          Paths.get("target", "test-classes", "data", "files"),
-          chatRoomId -> 0,
-          new ObjectMapper());
-    }
-
-    @Bean
-    Clock clock()
-    {
-      return Clock.systemDefaultZone();
-    }
-
-    int bufferSize()
-    {
-      return 8;
-    }
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeServiceTest.java
new file mode 100644 (file)
index 0000000..d758a22
--- /dev/null
@@ -0,0 +1,113 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTest;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
+import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.time.Clock;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTest.NUM_SHARDS;
+import static de.juplo.kafka.chat.backend.persistence.kafka.KafkaChatHomeTest.TOPIC;
+
+
+@SpringBootTest(
+    classes = {
+        KafkaChatHomeTest.KafkaChatHomeTestConfiguration.class,
+        KafkaServicesConfiguration.class,
+        KafkaAutoConfiguration.class,
+        TaskExecutionAutoConfiguration.class,
+    },
+    properties = {
+    "chat.backend.services=kafka",
+    "chat.backend.kafka.client-id-PREFIX=TEST",
+    "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+    "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+    "chat.backend.kafka.chatroom-channel-topic=" + TOPIC,
+    "chat.backend.kafka.num-partitions=" + NUM_SHARDS,
+})
+@EmbeddedKafka(topics = { TOPIC }, partitions = 10)
+@Slf4j
+public class KafkaChatHomeTest extends ChatHomeWithShardsTest
+{
+  final static String TOPIC = "KAFKA_CHAT_HOME_TEST";
+
+  static CompletableFuture<Void> CONSUMER_JOB;
+
+
+  @TestConfiguration
+  @EnableConfigurationProperties(ChatBackendProperties.class)
+  static class KafkaChatHomeTestConfiguration
+  {
+    @Bean
+    Clock clock()
+    {
+      return Clock.systemDefaultZone();
+    }
+  }
+
+
+  @BeforeAll
+  public static void sendAndLoadStoredData(
+      @Autowired KafkaTemplate<String, String> messageTemplate,
+      @Autowired Consumer chatRoomChannelConsumer,
+      @Autowired ThreadPoolTaskExecutor taskExecutor,
+      @Autowired ChatRoomChannel chatRoomChannel)
+  {
+    send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom");
+    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
+    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
+    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
+    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
+
+    List<TopicPartition> assignedPartitions = List.of(new TopicPartition(TOPIC, 2));
+    chatRoomChannelConsumer.assign(assignedPartitions);
+    chatRoomChannel.onPartitionsAssigned(assignedPartitions);
+    CONSUMER_JOB = taskExecutor
+        .submitCompletable(chatRoomChannel)
+        .exceptionally(e ->
+        {
+          log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
+          return null;
+        });
+  }
+
+  static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
+  {
+    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
+    record.headers().add("__TypeId__", typeId.getBytes());
+    SendResult<String, String> result = kafkaTemplate.send(record).join();
+    log.info(
+        "Sent {}={} to {}",
+        key,
+        value,
+        new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
+  }
+
+  @AfterAll
+  static void joinConsumerJob(@Autowired Consumer chatRoomChannelConsumer)
+  {
+    log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
+    chatRoomChannelConsumer.wakeup();
+    log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
+    CONSUMER_JOB.join();
+    log.info("Joined the consumer of the ChatRoomChannel");
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeTest.java
deleted file mode 100644 (file)
index d758a22..0000000
+++ /dev/null
@@ -1,113 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTest;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
-import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.support.SendResult;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-
-import java.time.Clock;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
-import static de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTest.NUM_SHARDS;
-import static de.juplo.kafka.chat.backend.persistence.kafka.KafkaChatHomeTest.TOPIC;
-
-
-@SpringBootTest(
-    classes = {
-        KafkaChatHomeTest.KafkaChatHomeTestConfiguration.class,
-        KafkaServicesConfiguration.class,
-        KafkaAutoConfiguration.class,
-        TaskExecutionAutoConfiguration.class,
-    },
-    properties = {
-    "chat.backend.services=kafka",
-    "chat.backend.kafka.client-id-PREFIX=TEST",
-    "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
-    "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
-    "chat.backend.kafka.chatroom-channel-topic=" + TOPIC,
-    "chat.backend.kafka.num-partitions=" + NUM_SHARDS,
-})
-@EmbeddedKafka(topics = { TOPIC }, partitions = 10)
-@Slf4j
-public class KafkaChatHomeTest extends ChatHomeWithShardsTest
-{
-  final static String TOPIC = "KAFKA_CHAT_HOME_TEST";
-
-  static CompletableFuture<Void> CONSUMER_JOB;
-
-
-  @TestConfiguration
-  @EnableConfigurationProperties(ChatBackendProperties.class)
-  static class KafkaChatHomeTestConfiguration
-  {
-    @Bean
-    Clock clock()
-    {
-      return Clock.systemDefaultZone();
-    }
-  }
-
-
-  @BeforeAll
-  public static void sendAndLoadStoredData(
-      @Autowired KafkaTemplate<String, String> messageTemplate,
-      @Autowired Consumer chatRoomChannelConsumer,
-      @Autowired ThreadPoolTaskExecutor taskExecutor,
-      @Autowired ChatRoomChannel chatRoomChannel)
-  {
-    send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom");
-    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
-    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
-    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
-    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
-
-    List<TopicPartition> assignedPartitions = List.of(new TopicPartition(TOPIC, 2));
-    chatRoomChannelConsumer.assign(assignedPartitions);
-    chatRoomChannel.onPartitionsAssigned(assignedPartitions);
-    CONSUMER_JOB = taskExecutor
-        .submitCompletable(chatRoomChannel)
-        .exceptionally(e ->
-        {
-          log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
-          return null;
-        });
-  }
-
-  static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
-  {
-    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
-    record.headers().add("__TypeId__", typeId.getBytes());
-    SendResult<String, String> result = kafkaTemplate.send(record).join();
-    log.info(
-        "Sent {}={} to {}",
-        key,
-        value,
-        new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
-  }
-
-  @AfterAll
-  static void joinConsumerJob(@Autowired Consumer chatRoomChannelConsumer)
-  {
-    log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
-    chatRoomChannelConsumer.wakeup();
-    log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
-    CONSUMER_JOB.join();
-    log.info("Joined the consumer of the ChatRoomChannel");
-  }
-}