refactor: Renamed `persistence` into `implementation` - Moved classes
authorKai Moritz <kai@juplo.de>
Wed, 6 Sep 2023 21:48:39 +0000 (23:48 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 20 Feb 2024 09:35:33 +0000 (10:35 +0100)
40 files changed:
src/main/java/de/juplo/kafka/chat/backend/implementation/ShardingStrategy.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatRoomService.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/KafkaLikeShardingStrategy.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChatRoomChannel.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatRoomService.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/AbstractMessageTo.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/CommandCreateChatRoomTo.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/EventChatMessageReceivedTo.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/ShardingStrategy.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomService.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/KafkaLikeShardingStrategy.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeService.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeService.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/AbstractMessageTo.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomTo.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedTo.java [deleted file]
src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeServiceTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeServiceTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/CommandCreateChatRoomToTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/EventChatMessageReceivedToTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeServiceTest.java [deleted file]
src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeServiceTest.java [deleted file]
src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeServiceTest.java [deleted file]
src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomToTest.java [deleted file]
src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedToTest.java [deleted file]

diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/ShardingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/ShardingStrategy.java
new file mode 100644 (file)
index 0000000..f7350f7
--- /dev/null
@@ -0,0 +1,9 @@
+package de.juplo.kafka.chat.backend.persistence;
+
+import java.util.UUID;
+
+
+public interface ShardingStrategy
+{
+  int selectShard(UUID chatRoomId);
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/StorageStrategy.java
new file mode 100644 (file)
index 0000000..2913efe
--- /dev/null
@@ -0,0 +1,29 @@
+package de.juplo.kafka.chat.backend.persistence;
+
+import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.Message;
+import reactor.core.publisher.Flux;
+
+import java.util.UUID;
+
+
+public interface StorageStrategy
+{
+  default void write(ChatHomeService chatHomeService)
+  {
+    writeChatRoomInfo(
+        chatHomeService
+            .getChatRoomInfo()
+            .doOnNext(chatRoomInfo -> writeChatRoomData(
+                chatRoomInfo.getId(),
+                chatHomeService
+                    .getChatRoomData(chatRoomInfo.getId())
+                    .flatMapMany(chatRoomData -> chatRoomData.getMessages()))));
+  }
+
+  void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
+  Flux<ChatRoomInfo> readChatRoomInfo();
+  void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
+  Flux<Message> readChatRoomData(UUID chatRoomId);
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryChatRoomService.java
new file mode 100644 (file)
index 0000000..e1d5a5e
--- /dev/null
@@ -0,0 +1,55 @@
+package de.juplo.kafka.chat.backend.persistence.inmemory;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.domain.ChatRoomService;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.LocalDateTime;
+import java.util.LinkedHashMap;
+
+
+@Slf4j
+public class InMemoryChatRoomService implements ChatRoomService
+{
+  private final LinkedHashMap<Message.MessageKey, Message> messages;
+
+
+  public InMemoryChatRoomService(Flux<Message> messageFlux)
+  {
+    log.debug("Creating InMemoryChatRoomService");
+    messages = new LinkedHashMap<>();
+    messageFlux.subscribe(message -> messages.put(message.getKey(), message));
+  }
+
+  @Override
+  public Mono<Message> persistMessage(
+      Message.MessageKey key,
+      LocalDateTime timestamp,
+      String text)
+  {
+    Message message = new Message(key, (long)messages.size(), timestamp, text);
+    messages.put(message.getKey(), message);
+    return Mono.just(message);
+  }
+
+  @Override
+  public Mono<Message> getMessage(Message.MessageKey key)
+  {
+    return Mono.fromSupplier(() -> messages.get(key));
+  }
+
+  @Override
+  public Flux<Message> getMessages(long first, long last)
+  {
+    return Flux.fromStream(messages
+        .values()
+        .stream()
+        .filter(message ->
+        {
+          long serial = message.getSerialNumber();
+          return serial >= first && serial <= last;
+        }));
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java
new file mode 100644 (file)
index 0000000..263a2d5
--- /dev/null
@@ -0,0 +1,84 @@
+package de.juplo.kafka.chat.backend.persistence.inmemory;
+
+import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.time.Clock;
+import java.util.stream.IntStream;
+
+
+@ConditionalOnProperty(
+    prefix = "chat.backend",
+    name = "services",
+    havingValue = "inmemory",
+    matchIfMissing = true)
+@Configuration
+public class InMemoryServicesConfiguration
+{
+  @Bean
+  @ConditionalOnProperty(
+      prefix = "chat.backend.inmemory",
+      name = "sharding-strategy",
+      havingValue = "none",
+      matchIfMissing = true)
+  ChatHomeService noneShardingChatHome(
+      ChatBackendProperties properties,
+      StorageStrategy storageStrategy,
+      Clock clock)
+  {
+    return new SimpleChatHomeService(
+        storageStrategy,
+        clock,
+        properties.getChatroomBufferSize());
+  }
+
+  @Bean
+  @ConditionalOnProperty(
+      prefix = "chat.backend.inmemory",
+      name = "sharding-strategy",
+      havingValue = "kafkalike")
+  ChatHomeService kafkalikeShardingChatHome(
+      ChatBackendProperties properties,
+      StorageStrategy storageStrategy,
+      Clock clock)
+  {
+    int numShards = properties.getInmemory().getNumShards();
+    SimpleChatHomeService[] chatHomes = new SimpleChatHomeService[numShards];
+    IntStream
+        .of(properties.getInmemory().getOwnedShards())
+        .forEach(shard -> chatHomes[shard] = new SimpleChatHomeService(
+            shard,
+            storageStrategy,
+            clock,
+            properties.getChatroomBufferSize()));
+    ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
+    return new ShardedChatHomeService(chatHomes, strategy);
+  }
+
+  @ConditionalOnProperty(
+      prefix = "chat.backend.inmemory",
+      name = "sharding-strategy",
+      havingValue = "none",
+      matchIfMissing = true)
+  @Bean
+  ShardingStrategy defaultShardingStrategy()
+  {
+    return chatRoomId -> 0;
+  }
+
+  @ConditionalOnProperty(
+      prefix = "chat.backend.inmemory",
+      name = "sharding-strategy",
+      havingValue = "kafkalike")
+  @Bean
+  ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties)
+  {
+    return new KafkaLikeShardingStrategy(
+        properties.getInmemory().getNumShards());
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/KafkaLikeShardingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/KafkaLikeShardingStrategy.java
new file mode 100644 (file)
index 0000000..e20dab7
--- /dev/null
@@ -0,0 +1,21 @@
+package de.juplo.kafka.chat.backend.persistence.inmemory;
+
+import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.UUID;
+
+
+@RequiredArgsConstructor
+public class KafkaLikeShardingStrategy implements ShardingStrategy
+{
+  private final int numPartitions;
+
+  @Override
+  public int selectShard(UUID chatRoomId)
+  {
+    byte[] serializedKey = chatRoomId.toString().getBytes();
+    return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java
new file mode 100644 (file)
index 0000000..09e4684
--- /dev/null
@@ -0,0 +1,106 @@
+package de.juplo.kafka.chat.backend.persistence.inmemory;
+
+import de.juplo.kafka.chat.backend.domain.*;
+import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
+import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
+import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+
+@Slf4j
+public class ShardedChatHomeService implements ChatHomeService
+{
+  private final SimpleChatHomeService[] chatHomes;
+  private final Set<Integer> ownedShards;
+  private final ShardingStrategy shardingStrategy;
+
+
+  public ShardedChatHomeService(
+      SimpleChatHomeService[] chatHomes,
+      ShardingStrategy shardingStrategy)
+  {
+    this.chatHomes = chatHomes;
+    this.shardingStrategy = shardingStrategy;
+    this.ownedShards = new HashSet<>();
+    for (int shard = 0; shard < chatHomes.length; shard++)
+      if(chatHomes[shard] != null)
+        this.ownedShards.add(shard);
+    log.info(
+        "Created ShardedChatHome for shards: {}",
+        ownedShards
+            .stream()
+            .map(String::valueOf)
+            .collect(Collectors.joining(", ")));
+  }
+
+
+  @Override
+  public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
+  {
+    int shard = shardingStrategy.selectShard(id);
+    return chatHomes[shard] == null
+        ? Mono.error(new ShardNotOwnedException(shard))
+        : chatHomes[shard].createChatRoom(id, name);
+  }
+
+  @Override
+  public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
+  {
+    int shard = selectShard(id);
+    return chatHomes[shard] == null
+        ? Mono.error(new ShardNotOwnedException(shard))
+        : chatHomes[shard]
+            .getChatRoomInfo(id)
+            .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
+            ? new UnknownChatroomException(
+                id,
+                shard,
+                ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
+            : throwable);
+  }
+
+  @Override
+  public Flux<ChatRoomInfo> getChatRoomInfo()
+  {
+    return Flux
+        .fromIterable(ownedShards)
+        .flatMap(shard -> chatHomes[shard].getChatRoomInfo());
+  }
+
+  @Override
+  public Mono<ChatRoomData> getChatRoomData(UUID id)
+  {
+    int shard = selectShard(id);
+    return chatHomes[shard] == null
+        ? Mono.error(new ShardNotOwnedException(shard))
+        : chatHomes[shard]
+            .getChatRoomData(id)
+            .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
+                ? new UnknownChatroomException(
+                id,
+                shard,
+                ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
+                : throwable);
+  }
+
+  public Flux<ChatRoomData> getChatRoomData()
+  {
+    return Flux
+        .fromIterable(ownedShards)
+        .flatMap(shard -> chatHomes[shard].getChatRoomData());
+  }
+
+
+
+  private int selectShard(UUID chatroomId)
+  {
+    return shardingStrategy.selectShard(chatroomId);
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java
new file mode 100644 (file)
index 0000000..d9c3154
--- /dev/null
@@ -0,0 +1,122 @@
+package de.juplo.kafka.chat.backend.persistence.inmemory;
+
+import de.juplo.kafka.chat.backend.domain.*;
+import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.Clock;
+import java.util.*;
+
+
+@Slf4j
+public class SimpleChatHomeService implements ChatHomeService
+{
+  private final Integer shard;
+  private final Map<UUID, ChatRoomInfo> chatRoomInfo;
+  private final Map<UUID, ChatRoomData> chatRoomData;
+  private final Clock clock;
+  private final int bufferSize;
+
+
+
+  public SimpleChatHomeService(
+      StorageStrategy storageStrategy,
+      Clock clock,
+      int bufferSize)
+  {
+    this(
+        null,
+        storageStrategy,
+        clock,
+        bufferSize);
+  }
+
+  public SimpleChatHomeService(
+      Integer shard,
+      StorageStrategy storageStrategy,
+      Clock clock,
+      int bufferSize)
+  {
+    log.info("Created SimpleChatHome for shard {}", shard);
+;
+    this.shard = shard;
+    this.chatRoomInfo = new HashMap<>();
+    this.chatRoomData = new HashMap<>();
+    storageStrategy
+        .readChatRoomInfo()
+        .filter(info ->
+        {
+          if (shard == null || info.getShard() == shard)
+          {
+            return true;
+          }
+          else
+          {
+            log.info(
+                "SimpleChatHome for shard {} ignores not owned chat-room {}",
+                shard,
+                info);
+            return false;
+          }
+        })
+        .toStream()
+        .forEach(info ->
+        {
+          UUID chatRoomId = info.getId();
+          chatRoomInfo.put(chatRoomId, info);
+          Flux<Message> messageFlux =
+              storageStrategy.readChatRoomData(chatRoomId);
+          chatRoomData.put(
+              info.getId(),
+              new ChatRoomData(
+                  clock,
+                  new InMemoryChatRoomService(messageFlux),
+                  bufferSize));
+        });
+    this.clock = clock;
+    this.bufferSize = bufferSize;
+  }
+
+
+  @Override
+  public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
+  {
+    log.info("Creating ChatRoom with buffer-size {}", bufferSize);
+    ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
+    ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
+    this.chatRoomInfo.put(id, chatRoomInfo);
+    ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
+    this.chatRoomData.put(id, chatRoomData);
+    return Mono.just(chatRoomInfo);
+  }
+
+  @Override
+  public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
+  {
+    return Mono
+        .justOrEmpty(chatRoomInfo.get(id))
+        .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
+  }
+
+  @Override
+  public Flux<ChatRoomInfo> getChatRoomInfo()
+  {
+    return Flux.fromIterable(chatRoomInfo.values());
+  }
+
+  @Override
+  public Mono<ChatRoomData> getChatRoomData(UUID id)
+  {
+    return Mono
+        .justOrEmpty(chatRoomData.get(id))
+        .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
+  }
+
+  public Flux<ChatRoomData> getChatRoomData()
+  {
+    return Flux.fromIterable(chatRoomData.values());
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChatRoomChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChatRoomChannel.java
new file mode 100644 (file)
index 0000000..7e95c64
--- /dev/null
@@ -0,0 +1,436 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.*;
+import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
+import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.*;
+import java.util.*;
+import java.util.stream.IntStream;
+
+
+@Slf4j
+public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
+{
+  private final String topic;
+  private final Producer<String, AbstractMessageTo> producer;
+  private final Consumer<String, AbstractMessageTo> consumer;
+  private final ZoneId zoneId;
+  private final int numShards;
+  private final int bufferSize;
+  private final Clock clock;
+  private final boolean[] isShardOwned;
+  private final long[] currentOffset;
+  private final long[] nextOffset;
+  private final Map<UUID, ChatRoomInfo>[] chatRoomInfo;
+  private final Map<UUID, ChatRoomData>[] chatRoomData;
+
+  private boolean running;
+  @Getter
+  private volatile boolean loadInProgress;
+
+
+  public ChatRoomChannel(
+    String topic,
+    Producer<String, AbstractMessageTo> producer,
+    Consumer<String, AbstractMessageTo> consumer,
+    ZoneId zoneId,
+    int numShards,
+    int bufferSize,
+    Clock clock)
+  {
+    log.debug(
+        "Creating ChatRoomChannel for topic {} with {} partitions",
+        topic,
+        numShards);
+    this.topic = topic;
+    this.consumer = consumer;
+    this.producer = producer;
+    this.zoneId = zoneId;
+    this.numShards = numShards;
+    this.bufferSize = bufferSize;
+    this.clock = clock;
+    this.isShardOwned = new boolean[numShards];
+    this.currentOffset = new long[numShards];
+    this.nextOffset = new long[numShards];
+    this.chatRoomInfo = new Map[numShards];
+    this.chatRoomData = new Map[numShards];
+    IntStream
+        .range(0, numShards)
+        .forEach(shard ->
+        {
+          this.chatRoomInfo[shard] = new HashMap<>();
+          this.chatRoomData[shard] = new HashMap<>();
+        });
+  }
+
+
+
+  Mono<ChatRoomInfo> sendCreateChatRoomRequest(
+      UUID chatRoomId,
+      String name)
+  {
+    CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name);
+    return Mono.create(sink ->
+    {
+      ProducerRecord<String, AbstractMessageTo> record =
+          new ProducerRecord<>(
+              topic,
+              chatRoomId.toString(),
+              createChatRoomRequestTo);
+
+      producer.send(record, ((metadata, exception) ->
+      {
+        if (metadata != null)
+        {
+          log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
+          ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition());
+          createChatRoom(chatRoomInfo);
+          sink.success(chatRoomInfo);
+        }
+        else
+        {
+          // On send-failure
+          log.error(
+              "Could not send create-request for chat room (id={}, name={}): {}",
+              chatRoomId,
+              name,
+              exception);
+          sink.error(exception);
+        }
+      }));
+    });
+  }
+
+  Mono<Message> sendChatMessage(
+      UUID chatRoomId,
+      Message.MessageKey key,
+      LocalDateTime timestamp,
+      String text)
+  {
+    ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
+    return Mono.create(sink ->
+    {
+      ProducerRecord<String, AbstractMessageTo> record =
+          new ProducerRecord<>(
+              topic,
+              null,
+              zdt.toEpochSecond(),
+              chatRoomId.toString(),
+              EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text));
+
+      producer.send(record, ((metadata, exception) ->
+      {
+        if (metadata != null)
+        {
+          // On successful send
+          Message message = new Message(key, metadata.offset(), timestamp, text);
+          log.info("Successfully send message {}", message);
+          sink.success(message);
+        }
+        else
+        {
+          // On send-failure
+          log.error(
+              "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}",
+              chatRoomId,
+              key,
+              timestamp,
+              text,
+              exception);
+          sink.error(exception);
+        }
+      }));
+    });
+  }
+
+  @Override
+  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+  {
+    log.info("Newly assigned partitions! Pausing normal operations...");
+    loadInProgress = true;
+
+    consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
+    {
+      int partition = topicPartition.partition();
+      isShardOwned[partition] =  true;
+      this.currentOffset[partition] = currentOffset;
+
+      log.info(
+          "Partition assigned: {} - loading messages: next={} -> current={}",
+          partition,
+          nextOffset[partition],
+          currentOffset);
+
+      consumer.seek(topicPartition, nextOffset[partition]);
+    });
+
+    consumer.resume(partitions);
+  }
+
+  @Override
+  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+  {
+    partitions.forEach(topicPartition ->
+    {
+      int partition = topicPartition.partition();
+      isShardOwned[partition] = false;
+      log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
+    });
+  }
+
+  @Override
+  public void onPartitionsLost(Collection<TopicPartition> partitions)
+  {
+    log.warn("Lost partitions: {}, partitions");
+    // TODO: Muss auf den Verlust anders reagiert werden?
+    onPartitionsRevoked(partitions);
+  }
+
+  @Override
+  public void run()
+  {
+    running = true;
+
+    while (running)
+    {
+      try
+      {
+        ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(5));
+        log.info("Fetched {} messages", records.count());
+
+        if (loadInProgress)
+        {
+          loadChatRoom(records);
+
+          if (isLoadingCompleted())
+          {
+            log.info("Loading of messages completed! Pausing all owned partitions...");
+            pauseAllOwnedPartions();
+            log.info("Resuming normal operations...");
+            loadInProgress = false;
+          }
+        }
+        else
+        {
+          if (!records.isEmpty())
+          {
+            throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!");
+          }
+        }
+      }
+      catch (WakeupException e)
+      {
+        log.info("Received WakeupException, exiting!");
+        running = false;
+      }
+    }
+
+    log.info("Exiting normally");
+  }
+
+  private void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
+  {
+    for (ConsumerRecord<String, AbstractMessageTo> record : records)
+    {
+      UUID chatRoomId = UUID.fromString(record.key());
+
+      switch (record.value().getType())
+      {
+        case COMMAND_CREATE_CHATROOM:
+          createChatRoom(
+              chatRoomId,
+              (CommandCreateChatRoomTo) record.value(),
+              record.partition());
+          break;
+
+        case EVENT_CHATMESSAGE_RECEIVED:
+          Instant instant = Instant.ofEpochSecond(record.timestamp());
+          LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
+          loadChatMessage(
+              chatRoomId,
+              timestamp,
+              record.offset(),
+              (EventChatMessageReceivedTo) record.value(),
+              record.partition());
+          break;
+
+        default:
+          log.debug(
+              "Ignoring message for chat-room {} with offset {}: {}",
+              chatRoomId,
+              record.offset(),
+              record.value());
+      }
+
+      nextOffset[record.partition()] = record.offset() + 1;
+    }
+  }
+
+  private void createChatRoom(
+      UUID chatRoomId,
+      CommandCreateChatRoomTo createChatRoomRequestTo,
+      Integer partition)
+  {
+    log.info(
+        "Loading ChatRoom {} for shard {} with buffer-size {}",
+        chatRoomId,
+        partition,
+        bufferSize);
+    KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId);
+    ChatRoomData chatRoomData = new ChatRoomData(
+        clock,
+        service,
+        bufferSize);
+    putChatRoom(
+        chatRoomId,
+        createChatRoomRequestTo.getName(),
+        partition,
+        chatRoomData);
+  }
+
+
+  private void createChatRoom(ChatRoomInfo chatRoomInfo)
+  {
+    UUID id = chatRoomInfo.getId();
+    log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
+    KafkaChatRoomService service = new KafkaChatRoomService(this, id);
+    ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
+    putChatRoom(
+        chatRoomInfo.getId(),
+        chatRoomInfo.getName(),
+        chatRoomInfo.getShard(),
+        chatRoomData);
+  }
+
+  private void loadChatMessage(
+      UUID chatRoomId,
+      LocalDateTime timestamp,
+      long offset,
+      EventChatMessageReceivedTo chatMessageTo,
+      int partition)
+  {
+    Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
+    Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
+
+    ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId);
+    KafkaChatRoomService kafkaChatRoomService =
+        (KafkaChatRoomService) chatRoomData.getChatRoomService();
+
+    kafkaChatRoomService.persistMessage(message);
+  }
+
+  private boolean isLoadingCompleted()
+  {
+    return IntStream
+        .range(0, numShards)
+        .filter(shard -> isShardOwned[shard])
+        .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]);
+  }
+
+  private void pauseAllOwnedPartions()
+  {
+    consumer.pause(IntStream
+        .range(0, numShards)
+        .filter(shard -> isShardOwned[shard])
+        .mapToObj(shard -> new TopicPartition(topic, shard))
+        .toList());
+  }
+
+
+  private void putChatRoom(
+      UUID chatRoomId,
+      String name,
+      Integer partition,
+      ChatRoomData chatRoomData)
+  {
+    if (this.chatRoomInfo[partition].containsKey(chatRoomId))
+    {
+      log.warn(
+          "Ignoring existing chat-room for {}: {}",
+          partition,
+          chatRoomId);
+    }
+    else
+    {
+      log.info(
+          "Adding new chat-room to partition {}: {}",
+          partition,
+          chatRoomData);
+
+      this.chatRoomInfo[partition].put(
+          chatRoomId,
+          new ChatRoomInfo(chatRoomId, name, partition));
+      this.chatRoomData[partition].put(chatRoomId, chatRoomData);
+    }
+  }
+
+  int[] getOwnedShards()
+  {
+    return IntStream
+        .range(0, numShards)
+        .filter(shard -> isShardOwned[shard])
+        .toArray();
+  }
+
+  Mono<ChatRoomData> getChatRoomData(int shard, UUID id)
+  {
+    if (loadInProgress)
+    {
+      return Mono.error(new LoadInProgressException());
+    }
+
+    if (!isShardOwned[shard])
+    {
+      return Mono.error(new ShardNotOwnedException(shard));
+    }
+
+    return Mono.justOrEmpty(chatRoomData[shard].get(id));
+  }
+
+  Flux<ChatRoomInfo> getChatRoomInfo()
+  {
+    return Flux
+        .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
+        .filter(shard -> isShardOwned[shard])
+        .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values()));
+  }
+
+  Mono<ChatRoomInfo> getChatRoomInfo(int shard, UUID id)
+  {
+    if (loadInProgress)
+    {
+      return Mono.error(new LoadInProgressException());
+    }
+
+    if (!isShardOwned[shard])
+    {
+      return Mono.error(new ShardNotOwnedException(shard));
+    }
+
+    return Mono.justOrEmpty(chatRoomInfo[shard].get(id));
+  }
+
+  Flux<ChatRoomData> getChatRoomData()
+  {
+    return Flux
+        .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
+        .filter(shard -> isShardOwned[shard])
+        .flatMap(shard -> Flux.fromIterable(chatRoomData[shard].values()));
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java
new file mode 100644 (file)
index 0000000..d3321d7
--- /dev/null
@@ -0,0 +1,72 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.domain.ChatRoomData;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.utils.Utils;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.*;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class KafkaChatHomeService implements ChatHomeService
+{
+  private final int numPartitions;
+  private final ChatRoomChannel chatRoomChannel;
+
+
+
+  @Override
+  public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
+  {
+    log.info("Sending create-command for chat rooom: id={}, name={}");
+    return chatRoomChannel.sendCreateChatRoomRequest(id, name);
+  }
+
+  @Override
+  public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
+  {
+    int shard = selectShard(id);
+    return chatRoomChannel
+        .getChatRoomInfo(shard, id)
+        .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
+            id,
+            shard,
+            chatRoomChannel.getOwnedShards())));
+  }
+
+  @Override
+  public Flux<ChatRoomInfo> getChatRoomInfo()
+  {
+    return chatRoomChannel.getChatRoomInfo();
+  }
+
+  @Override
+  public Mono<ChatRoomData> getChatRoomData(UUID id)
+  {
+    int shard = selectShard(id);
+    return chatRoomChannel
+        .getChatRoomData(shard, id)
+        .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
+            id,
+            shard,
+            chatRoomChannel.getOwnedShards())));
+  }
+
+  public Flux<ChatRoomData> getChatRoomData()
+  {
+      return chatRoomChannel.getChatRoomData();
+  }
+
+  int selectShard(UUID chatRoomId)
+  {
+    byte[] serializedKey = chatRoomId.toString().getBytes();
+    return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatRoomService.java
new file mode 100644 (file)
index 0000000..7b2cc0b
--- /dev/null
@@ -0,0 +1,58 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoomService;
+import de.juplo.kafka.chat.backend.domain.Message;import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.LocalDateTime;
+import java.util.LinkedHashMap;
+import java.util.UUID;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class KafkaChatRoomService implements ChatRoomService
+{
+  private final ChatRoomChannel chatRoomChannel;
+  private final UUID chatRoomId;
+
+  private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
+
+
+  @Override
+  public Mono<Message> persistMessage(
+    Message.MessageKey key,
+    LocalDateTime timestamp,
+    String text)
+  {
+    return chatRoomChannel
+        .sendChatMessage(chatRoomId, key, timestamp, text)
+        .doOnSuccess(message -> persistMessage(message));
+  }
+
+  void persistMessage(Message message)
+  {
+    messages.put  (message.getKey(), message);
+  }
+
+  @Override
+  synchronized public Mono<Message> getMessage(Message.MessageKey key)
+  {
+    return Mono.fromSupplier(() -> messages.get(key));
+  }
+
+  @Override
+  synchronized public Flux<Message> getMessages(long first, long last)
+  {
+    return Flux.fromStream(messages
+      .values()
+      .stream()
+      .filter(message ->
+      {
+        long serial = message.getSerialNumber();
+        return serial >= first && serial <= last;
+      }));
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java
new file mode 100644 (file)
index 0000000..8474239
--- /dev/null
@@ -0,0 +1,68 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
+import jakarta.annotation.PreDestroy;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+
+@ConditionalOnProperty(
+    prefix = "chat.backend",
+    name = "services",
+    havingValue = "kafka")
+@Component
+@Slf4j
+public class KafkaServicesApplicationRunner implements ApplicationRunner
+{
+  @Autowired
+  ChatBackendProperties properties;
+
+  @Autowired
+  ThreadPoolTaskExecutor taskExecutor;
+  @Autowired
+  ConfigurableApplicationContext context;
+
+  @Autowired
+  ChatRoomChannel chatRoomChannel;
+  @Autowired
+  Consumer<String, AbstractMessageTo> chatRoomChannelConsumer;
+
+  CompletableFuture<Void> chatRoomChannelConsumerJob;
+
+
+  @Override
+  public void run(ApplicationArguments args) throws Exception
+  {
+    List<String> topics = List.of(properties.getKafka().getChatRoomChannelTopic());
+    chatRoomChannelConsumer.subscribe(topics, chatRoomChannel);
+    log.info("Starting the consumer for the ChatRoomChannel");
+    chatRoomChannelConsumerJob = taskExecutor
+        .submitCompletable(chatRoomChannel)
+        .exceptionally(e ->
+        {
+          log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
+          return null;
+        });
+  }
+
+  @PreDestroy
+  public void joinChatRoomChannelConsumerJob()
+  {
+    log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
+    chatRoomChannelConsumer.wakeup();
+    log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
+    chatRoomChannelConsumerJob.join();
+    log.info("Joined the consumer of the ChatRoomChannel");
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
new file mode 100644 (file)
index 0000000..c4479ce
--- /dev/null
@@ -0,0 +1,177 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+
+import java.time.Clock;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+
+@ConditionalOnProperty(
+    prefix = "chat.backend",
+    name = "services",
+    havingValue = "kafka")
+@Configuration
+public class KafkaServicesConfiguration
+{
+  @Bean
+  ChatHomeService kafkaChatHome(
+      ChatBackendProperties properties,
+      ChatRoomChannel chatRoomChannel)
+  {
+    return new KafkaChatHomeService(
+        properties.getKafka().getNumPartitions(),
+        chatRoomChannel);
+  }
+
+  @Bean
+  ChatRoomChannel chatRoomChannel(
+      ChatBackendProperties properties,
+      Producer<String, AbstractMessageTo> chatRoomChannelProducer,
+      Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
+      ZoneId zoneId,
+      Clock clock)
+  {
+    return new ChatRoomChannel(
+        properties.getKafka().getChatRoomChannelTopic(),
+        chatRoomChannelProducer,
+        chatRoomChannelConsumer,
+        zoneId,
+        properties.getKafka().getNumPartitions(),
+        properties.getChatroomBufferSize(),
+        clock);
+  }
+
+  @Bean
+  Producer<String, AbstractMessageTo>  chatRoomChannelProducer(
+      Properties defaultProducerProperties,
+      ChatBackendProperties chatBackendProperties,
+      StringSerializer stringSerializer,
+      JsonSerializer<AbstractMessageTo> messageSerializer)
+  {
+    Map<String, Object> properties = new HashMap<>();
+    defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+    properties.put(
+        ProducerConfig.CLIENT_ID_CONFIG,
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER");
+    return new KafkaProducer<>(
+        properties,
+        stringSerializer,
+        messageSerializer);
+  }
+
+  @Bean
+  StringSerializer stringSerializer()
+  {
+    return new StringSerializer();
+  }
+
+  @Bean
+  JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
+  {
+    JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
+    serializer.configure(
+        Map.of(
+            JsonSerializer.TYPE_MAPPINGS, typeMappings),
+        false);
+    return serializer;
+  }
+
+  @Bean
+  Consumer<String, AbstractMessageTo>  chatRoomChannelConsumer(
+      Properties defaultConsumerProperties,
+      ChatBackendProperties chatBackendProperties,
+      StringDeserializer stringDeserializer,
+      JsonDeserializer<AbstractMessageTo> messageDeserializer)
+  {
+    Map<String, Object> properties = new HashMap<>();
+    defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+    properties.put(
+        ConsumerConfig.CLIENT_ID_CONFIG,
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER");
+    properties.put(
+        ConsumerConfig.GROUP_ID_CONFIG,
+        "chatroom_channel");
+    return new KafkaConsumer<>(
+        properties,
+        stringDeserializer,
+        messageDeserializer);
+  }
+
+  @Bean
+  StringDeserializer stringDeserializer()
+  {
+    return new StringDeserializer();
+  }
+
+  @Bean
+  JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
+  {
+    JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
+    deserializer.configure(
+        Map.of(
+            JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
+            JsonDeserializer.TYPE_MAPPINGS, typeMappings),
+        false );
+    return deserializer;
+  }
+
+  @Bean
+  String typeMappings ()
+  {
+    return
+        "command_create_chatroom:" +  CommandCreateChatRoomTo.class.getCanonicalName() + "," +
+        "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
+  }
+
+  @Bean
+  Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
+  {
+    Properties properties = new Properties();
+    properties.setProperty(
+        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
+        chatBackendProperties.getKafka().getBootstrapServers());
+    return properties;
+  }
+
+  @Bean
+  Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
+  {
+    Properties properties = new Properties();
+    properties.setProperty(
+        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+        chatBackendProperties.getKafka().getBootstrapServers());
+    properties.setProperty(
+        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
+        "false");
+    properties.setProperty(
+        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+        "earliest");
+    return properties;
+  }
+
+  @Bean
+  ZoneId zoneId()
+  {
+    return ZoneId.systemDefault();
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/AbstractMessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/AbstractMessageTo.java
new file mode 100644 (file)
index 0000000..7cc7541
--- /dev/null
@@ -0,0 +1,18 @@
+package de.juplo.kafka.chat.backend.persistence.kafka.messages;
+
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+
+@RequiredArgsConstructor
+public class AbstractMessageTo
+{
+  public enum ToType {
+    COMMAND_CREATE_CHATROOM,
+    EVENT_CHATMESSAGE_RECEIVED,
+  }
+
+  @Getter
+  private final ToType type;
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/CommandCreateChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/CommandCreateChatRoomTo.java
new file mode 100644 (file)
index 0000000..1a134f3
--- /dev/null
@@ -0,0 +1,27 @@
+package de.juplo.kafka.chat.backend.persistence.kafka.messages;
+
+import lombok.*;
+
+
+@Getter
+@Setter
+@EqualsAndHashCode
+@ToString
+public class CommandCreateChatRoomTo extends AbstractMessageTo
+{
+  private String name;
+
+
+  public CommandCreateChatRoomTo()
+  {
+    super(ToType.COMMAND_CREATE_CHATROOM);
+  }
+
+
+  public static CommandCreateChatRoomTo of(String name)
+  {
+    CommandCreateChatRoomTo to = new CommandCreateChatRoomTo();
+    to.name = name;
+    return to;
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/EventChatMessageReceivedTo.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/EventChatMessageReceivedTo.java
new file mode 100644 (file)
index 0000000..2297b94
--- /dev/null
@@ -0,0 +1,48 @@
+package de.juplo.kafka.chat.backend.persistence.kafka.messages;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.*;
+
+import java.time.LocalDateTime;
+
+
+@Getter
+@Setter
+@EqualsAndHashCode
+@ToString
+public class EventChatMessageReceivedTo extends AbstractMessageTo
+{
+  private String user;
+  private Long id;
+  private String text;
+
+
+  public EventChatMessageReceivedTo()
+  {
+    super(ToType.EVENT_CHATMESSAGE_RECEIVED);
+  }
+
+
+  public Message toMessage(long offset, LocalDateTime timestamp)
+  {
+    return new Message(Message.MessageKey.of(user, id), offset, timestamp, text);
+  }
+
+  public static EventChatMessageReceivedTo from(Message message)
+  {
+    return EventChatMessageReceivedTo.of(
+        message.getUsername(),
+        message.getId(),
+        message.getMessageText());
+  }
+
+
+  public static EventChatMessageReceivedTo of(String user, Long id, String text)
+  {
+    EventChatMessageReceivedTo to = new EventChatMessageReceivedTo();
+    to.user = user;
+    to.id = id;
+    to.text = text;
+    return to;
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/ShardingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/ShardingStrategy.java
deleted file mode 100644 (file)
index f7350f7..0000000
+++ /dev/null
@@ -1,9 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence;
-
-import java.util.UUID;
-
-
-public interface ShardingStrategy
-{
-  int selectShard(UUID chatRoomId);
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java
deleted file mode 100644 (file)
index 2913efe..0000000
+++ /dev/null
@@ -1,29 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence;
-
-import de.juplo.kafka.chat.backend.domain.ChatHomeService;
-import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import de.juplo.kafka.chat.backend.domain.Message;
-import reactor.core.publisher.Flux;
-
-import java.util.UUID;
-
-
-public interface StorageStrategy
-{
-  default void write(ChatHomeService chatHomeService)
-  {
-    writeChatRoomInfo(
-        chatHomeService
-            .getChatRoomInfo()
-            .doOnNext(chatRoomInfo -> writeChatRoomData(
-                chatRoomInfo.getId(),
-                chatHomeService
-                    .getChatRoomData(chatRoomInfo.getId())
-                    .flatMapMany(chatRoomData -> chatRoomData.getMessages()))));
-  }
-
-  void writeChatRoomInfo(Flux<ChatRoomInfo> chatRoomInfoFlux);
-  Flux<ChatRoomInfo> readChatRoomInfo();
-  void writeChatRoomData(UUID chatRoomId, Flux<Message> messageFlux);
-  Flux<Message> readChatRoomData(UUID chatRoomId);
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomService.java
deleted file mode 100644 (file)
index e1d5a5e..0000000
+++ /dev/null
@@ -1,55 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.inmemory;
-
-import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.domain.ChatRoomService;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.time.LocalDateTime;
-import java.util.LinkedHashMap;
-
-
-@Slf4j
-public class InMemoryChatRoomService implements ChatRoomService
-{
-  private final LinkedHashMap<Message.MessageKey, Message> messages;
-
-
-  public InMemoryChatRoomService(Flux<Message> messageFlux)
-  {
-    log.debug("Creating InMemoryChatRoomService");
-    messages = new LinkedHashMap<>();
-    messageFlux.subscribe(message -> messages.put(message.getKey(), message));
-  }
-
-  @Override
-  public Mono<Message> persistMessage(
-      Message.MessageKey key,
-      LocalDateTime timestamp,
-      String text)
-  {
-    Message message = new Message(key, (long)messages.size(), timestamp, text);
-    messages.put(message.getKey(), message);
-    return Mono.just(message);
-  }
-
-  @Override
-  public Mono<Message> getMessage(Message.MessageKey key)
-  {
-    return Mono.fromSupplier(() -> messages.get(key));
-  }
-
-  @Override
-  public Flux<Message> getMessages(long first, long last)
-  {
-    return Flux.fromStream(messages
-        .values()
-        .stream()
-        .filter(message ->
-        {
-          long serial = message.getSerialNumber();
-          return serial >= first && serial <= last;
-        }));
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java
deleted file mode 100644 (file)
index 263a2d5..0000000
+++ /dev/null
@@ -1,84 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.inmemory;
-
-import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.domain.ChatHomeService;
-import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
-import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-import java.time.Clock;
-import java.util.stream.IntStream;
-
-
-@ConditionalOnProperty(
-    prefix = "chat.backend",
-    name = "services",
-    havingValue = "inmemory",
-    matchIfMissing = true)
-@Configuration
-public class InMemoryServicesConfiguration
-{
-  @Bean
-  @ConditionalOnProperty(
-      prefix = "chat.backend.inmemory",
-      name = "sharding-strategy",
-      havingValue = "none",
-      matchIfMissing = true)
-  ChatHomeService noneShardingChatHome(
-      ChatBackendProperties properties,
-      StorageStrategy storageStrategy,
-      Clock clock)
-  {
-    return new SimpleChatHomeService(
-        storageStrategy,
-        clock,
-        properties.getChatroomBufferSize());
-  }
-
-  @Bean
-  @ConditionalOnProperty(
-      prefix = "chat.backend.inmemory",
-      name = "sharding-strategy",
-      havingValue = "kafkalike")
-  ChatHomeService kafkalikeShardingChatHome(
-      ChatBackendProperties properties,
-      StorageStrategy storageStrategy,
-      Clock clock)
-  {
-    int numShards = properties.getInmemory().getNumShards();
-    SimpleChatHomeService[] chatHomes = new SimpleChatHomeService[numShards];
-    IntStream
-        .of(properties.getInmemory().getOwnedShards())
-        .forEach(shard -> chatHomes[shard] = new SimpleChatHomeService(
-            shard,
-            storageStrategy,
-            clock,
-            properties.getChatroomBufferSize()));
-    ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
-    return new ShardedChatHomeService(chatHomes, strategy);
-  }
-
-  @ConditionalOnProperty(
-      prefix = "chat.backend.inmemory",
-      name = "sharding-strategy",
-      havingValue = "none",
-      matchIfMissing = true)
-  @Bean
-  ShardingStrategy defaultShardingStrategy()
-  {
-    return chatRoomId -> 0;
-  }
-
-  @ConditionalOnProperty(
-      prefix = "chat.backend.inmemory",
-      name = "sharding-strategy",
-      havingValue = "kafkalike")
-  @Bean
-  ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties)
-  {
-    return new KafkaLikeShardingStrategy(
-        properties.getInmemory().getNumShards());
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/KafkaLikeShardingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/KafkaLikeShardingStrategy.java
deleted file mode 100644 (file)
index e20dab7..0000000
+++ /dev/null
@@ -1,21 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.inmemory;
-
-import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
-import lombok.RequiredArgsConstructor;
-import org.apache.kafka.common.utils.Utils;
-
-import java.util.UUID;
-
-
-@RequiredArgsConstructor
-public class KafkaLikeShardingStrategy implements ShardingStrategy
-{
-  private final int numPartitions;
-
-  @Override
-  public int selectShard(UUID chatRoomId)
-  {
-    byte[] serializedKey = chatRoomId.toString().getBytes();
-    return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeService.java
deleted file mode 100644 (file)
index 09e4684..0000000
+++ /dev/null
@@ -1,106 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.inmemory;
-
-import de.juplo.kafka.chat.backend.domain.*;
-import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
-import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
-import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.UUID;
-import java.util.stream.Collectors;
-
-
-@Slf4j
-public class ShardedChatHomeService implements ChatHomeService
-{
-  private final SimpleChatHomeService[] chatHomes;
-  private final Set<Integer> ownedShards;
-  private final ShardingStrategy shardingStrategy;
-
-
-  public ShardedChatHomeService(
-      SimpleChatHomeService[] chatHomes,
-      ShardingStrategy shardingStrategy)
-  {
-    this.chatHomes = chatHomes;
-    this.shardingStrategy = shardingStrategy;
-    this.ownedShards = new HashSet<>();
-    for (int shard = 0; shard < chatHomes.length; shard++)
-      if(chatHomes[shard] != null)
-        this.ownedShards.add(shard);
-    log.info(
-        "Created ShardedChatHome for shards: {}",
-        ownedShards
-            .stream()
-            .map(String::valueOf)
-            .collect(Collectors.joining(", ")));
-  }
-
-
-  @Override
-  public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
-  {
-    int shard = shardingStrategy.selectShard(id);
-    return chatHomes[shard] == null
-        ? Mono.error(new ShardNotOwnedException(shard))
-        : chatHomes[shard].createChatRoom(id, name);
-  }
-
-  @Override
-  public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
-  {
-    int shard = selectShard(id);
-    return chatHomes[shard] == null
-        ? Mono.error(new ShardNotOwnedException(shard))
-        : chatHomes[shard]
-            .getChatRoomInfo(id)
-            .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
-            ? new UnknownChatroomException(
-                id,
-                shard,
-                ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
-            : throwable);
-  }
-
-  @Override
-  public Flux<ChatRoomInfo> getChatRoomInfo()
-  {
-    return Flux
-        .fromIterable(ownedShards)
-        .flatMap(shard -> chatHomes[shard].getChatRoomInfo());
-  }
-
-  @Override
-  public Mono<ChatRoomData> getChatRoomData(UUID id)
-  {
-    int shard = selectShard(id);
-    return chatHomes[shard] == null
-        ? Mono.error(new ShardNotOwnedException(shard))
-        : chatHomes[shard]
-            .getChatRoomData(id)
-            .onErrorMap(throwable -> throwable instanceof UnknownChatroomException
-                ? new UnknownChatroomException(
-                id,
-                shard,
-                ownedShards.stream().mapToInt(i -> i.intValue()).toArray())
-                : throwable);
-  }
-
-  public Flux<ChatRoomData> getChatRoomData()
-  {
-    return Flux
-        .fromIterable(ownedShards)
-        .flatMap(shard -> chatHomes[shard].getChatRoomData());
-  }
-
-
-
-  private int selectShard(UUID chatroomId)
-  {
-    return shardingStrategy.selectShard(chatroomId);
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeService.java
deleted file mode 100644 (file)
index d9c3154..0000000
+++ /dev/null
@@ -1,122 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.inmemory;
-
-import de.juplo.kafka.chat.backend.domain.*;
-import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
-import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.time.Clock;
-import java.util.*;
-
-
-@Slf4j
-public class SimpleChatHomeService implements ChatHomeService
-{
-  private final Integer shard;
-  private final Map<UUID, ChatRoomInfo> chatRoomInfo;
-  private final Map<UUID, ChatRoomData> chatRoomData;
-  private final Clock clock;
-  private final int bufferSize;
-
-
-
-  public SimpleChatHomeService(
-      StorageStrategy storageStrategy,
-      Clock clock,
-      int bufferSize)
-  {
-    this(
-        null,
-        storageStrategy,
-        clock,
-        bufferSize);
-  }
-
-  public SimpleChatHomeService(
-      Integer shard,
-      StorageStrategy storageStrategy,
-      Clock clock,
-      int bufferSize)
-  {
-    log.info("Created SimpleChatHome for shard {}", shard);
-;
-    this.shard = shard;
-    this.chatRoomInfo = new HashMap<>();
-    this.chatRoomData = new HashMap<>();
-    storageStrategy
-        .readChatRoomInfo()
-        .filter(info ->
-        {
-          if (shard == null || info.getShard() == shard)
-          {
-            return true;
-          }
-          else
-          {
-            log.info(
-                "SimpleChatHome for shard {} ignores not owned chat-room {}",
-                shard,
-                info);
-            return false;
-          }
-        })
-        .toStream()
-        .forEach(info ->
-        {
-          UUID chatRoomId = info.getId();
-          chatRoomInfo.put(chatRoomId, info);
-          Flux<Message> messageFlux =
-              storageStrategy.readChatRoomData(chatRoomId);
-          chatRoomData.put(
-              info.getId(),
-              new ChatRoomData(
-                  clock,
-                  new InMemoryChatRoomService(messageFlux),
-                  bufferSize));
-        });
-    this.clock = clock;
-    this.bufferSize = bufferSize;
-  }
-
-
-  @Override
-  public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
-  {
-    log.info("Creating ChatRoom with buffer-size {}", bufferSize);
-    ChatRoomService service = new InMemoryChatRoomService(Flux.empty());
-    ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard);
-    this.chatRoomInfo.put(id, chatRoomInfo);
-    ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
-    this.chatRoomData.put(id, chatRoomData);
-    return Mono.just(chatRoomInfo);
-  }
-
-  @Override
-  public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
-  {
-    return Mono
-        .justOrEmpty(chatRoomInfo.get(id))
-        .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
-  }
-
-  @Override
-  public Flux<ChatRoomInfo> getChatRoomInfo()
-  {
-    return Flux.fromIterable(chatRoomInfo.values());
-  }
-
-  @Override
-  public Mono<ChatRoomData> getChatRoomData(UUID id)
-  {
-    return Mono
-        .justOrEmpty(chatRoomData.get(id))
-        .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
-  }
-
-  public Flux<ChatRoomData> getChatRoomData()
-  {
-    return Flux.fromIterable(chatRoomData.values());
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java
deleted file mode 100644 (file)
index 7e95c64..0000000
+++ /dev/null
@@ -1,436 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.*;
-import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
-import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
-import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
-import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
-import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.time.*;
-import java.util.*;
-import java.util.stream.IntStream;
-
-
-@Slf4j
-public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
-{
-  private final String topic;
-  private final Producer<String, AbstractMessageTo> producer;
-  private final Consumer<String, AbstractMessageTo> consumer;
-  private final ZoneId zoneId;
-  private final int numShards;
-  private final int bufferSize;
-  private final Clock clock;
-  private final boolean[] isShardOwned;
-  private final long[] currentOffset;
-  private final long[] nextOffset;
-  private final Map<UUID, ChatRoomInfo>[] chatRoomInfo;
-  private final Map<UUID, ChatRoomData>[] chatRoomData;
-
-  private boolean running;
-  @Getter
-  private volatile boolean loadInProgress;
-
-
-  public ChatRoomChannel(
-    String topic,
-    Producer<String, AbstractMessageTo> producer,
-    Consumer<String, AbstractMessageTo> consumer,
-    ZoneId zoneId,
-    int numShards,
-    int bufferSize,
-    Clock clock)
-  {
-    log.debug(
-        "Creating ChatRoomChannel for topic {} with {} partitions",
-        topic,
-        numShards);
-    this.topic = topic;
-    this.consumer = consumer;
-    this.producer = producer;
-    this.zoneId = zoneId;
-    this.numShards = numShards;
-    this.bufferSize = bufferSize;
-    this.clock = clock;
-    this.isShardOwned = new boolean[numShards];
-    this.currentOffset = new long[numShards];
-    this.nextOffset = new long[numShards];
-    this.chatRoomInfo = new Map[numShards];
-    this.chatRoomData = new Map[numShards];
-    IntStream
-        .range(0, numShards)
-        .forEach(shard ->
-        {
-          this.chatRoomInfo[shard] = new HashMap<>();
-          this.chatRoomData[shard] = new HashMap<>();
-        });
-  }
-
-
-
-  Mono<ChatRoomInfo> sendCreateChatRoomRequest(
-      UUID chatRoomId,
-      String name)
-  {
-    CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name);
-    return Mono.create(sink ->
-    {
-      ProducerRecord<String, AbstractMessageTo> record =
-          new ProducerRecord<>(
-              topic,
-              chatRoomId.toString(),
-              createChatRoomRequestTo);
-
-      producer.send(record, ((metadata, exception) ->
-      {
-        if (metadata != null)
-        {
-          log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
-          ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition());
-          createChatRoom(chatRoomInfo);
-          sink.success(chatRoomInfo);
-        }
-        else
-        {
-          // On send-failure
-          log.error(
-              "Could not send create-request for chat room (id={}, name={}): {}",
-              chatRoomId,
-              name,
-              exception);
-          sink.error(exception);
-        }
-      }));
-    });
-  }
-
-  Mono<Message> sendChatMessage(
-      UUID chatRoomId,
-      Message.MessageKey key,
-      LocalDateTime timestamp,
-      String text)
-  {
-    ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
-    return Mono.create(sink ->
-    {
-      ProducerRecord<String, AbstractMessageTo> record =
-          new ProducerRecord<>(
-              topic,
-              null,
-              zdt.toEpochSecond(),
-              chatRoomId.toString(),
-              EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text));
-
-      producer.send(record, ((metadata, exception) ->
-      {
-        if (metadata != null)
-        {
-          // On successful send
-          Message message = new Message(key, metadata.offset(), timestamp, text);
-          log.info("Successfully send message {}", message);
-          sink.success(message);
-        }
-        else
-        {
-          // On send-failure
-          log.error(
-              "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}",
-              chatRoomId,
-              key,
-              timestamp,
-              text,
-              exception);
-          sink.error(exception);
-        }
-      }));
-    });
-  }
-
-  @Override
-  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
-  {
-    log.info("Newly assigned partitions! Pausing normal operations...");
-    loadInProgress = true;
-
-    consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
-    {
-      int partition = topicPartition.partition();
-      isShardOwned[partition] =  true;
-      this.currentOffset[partition] = currentOffset;
-
-      log.info(
-          "Partition assigned: {} - loading messages: next={} -> current={}",
-          partition,
-          nextOffset[partition],
-          currentOffset);
-
-      consumer.seek(topicPartition, nextOffset[partition]);
-    });
-
-    consumer.resume(partitions);
-  }
-
-  @Override
-  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
-  {
-    partitions.forEach(topicPartition ->
-    {
-      int partition = topicPartition.partition();
-      isShardOwned[partition] = false;
-      log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
-    });
-  }
-
-  @Override
-  public void onPartitionsLost(Collection<TopicPartition> partitions)
-  {
-    log.warn("Lost partitions: {}, partitions");
-    // TODO: Muss auf den Verlust anders reagiert werden?
-    onPartitionsRevoked(partitions);
-  }
-
-  @Override
-  public void run()
-  {
-    running = true;
-
-    while (running)
-    {
-      try
-      {
-        ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(5));
-        log.info("Fetched {} messages", records.count());
-
-        if (loadInProgress)
-        {
-          loadChatRoom(records);
-
-          if (isLoadingCompleted())
-          {
-            log.info("Loading of messages completed! Pausing all owned partitions...");
-            pauseAllOwnedPartions();
-            log.info("Resuming normal operations...");
-            loadInProgress = false;
-          }
-        }
-        else
-        {
-          if (!records.isEmpty())
-          {
-            throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!");
-          }
-        }
-      }
-      catch (WakeupException e)
-      {
-        log.info("Received WakeupException, exiting!");
-        running = false;
-      }
-    }
-
-    log.info("Exiting normally");
-  }
-
-  private void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
-  {
-    for (ConsumerRecord<String, AbstractMessageTo> record : records)
-    {
-      UUID chatRoomId = UUID.fromString(record.key());
-
-      switch (record.value().getType())
-      {
-        case COMMAND_CREATE_CHATROOM:
-          createChatRoom(
-              chatRoomId,
-              (CommandCreateChatRoomTo) record.value(),
-              record.partition());
-          break;
-
-        case EVENT_CHATMESSAGE_RECEIVED:
-          Instant instant = Instant.ofEpochSecond(record.timestamp());
-          LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
-          loadChatMessage(
-              chatRoomId,
-              timestamp,
-              record.offset(),
-              (EventChatMessageReceivedTo) record.value(),
-              record.partition());
-          break;
-
-        default:
-          log.debug(
-              "Ignoring message for chat-room {} with offset {}: {}",
-              chatRoomId,
-              record.offset(),
-              record.value());
-      }
-
-      nextOffset[record.partition()] = record.offset() + 1;
-    }
-  }
-
-  private void createChatRoom(
-      UUID chatRoomId,
-      CommandCreateChatRoomTo createChatRoomRequestTo,
-      Integer partition)
-  {
-    log.info(
-        "Loading ChatRoom {} for shard {} with buffer-size {}",
-        chatRoomId,
-        partition,
-        bufferSize);
-    KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId);
-    ChatRoomData chatRoomData = new ChatRoomData(
-        clock,
-        service,
-        bufferSize);
-    putChatRoom(
-        chatRoomId,
-        createChatRoomRequestTo.getName(),
-        partition,
-        chatRoomData);
-  }
-
-
-  private void createChatRoom(ChatRoomInfo chatRoomInfo)
-  {
-    UUID id = chatRoomInfo.getId();
-    log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
-    KafkaChatRoomService service = new KafkaChatRoomService(this, id);
-    ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
-    putChatRoom(
-        chatRoomInfo.getId(),
-        chatRoomInfo.getName(),
-        chatRoomInfo.getShard(),
-        chatRoomData);
-  }
-
-  private void loadChatMessage(
-      UUID chatRoomId,
-      LocalDateTime timestamp,
-      long offset,
-      EventChatMessageReceivedTo chatMessageTo,
-      int partition)
-  {
-    Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
-    Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
-
-    ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId);
-    KafkaChatRoomService kafkaChatRoomService =
-        (KafkaChatRoomService) chatRoomData.getChatRoomService();
-
-    kafkaChatRoomService.persistMessage(message);
-  }
-
-  private boolean isLoadingCompleted()
-  {
-    return IntStream
-        .range(0, numShards)
-        .filter(shard -> isShardOwned[shard])
-        .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]);
-  }
-
-  private void pauseAllOwnedPartions()
-  {
-    consumer.pause(IntStream
-        .range(0, numShards)
-        .filter(shard -> isShardOwned[shard])
-        .mapToObj(shard -> new TopicPartition(topic, shard))
-        .toList());
-  }
-
-
-  private void putChatRoom(
-      UUID chatRoomId,
-      String name,
-      Integer partition,
-      ChatRoomData chatRoomData)
-  {
-    if (this.chatRoomInfo[partition].containsKey(chatRoomId))
-    {
-      log.warn(
-          "Ignoring existing chat-room for {}: {}",
-          partition,
-          chatRoomId);
-    }
-    else
-    {
-      log.info(
-          "Adding new chat-room to partition {}: {}",
-          partition,
-          chatRoomData);
-
-      this.chatRoomInfo[partition].put(
-          chatRoomId,
-          new ChatRoomInfo(chatRoomId, name, partition));
-      this.chatRoomData[partition].put(chatRoomId, chatRoomData);
-    }
-  }
-
-  int[] getOwnedShards()
-  {
-    return IntStream
-        .range(0, numShards)
-        .filter(shard -> isShardOwned[shard])
-        .toArray();
-  }
-
-  Mono<ChatRoomData> getChatRoomData(int shard, UUID id)
-  {
-    if (loadInProgress)
-    {
-      return Mono.error(new LoadInProgressException());
-    }
-
-    if (!isShardOwned[shard])
-    {
-      return Mono.error(new ShardNotOwnedException(shard));
-    }
-
-    return Mono.justOrEmpty(chatRoomData[shard].get(id));
-  }
-
-  Flux<ChatRoomInfo> getChatRoomInfo()
-  {
-    return Flux
-        .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
-        .filter(shard -> isShardOwned[shard])
-        .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values()));
-  }
-
-  Mono<ChatRoomInfo> getChatRoomInfo(int shard, UUID id)
-  {
-    if (loadInProgress)
-    {
-      return Mono.error(new LoadInProgressException());
-    }
-
-    if (!isShardOwned[shard])
-    {
-      return Mono.error(new ShardNotOwnedException(shard));
-    }
-
-    return Mono.justOrEmpty(chatRoomInfo[shard].get(id));
-  }
-
-  Flux<ChatRoomData> getChatRoomData()
-  {
-    return Flux
-        .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
-        .filter(shard -> isShardOwned[shard])
-        .flatMap(shard -> Flux.fromIterable(chatRoomData[shard].values()));
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java
deleted file mode 100644 (file)
index d3321d7..0000000
+++ /dev/null
@@ -1,72 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.ChatHomeService;
-import de.juplo.kafka.chat.backend.domain.ChatRoomData;
-import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.utils.Utils;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.util.*;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class KafkaChatHomeService implements ChatHomeService
-{
-  private final int numPartitions;
-  private final ChatRoomChannel chatRoomChannel;
-
-
-
-  @Override
-  public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
-  {
-    log.info("Sending create-command for chat rooom: id={}, name={}");
-    return chatRoomChannel.sendCreateChatRoomRequest(id, name);
-  }
-
-  @Override
-  public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
-  {
-    int shard = selectShard(id);
-    return chatRoomChannel
-        .getChatRoomInfo(shard, id)
-        .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
-            id,
-            shard,
-            chatRoomChannel.getOwnedShards())));
-  }
-
-  @Override
-  public Flux<ChatRoomInfo> getChatRoomInfo()
-  {
-    return chatRoomChannel.getChatRoomInfo();
-  }
-
-  @Override
-  public Mono<ChatRoomData> getChatRoomData(UUID id)
-  {
-    int shard = selectShard(id);
-    return chatRoomChannel
-        .getChatRoomData(shard, id)
-        .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
-            id,
-            shard,
-            chatRoomChannel.getOwnedShards())));
-  }
-
-  public Flux<ChatRoomData> getChatRoomData()
-  {
-      return chatRoomChannel.getChatRoomData();
-  }
-
-  int selectShard(UUID chatRoomId)
-  {
-    byte[] serializedKey = chatRoomId.toString().getBytes();
-    return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java
deleted file mode 100644 (file)
index 7b2cc0b..0000000
+++ /dev/null
@@ -1,58 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.ChatRoomService;
-import de.juplo.kafka.chat.backend.domain.Message;import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.time.LocalDateTime;
-import java.util.LinkedHashMap;
-import java.util.UUID;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class KafkaChatRoomService implements ChatRoomService
-{
-  private final ChatRoomChannel chatRoomChannel;
-  private final UUID chatRoomId;
-
-  private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
-
-
-  @Override
-  public Mono<Message> persistMessage(
-    Message.MessageKey key,
-    LocalDateTime timestamp,
-    String text)
-  {
-    return chatRoomChannel
-        .sendChatMessage(chatRoomId, key, timestamp, text)
-        .doOnSuccess(message -> persistMessage(message));
-  }
-
-  void persistMessage(Message message)
-  {
-    messages.put  (message.getKey(), message);
-  }
-
-  @Override
-  synchronized public Mono<Message> getMessage(Message.MessageKey key)
-  {
-    return Mono.fromSupplier(() -> messages.get(key));
-  }
-
-  @Override
-  synchronized public Flux<Message> getMessages(long first, long last)
-  {
-    return Flux.fromStream(messages
-      .values()
-      .stream()
-      .filter(message ->
-      {
-        long serial = message.getSerialNumber();
-        return serial >= first && serial <= last;
-      }));
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java
deleted file mode 100644 (file)
index 8474239..0000000
+++ /dev/null
@@ -1,68 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
-import jakarta.annotation.PreDestroy;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.stereotype.Component;
-
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
-
-@ConditionalOnProperty(
-    prefix = "chat.backend",
-    name = "services",
-    havingValue = "kafka")
-@Component
-@Slf4j
-public class KafkaServicesApplicationRunner implements ApplicationRunner
-{
-  @Autowired
-  ChatBackendProperties properties;
-
-  @Autowired
-  ThreadPoolTaskExecutor taskExecutor;
-  @Autowired
-  ConfigurableApplicationContext context;
-
-  @Autowired
-  ChatRoomChannel chatRoomChannel;
-  @Autowired
-  Consumer<String, AbstractMessageTo> chatRoomChannelConsumer;
-
-  CompletableFuture<Void> chatRoomChannelConsumerJob;
-
-
-  @Override
-  public void run(ApplicationArguments args) throws Exception
-  {
-    List<String> topics = List.of(properties.getKafka().getChatRoomChannelTopic());
-    chatRoomChannelConsumer.subscribe(topics, chatRoomChannel);
-    log.info("Starting the consumer for the ChatRoomChannel");
-    chatRoomChannelConsumerJob = taskExecutor
-        .submitCompletable(chatRoomChannel)
-        .exceptionally(e ->
-        {
-          log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
-          return null;
-        });
-  }
-
-  @PreDestroy
-  public void joinChatRoomChannelConsumerJob()
-  {
-    log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
-    chatRoomChannelConsumer.wakeup();
-    log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
-    chatRoomChannelConsumerJob.join();
-    log.info("Joined the consumer of the ChatRoomChannel");
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java
deleted file mode 100644 (file)
index c4479ce..0000000
+++ /dev/null
@@ -1,177 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.domain.ChatHomeService;
-import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
-import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
-import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.support.serializer.JsonDeserializer;
-import org.springframework.kafka.support.serializer.JsonSerializer;
-
-import java.time.Clock;
-import java.time.ZoneId;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-
-@ConditionalOnProperty(
-    prefix = "chat.backend",
-    name = "services",
-    havingValue = "kafka")
-@Configuration
-public class KafkaServicesConfiguration
-{
-  @Bean
-  ChatHomeService kafkaChatHome(
-      ChatBackendProperties properties,
-      ChatRoomChannel chatRoomChannel)
-  {
-    return new KafkaChatHomeService(
-        properties.getKafka().getNumPartitions(),
-        chatRoomChannel);
-  }
-
-  @Bean
-  ChatRoomChannel chatRoomChannel(
-      ChatBackendProperties properties,
-      Producer<String, AbstractMessageTo> chatRoomChannelProducer,
-      Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
-      ZoneId zoneId,
-      Clock clock)
-  {
-    return new ChatRoomChannel(
-        properties.getKafka().getChatRoomChannelTopic(),
-        chatRoomChannelProducer,
-        chatRoomChannelConsumer,
-        zoneId,
-        properties.getKafka().getNumPartitions(),
-        properties.getChatroomBufferSize(),
-        clock);
-  }
-
-  @Bean
-  Producer<String, AbstractMessageTo>  chatRoomChannelProducer(
-      Properties defaultProducerProperties,
-      ChatBackendProperties chatBackendProperties,
-      StringSerializer stringSerializer,
-      JsonSerializer<AbstractMessageTo> messageSerializer)
-  {
-    Map<String, Object> properties = new HashMap<>();
-    defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
-    properties.put(
-        ProducerConfig.CLIENT_ID_CONFIG,
-        chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER");
-    return new KafkaProducer<>(
-        properties,
-        stringSerializer,
-        messageSerializer);
-  }
-
-  @Bean
-  StringSerializer stringSerializer()
-  {
-    return new StringSerializer();
-  }
-
-  @Bean
-  JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
-  {
-    JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
-    serializer.configure(
-        Map.of(
-            JsonSerializer.TYPE_MAPPINGS, typeMappings),
-        false);
-    return serializer;
-  }
-
-  @Bean
-  Consumer<String, AbstractMessageTo>  chatRoomChannelConsumer(
-      Properties defaultConsumerProperties,
-      ChatBackendProperties chatBackendProperties,
-      StringDeserializer stringDeserializer,
-      JsonDeserializer<AbstractMessageTo> messageDeserializer)
-  {
-    Map<String, Object> properties = new HashMap<>();
-    defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
-    properties.put(
-        ConsumerConfig.CLIENT_ID_CONFIG,
-        chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER");
-    properties.put(
-        ConsumerConfig.GROUP_ID_CONFIG,
-        "chatroom_channel");
-    return new KafkaConsumer<>(
-        properties,
-        stringDeserializer,
-        messageDeserializer);
-  }
-
-  @Bean
-  StringDeserializer stringDeserializer()
-  {
-    return new StringDeserializer();
-  }
-
-  @Bean
-  JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
-  {
-    JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
-    deserializer.configure(
-        Map.of(
-            JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
-            JsonDeserializer.TYPE_MAPPINGS, typeMappings),
-        false );
-    return deserializer;
-  }
-
-  @Bean
-  String typeMappings ()
-  {
-    return
-        "command_create_chatroom:" +  CommandCreateChatRoomTo.class.getCanonicalName() + "," +
-        "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
-  }
-
-  @Bean
-  Properties defaultProducerProperties(ChatBackendProperties chatBackendProperties)
-  {
-    Properties properties = new Properties();
-    properties.setProperty(
-        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
-        chatBackendProperties.getKafka().getBootstrapServers());
-    return properties;
-  }
-
-  @Bean
-  Properties defaultConsumerProperties(ChatBackendProperties chatBackendProperties)
-  {
-    Properties properties = new Properties();
-    properties.setProperty(
-        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
-        chatBackendProperties.getKafka().getBootstrapServers());
-    properties.setProperty(
-        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
-        "false");
-    properties.setProperty(
-        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
-        "earliest");
-    return properties;
-  }
-
-  @Bean
-  ZoneId zoneId()
-  {
-    return ZoneId.systemDefault();
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/AbstractMessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/AbstractMessageTo.java
deleted file mode 100644 (file)
index 7cc7541..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka.messages;
-
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-
-
-@RequiredArgsConstructor
-public class AbstractMessageTo
-{
-  public enum ToType {
-    COMMAND_CREATE_CHATROOM,
-    EVENT_CHATMESSAGE_RECEIVED,
-  }
-
-  @Getter
-  private final ToType type;
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomTo.java
deleted file mode 100644 (file)
index 1a134f3..0000000
+++ /dev/null
@@ -1,27 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka.messages;
-
-import lombok.*;
-
-
-@Getter
-@Setter
-@EqualsAndHashCode
-@ToString
-public class CommandCreateChatRoomTo extends AbstractMessageTo
-{
-  private String name;
-
-
-  public CommandCreateChatRoomTo()
-  {
-    super(ToType.COMMAND_CREATE_CHATROOM);
-  }
-
-
-  public static CommandCreateChatRoomTo of(String name)
-  {
-    CommandCreateChatRoomTo to = new CommandCreateChatRoomTo();
-    to.name = name;
-    return to;
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedTo.java
deleted file mode 100644 (file)
index 2297b94..0000000
+++ /dev/null
@@ -1,48 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka.messages;
-
-import de.juplo.kafka.chat.backend.domain.Message;
-import lombok.*;
-
-import java.time.LocalDateTime;
-
-
-@Getter
-@Setter
-@EqualsAndHashCode
-@ToString
-public class EventChatMessageReceivedTo extends AbstractMessageTo
-{
-  private String user;
-  private Long id;
-  private String text;
-
-
-  public EventChatMessageReceivedTo()
-  {
-    super(ToType.EVENT_CHATMESSAGE_RECEIVED);
-  }
-
-
-  public Message toMessage(long offset, LocalDateTime timestamp)
-  {
-    return new Message(Message.MessageKey.of(user, id), offset, timestamp, text);
-  }
-
-  public static EventChatMessageReceivedTo from(Message message)
-  {
-    return EventChatMessageReceivedTo.of(
-        message.getUsername(),
-        message.getId(),
-        message.getMessageText());
-  }
-
-
-  public static EventChatMessageReceivedTo of(String user, Long id, String text)
-  {
-    EventChatMessageReceivedTo to = new EventChatMessageReceivedTo();
-    to.user = user;
-    to.id = id;
-    to.text = text;
-    return to;
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeServiceTest.java
new file mode 100644 (file)
index 0000000..4c922ce
--- /dev/null
@@ -0,0 +1,65 @@
+package de.juplo.kafka.chat.backend.persistence.inmemory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest;
+import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.storage.files.FilesStorageStrategy;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+
+import java.nio.file.Paths;
+import java.time.Clock;
+import java.util.stream.IntStream;
+
+public class ShardedChatHomeServiceTest extends ChatHomeServiceWithShardsTest
+{
+  @TestConfiguration
+  static class Configuration
+  {
+    @Bean
+    ShardedChatHomeService chatHome(
+        StorageStrategy storageStrategy,
+        Clock clock)
+    {
+      SimpleChatHomeService[] chatHomes = new SimpleChatHomeService[NUM_SHARDS];
+
+      IntStream
+          .of(ownedShards())
+          .forEach(shard -> chatHomes[shard] = new SimpleChatHomeService(
+              shard,
+              storageStrategy,
+              clock,
+              bufferSize()));
+
+      ShardingStrategy strategy = new KafkaLikeShardingStrategy(NUM_SHARDS);
+
+      return new ShardedChatHomeService(chatHomes, strategy);
+    }
+
+    @Bean
+    public FilesStorageStrategy storageStrategy(Clock clock)
+    {
+      return new FilesStorageStrategy(
+          Paths.get("target", "test-classes", "data", "files"),
+          new KafkaLikeShardingStrategy(NUM_SHARDS),
+          new ObjectMapper());
+    }
+
+    @Bean
+    Clock clock()
+    {
+      return Clock.systemDefaultZone();
+    }
+
+    int[] ownedShards()
+    {
+      return new int[] { OWNED_SHARD };
+    }
+
+    int bufferSize()
+    {
+      return 8;
+    }
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeServiceTest.java
new file mode 100644 (file)
index 0000000..fd5a83b
--- /dev/null
@@ -0,0 +1,50 @@
+package de.juplo.kafka.chat.backend.persistence.inmemory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.chat.backend.domain.ChatHomeServiceTest;
+import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
+import de.juplo.kafka.chat.backend.storage.files.FilesStorageStrategy;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+
+import java.nio.file.Paths;
+import java.time.Clock;
+
+
+public class SimpleChatHomeServiceTest extends ChatHomeServiceTest
+{
+  @TestConfiguration
+  static class Configuration
+  {
+    @Bean
+    SimpleChatHomeService chatHome(
+        StorageStrategy storageStrategy,
+        Clock clock)
+    {
+      return new SimpleChatHomeService(
+          storageStrategy,
+          clock,
+          bufferSize());
+    }
+
+    @Bean
+    public FilesStorageStrategy storageStrategy(Clock clock)
+    {
+      return new FilesStorageStrategy(
+          Paths.get("target", "test-classes", "data", "files"),
+          chatRoomId -> 0,
+          new ObjectMapper());
+    }
+
+    @Bean
+    Clock clock()
+    {
+      return Clock.systemDefaultZone();
+    }
+
+    int bufferSize()
+    {
+      return 8;
+    }
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java
new file mode 100644 (file)
index 0000000..3633aa5
--- /dev/null
@@ -0,0 +1,113 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
+import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.time.Clock;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest.NUM_SHARDS;
+import static de.juplo.kafka.chat.backend.persistence.kafka.KafkaChatHomeServiceTest.TOPIC;
+
+
+@SpringBootTest(
+    classes = {
+        KafkaChatHomeServiceTest.KafkaChatHomeTestConfiguration.class,
+        KafkaServicesConfiguration.class,
+        KafkaAutoConfiguration.class,
+        TaskExecutionAutoConfiguration.class,
+    },
+    properties = {
+    "chat.backend.services=kafka",
+    "chat.backend.kafka.client-id-PREFIX=TEST",
+    "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+    "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+    "chat.backend.kafka.chatroom-channel-topic=" + TOPIC,
+    "chat.backend.kafka.num-partitions=" + NUM_SHARDS,
+})
+@EmbeddedKafka(topics = { TOPIC }, partitions = 10)
+@Slf4j
+public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
+{
+  final static String TOPIC = "KAFKA_CHAT_HOME_TEST";
+
+  static CompletableFuture<Void> CONSUMER_JOB;
+
+
+  @TestConfiguration
+  @EnableConfigurationProperties(ChatBackendProperties.class)
+  static class KafkaChatHomeTestConfiguration
+  {
+    @Bean
+    Clock clock()
+    {
+      return Clock.systemDefaultZone();
+    }
+  }
+
+
+  @BeforeAll
+  public static void sendAndLoadStoredData(
+      @Autowired KafkaTemplate<String, String> messageTemplate,
+      @Autowired Consumer chatRoomChannelConsumer,
+      @Autowired ThreadPoolTaskExecutor taskExecutor,
+      @Autowired ChatRoomChannel chatRoomChannel)
+  {
+    send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom");
+    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
+    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
+    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
+    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
+
+    List<TopicPartition> assignedPartitions = List.of(new TopicPartition(TOPIC, 2));
+    chatRoomChannelConsumer.assign(assignedPartitions);
+    chatRoomChannel.onPartitionsAssigned(assignedPartitions);
+    CONSUMER_JOB = taskExecutor
+        .submitCompletable(chatRoomChannel)
+        .exceptionally(e ->
+        {
+          log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
+          return null;
+        });
+  }
+
+  static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
+  {
+    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
+    record.headers().add("__TypeId__", typeId.getBytes());
+    SendResult<String, String> result = kafkaTemplate.send(record).join();
+    log.info(
+        "Sent {}={} to {}",
+        key,
+        value,
+        new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
+  }
+
+  @AfterAll
+  static void joinConsumerJob(@Autowired Consumer chatRoomChannelConsumer)
+  {
+    log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
+    chatRoomChannelConsumer.wakeup();
+    log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
+    CONSUMER_JOB.join();
+    log.info("Joined the consumer of the ChatRoomChannel");
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/CommandCreateChatRoomToTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/CommandCreateChatRoomToTest.java
new file mode 100644 (file)
index 0000000..5ef12ef
--- /dev/null
@@ -0,0 +1,35 @@
+package de.juplo.kafka.chat.backend.persistence.kafka.messages;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+
+public class CommandCreateChatRoomToTest
+{
+  final String json = """
+  {
+    "name": "Foo-Room!"
+  }""";
+
+  ObjectMapper mapper;
+
+  @BeforeEach
+  public void setUp()
+  {
+    mapper = new ObjectMapper();
+    mapper.registerModule(new JavaTimeModule());
+    mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+  }
+
+  @Test
+  public void testDeserialization() throws Exception
+  {
+    CommandCreateChatRoomTo message = mapper.readValue(json, CommandCreateChatRoomTo.class);
+    assertThat(message.getName()).isEqualTo("Foo-Room!");
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/EventChatMessageReceivedToTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/EventChatMessageReceivedToTest.java
new file mode 100644 (file)
index 0000000..33a7827
--- /dev/null
@@ -0,0 +1,39 @@
+package de.juplo.kafka.chat.backend.persistence.kafka.messages;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+
+public class EventChatMessageReceivedToTest
+{
+  final String json = """
+  {
+    "id": 1,
+    "text": "Hallo, ich heiße Peter!",
+    "user": "Peter"
+  }""";
+
+  ObjectMapper mapper;
+
+  @BeforeEach
+  public void setUp()
+  {
+    mapper = new ObjectMapper();
+    mapper.registerModule(new JavaTimeModule());
+    mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+  }
+
+  @Test
+  public void testDeserialization() throws Exception
+  {
+    EventChatMessageReceivedTo message = mapper.readValue(json, EventChatMessageReceivedTo.class);
+    assertThat(message.getId()).isEqualTo(1l);
+    assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!");
+    assertThat(message.getUser()).isEqualTo("Peter");
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeServiceTest.java
deleted file mode 100644 (file)
index 4c922ce..0000000
+++ /dev/null
@@ -1,65 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.inmemory;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest;
-import de.juplo.kafka.chat.backend.persistence.ShardingStrategy;
-import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import de.juplo.kafka.chat.backend.storage.files.FilesStorageStrategy;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-
-import java.nio.file.Paths;
-import java.time.Clock;
-import java.util.stream.IntStream;
-
-public class ShardedChatHomeServiceTest extends ChatHomeServiceWithShardsTest
-{
-  @TestConfiguration
-  static class Configuration
-  {
-    @Bean
-    ShardedChatHomeService chatHome(
-        StorageStrategy storageStrategy,
-        Clock clock)
-    {
-      SimpleChatHomeService[] chatHomes = new SimpleChatHomeService[NUM_SHARDS];
-
-      IntStream
-          .of(ownedShards())
-          .forEach(shard -> chatHomes[shard] = new SimpleChatHomeService(
-              shard,
-              storageStrategy,
-              clock,
-              bufferSize()));
-
-      ShardingStrategy strategy = new KafkaLikeShardingStrategy(NUM_SHARDS);
-
-      return new ShardedChatHomeService(chatHomes, strategy);
-    }
-
-    @Bean
-    public FilesStorageStrategy storageStrategy(Clock clock)
-    {
-      return new FilesStorageStrategy(
-          Paths.get("target", "test-classes", "data", "files"),
-          new KafkaLikeShardingStrategy(NUM_SHARDS),
-          new ObjectMapper());
-    }
-
-    @Bean
-    Clock clock()
-    {
-      return Clock.systemDefaultZone();
-    }
-
-    int[] ownedShards()
-    {
-      return new int[] { OWNED_SHARD };
-    }
-
-    int bufferSize()
-    {
-      return 8;
-    }
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeServiceTest.java
deleted file mode 100644 (file)
index fd5a83b..0000000
+++ /dev/null
@@ -1,50 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.inmemory;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.chat.backend.domain.ChatHomeServiceTest;
-import de.juplo.kafka.chat.backend.persistence.StorageStrategy;
-import de.juplo.kafka.chat.backend.storage.files.FilesStorageStrategy;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-
-import java.nio.file.Paths;
-import java.time.Clock;
-
-
-public class SimpleChatHomeServiceTest extends ChatHomeServiceTest
-{
-  @TestConfiguration
-  static class Configuration
-  {
-    @Bean
-    SimpleChatHomeService chatHome(
-        StorageStrategy storageStrategy,
-        Clock clock)
-    {
-      return new SimpleChatHomeService(
-          storageStrategy,
-          clock,
-          bufferSize());
-    }
-
-    @Bean
-    public FilesStorageStrategy storageStrategy(Clock clock)
-    {
-      return new FilesStorageStrategy(
-          Paths.get("target", "test-classes", "data", "files"),
-          chatRoomId -> 0,
-          new ObjectMapper());
-    }
-
-    @Bean
-    Clock clock()
-    {
-      return Clock.systemDefaultZone();
-    }
-
-    int bufferSize()
-    {
-      return 8;
-    }
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeServiceTest.java
deleted file mode 100644 (file)
index 3633aa5..0000000
+++ /dev/null
@@ -1,113 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
-import org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.support.SendResult;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-
-import java.time.Clock;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
-import static de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest.NUM_SHARDS;
-import static de.juplo.kafka.chat.backend.persistence.kafka.KafkaChatHomeServiceTest.TOPIC;
-
-
-@SpringBootTest(
-    classes = {
-        KafkaChatHomeServiceTest.KafkaChatHomeTestConfiguration.class,
-        KafkaServicesConfiguration.class,
-        KafkaAutoConfiguration.class,
-        TaskExecutionAutoConfiguration.class,
-    },
-    properties = {
-    "chat.backend.services=kafka",
-    "chat.backend.kafka.client-id-PREFIX=TEST",
-    "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
-    "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
-    "chat.backend.kafka.chatroom-channel-topic=" + TOPIC,
-    "chat.backend.kafka.num-partitions=" + NUM_SHARDS,
-})
-@EmbeddedKafka(topics = { TOPIC }, partitions = 10)
-@Slf4j
-public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
-{
-  final static String TOPIC = "KAFKA_CHAT_HOME_TEST";
-
-  static CompletableFuture<Void> CONSUMER_JOB;
-
-
-  @TestConfiguration
-  @EnableConfigurationProperties(ChatBackendProperties.class)
-  static class KafkaChatHomeTestConfiguration
-  {
-    @Bean
-    Clock clock()
-    {
-      return Clock.systemDefaultZone();
-    }
-  }
-
-
-  @BeforeAll
-  public static void sendAndLoadStoredData(
-      @Autowired KafkaTemplate<String, String> messageTemplate,
-      @Autowired Consumer chatRoomChannelConsumer,
-      @Autowired ThreadPoolTaskExecutor taskExecutor,
-      @Autowired ChatRoomChannel chatRoomChannel)
-  {
-    send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom");
-    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
-    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
-    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
-    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
-
-    List<TopicPartition> assignedPartitions = List.of(new TopicPartition(TOPIC, 2));
-    chatRoomChannelConsumer.assign(assignedPartitions);
-    chatRoomChannel.onPartitionsAssigned(assignedPartitions);
-    CONSUMER_JOB = taskExecutor
-        .submitCompletable(chatRoomChannel)
-        .exceptionally(e ->
-        {
-          log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
-          return null;
-        });
-  }
-
-  static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
-  {
-    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
-    record.headers().add("__TypeId__", typeId.getBytes());
-    SendResult<String, String> result = kafkaTemplate.send(record).join();
-    log.info(
-        "Sent {}={} to {}",
-        key,
-        value,
-        new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
-  }
-
-  @AfterAll
-  static void joinConsumerJob(@Autowired Consumer chatRoomChannelConsumer)
-  {
-    log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
-    chatRoomChannelConsumer.wakeup();
-    log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
-    CONSUMER_JOB.join();
-    log.info("Joined the consumer of the ChatRoomChannel");
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomToTest.java
deleted file mode 100644 (file)
index 5ef12ef..0000000
+++ /dev/null
@@ -1,35 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka.messages;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
-
-
-public class CommandCreateChatRoomToTest
-{
-  final String json = """
-  {
-    "name": "Foo-Room!"
-  }""";
-
-  ObjectMapper mapper;
-
-  @BeforeEach
-  public void setUp()
-  {
-    mapper = new ObjectMapper();
-    mapper.registerModule(new JavaTimeModule());
-    mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
-  }
-
-  @Test
-  public void testDeserialization() throws Exception
-  {
-    CommandCreateChatRoomTo message = mapper.readValue(json, CommandCreateChatRoomTo.class);
-    assertThat(message.getName()).isEqualTo("Foo-Room!");
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedToTest.java
deleted file mode 100644 (file)
index 33a7827..0000000
+++ /dev/null
@@ -1,39 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka.messages;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
-
-
-public class EventChatMessageReceivedToTest
-{
-  final String json = """
-  {
-    "id": 1,
-    "text": "Hallo, ich heiße Peter!",
-    "user": "Peter"
-  }""";
-
-  ObjectMapper mapper;
-
-  @BeforeEach
-  public void setUp()
-  {
-    mapper = new ObjectMapper();
-    mapper.registerModule(new JavaTimeModule());
-    mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
-  }
-
-  @Test
-  public void testDeserialization() throws Exception
-  {
-    EventChatMessageReceivedTo message = mapper.readValue(json, EventChatMessageReceivedTo.class);
-    assertThat(message.getId()).isEqualTo(1l);
-    assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!");
-    assertThat(message.getUser()).isEqualTo("Peter");
-  }
-}