NG
authorKai Moritz <kai@juplo.de>
Fri, 21 Apr 2023 08:28:02 +0000 (10:28 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 18 Aug 2023 15:18:41 +0000 (17:18 +0200)
NEU vs. NG ??

Besser:

 - Create-Requests für ChatRoom's auch in den Message-Channel schreiben
 - Dann, wenn sie dort gelesen werden, _zusätzlich nachträglich_ in
   den Chatroom-Channel.
 - Grund: Dann fällt das hier übrig gebliebene _nicht-triviale_
   Problem weg, bzw. löst sich in Wohlgefallen auf, da die Create-Requests
   automatisch in der richtigen Reihenfolge (also vor allen Messages,
   für einen bestimmten ChatRoom) in dem Message-Channel gelesen werden

Außerdem:

 - Der Chatroom-Channel wird ("später") auch als allgemeiner Info-Channel
   benötigt, in den die Instanzen _insbesondere_ auch veröffentlichen,
   welche Partitionen ihnen gerade zugeordnet sind.
 - Der Chatroom-Channel sollte daher auf Dauer Info-Channel heißen und
   der Message-Channel eher allgemeiner Chatroom-Channel (im Sinne von
   hier alles zum Thema ChatRoom und den daran veröffentlichten
   Nachrichten...)

19 files changed:
src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java
src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/AbstractTo.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageTo.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomInfoTo.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomTo.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomRequestTo.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageTo.java [deleted file]
src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java
src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageToTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomInfoToTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomRequestToTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageToTest.java [deleted file]

index 6091c0c..15d542a 100644 (file)
@@ -10,5 +10,5 @@ public interface ChatHome
 {
   Mono<ChatRoom> getChatRoom(UUID id);
 
-  Flux<ChatRoom> getChatRooms();
+  Flux<ChatRoomInfo> getChatRooms();
 }
index bedd0aa..3807680 100644 (file)
@@ -1,11 +1,12 @@
 package de.juplo.kafka.chat.backend.persistence;
 
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
 import reactor.core.publisher.Flux;
 
 
 public interface StorageStrategy
 {
-  void write(Flux<ChatRoom> chatroomFlux);
+  void write(Flux<ChatRoomInfo> chatroomFlux);
   Flux<ChatRoom> read();
 }
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/AbstractTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/AbstractTo.java
new file mode 100644 (file)
index 0000000..5c08aa2
--- /dev/null
@@ -0,0 +1,19 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+
+@RequiredArgsConstructor
+public class AbstractTo
+{
+  public enum ToType {
+    CREATE_CHATROOM_REQUEST,
+    MESSAGE_SENT,
+    CHATROOM_INFO
+  }
+
+  @Getter
+  private final ToType type;
+}
index ac30f1d..94f6fa6 100644 (file)
@@ -1,8 +1,8 @@
 package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
 import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -25,15 +25,16 @@ import java.util.stream.IntStream;
 public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
 {
   private final String topic;
-  private final Producer<String, MessageTo> producer;
-  private final Consumer<String, MessageTo> consumer;
+  private final Producer<String, AbstractTo> producer;
+  private final Consumer<String, AbstractTo> consumer;
   private final ZoneId zoneId;
   private final int numShards;
+  private final int bufferSize;
+  private final Clock clock;
   private final boolean[] isShardOwned;
   private final long[] currentOffset;
   private final long[] nextOffset;
   private final Map<UUID, ChatRoom>[] chatrooms;
-  private final KafkaLikeShardingStrategy shardingStrategy;
 
   private boolean running;
   @Getter
@@ -42,10 +43,12 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
 
   public ChatMessageChannel(
     String topic,
-    Producer<String, MessageTo> producer,
-    Consumer<String, MessageTo> consumer,
+    Producer<String, AbstractTo> producer,
+    Consumer<String, AbstractTo> consumer,
     ZoneId zoneId,
-    int numShards)
+    int numShards,
+    int bufferSize,
+    Clock clock)
   {
     log.debug(
         "Creating ChatMessageChannel for topic {} with {} partitions",
@@ -56,6 +59,8 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
     this.producer = producer;
     this.zoneId = zoneId;
     this.numShards = numShards;
+    this.bufferSize = bufferSize;
+    this.clock = clock;
     this.isShardOwned = new boolean[numShards];
     this.currentOffset = new long[numShards];
     this.nextOffset = new long[numShards];
@@ -63,28 +68,62 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
     IntStream
         .range(0, numShards)
         .forEach(shard -> this.chatrooms[shard] = new HashMap<>());
-    this.shardingStrategy = new KafkaLikeShardingStrategy(numShards);
   }
 
 
-  Mono<Message> sendMessage(
+
+  Mono<ChatRoomInfo> sendCreateChatRoomRequest(
+      UUID chatRoomId,
+      String name)
+  {
+    CreateChatRoomRequestTo createChatRoomRequestTo = CreateChatRoomRequestTo.of(name);
+    return Mono.create(sink ->
+    {
+      ProducerRecord<String, CreateChatRoomRequestTo> record =
+          new ProducerRecord<>(
+              topic,
+              chatRoomId.toString(),
+              createChatRoomRequestTo);
+
+      producer.send(record, ((metadata, exception) ->
+      {
+        if (metadata != null)
+        {
+          log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
+          ChatRoomInfo chatRoomInfo = ChatRoomInfo.of(chatRoomId, name, record.partition());
+          createChatRoom(chatRoomInfo);
+          sink.success(chatRoomInfo);
+        }
+        else
+        {
+          // On send-failure
+          log.error(
+              "Could not send create-request for chat room (id={}, name={}): {}",
+              chatRoomId,
+              name,
+              exception);
+          sink.error(exception);
+        }
+      }));
+    });
+  }
+
+  Mono<Message> sendChatMessage(
       UUID chatRoomId,
       Message.MessageKey key,
       LocalDateTime timestamp,
       String text)
   {
-    int shard = this.shardingStrategy.selectShard(chatRoomId);
-    TopicPartition tp = new TopicPartition(topic, shard);
     ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
     return Mono.create(sink ->
     {
-      ProducerRecord<String, MessageTo> record =
+      ProducerRecord<String, AbstractTo> record =
           new ProducerRecord<>(
-              tp.topic(),
-              tp.partition(),
+              topic,
+              null,
               zdt.toEpochSecond(),
               chatRoomId.toString(),
-              MessageTo.of(key.getUsername(), key.getMessageId(), text));
+              ChatMessageTo.of(key.getUsername(), key.getMessageId(), text));
 
       producer.send(record, ((metadata, exception) ->
       {
@@ -165,12 +204,12 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
     {
       try
       {
-        ConsumerRecords<String, MessageTo> records = consumer.poll(Duration.ofMinutes(5));
+        ConsumerRecords<String, AbstractTo> records = consumer.poll(Duration.ofMinutes(5));
         log.info("Fetched {} messages", records.count());
 
         if (loadInProgress)
         {
-          loadMessages(records);
+          loadChatRoom(records);
 
           if (isLoadingCompleted())
           {
@@ -198,33 +237,84 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
     log.info("Exiting normally");
   }
 
-  void loadMessages(ConsumerRecords<String, MessageTo> records)
+  void loadChatRoom(ConsumerRecords<String, AbstractTo> records)
   {
-    for (ConsumerRecord<String, MessageTo> record : records)
+    for (ConsumerRecord<String, AbstractTo> record : records)
     {
-      nextOffset[record.partition()] = record.offset() + 1;
       UUID chatRoomId = UUID.fromString(record.key());
-      MessageTo messageTo = record.value();
-
-      Message.MessageKey key = Message.MessageKey.of(messageTo.getUser(), messageTo.getId());
 
-      Instant instant = Instant.ofEpochSecond(record.timestamp());
-      LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
-
-      Message message = new Message(key, record.offset(), timestamp, messageTo.getText());
-
-      ChatRoom chatRoom = chatrooms[record.partition()].get(chatRoomId);
-      if (chatRoom == null)
+      switch (record.value().getType())
       {
-        // Alles pausieren und erst von putChatRoom wieder resumen lassen!
+        case CREATE_CHATROOM_REQUEST:
+          createChatRoom(
+              chatRoomId,
+              (CreateChatRoomRequestTo) record.value(),
+              record.partition());
+          break;
+
+        case MESSAGE_SENT:
+          Instant instant = Instant.ofEpochSecond(record.timestamp());
+          LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
+          loadChatMessage(
+              chatRoomId,
+              timestamp,
+              record.offset(),
+              (ChatMessageTo) record.value(),
+              record.partition());
+          break;
+
+        default:
+          log.debug(
+              "Ignoring message for chat-room {} with offset {}: {}",
+              chatRoomId,
+              record.offset(),
+              record.value());
       }
-      KafkaChatRoomService kafkaChatRoomService =
-          (KafkaChatRoomService) chatRoom.getChatRoomService();
 
-      kafkaChatRoomService.persistMessage(message);
+      nextOffset[record.partition()] = record.offset() + 1;
     }
   }
 
+  void createChatRoom(
+      UUID chatRoomId,
+      CreateChatRoomRequestTo createChatRoomRequestTo,
+      int partition)
+  {
+    putChatRoom(ChatRoomInfo.of(
+        chatRoomId,
+        createChatRoomRequestTo.getName(),
+        partition));
+  }
+
+
+  void createChatRoom(ChatRoomInfo chatRoomInfo)
+  {
+    UUID id = chatRoomInfo.getId();
+    String name = chatRoomInfo.getName();
+    int shard = chatRoomInfo.getShard();
+    log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
+    KafkaChatRoomService service = new KafkaChatRoomService(this, id);
+    ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
+    putChatRoom(chatRoom);
+  }
+
+  void loadChatMessage(
+      UUID chatRoomId,
+      LocalDateTime timestamp,
+      long offset,
+      ChatMessageTo chatMessageTo,
+      int partition)
+  {
+    Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
+    Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
+
+    ChatRoom chatRoom = chatrooms[partition].get(chatRoomId);
+    KafkaChatRoomService kafkaChatRoomService =
+        (KafkaChatRoomService) chatRoom.getChatRoomService();
+
+    kafkaChatRoomService.persistMessage(message);
+  }
+
   boolean isLoadingCompleted()
   {
     return IntStream
@@ -243,7 +333,7 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
   }
 
 
-  void putChatRoom(ChatRoom chatRoom)
+  private void putChatRoom(ChatRoom chatRoom)
   {
     Integer partition = chatRoom.getShard();
     UUID chatRoomId = chatRoom.getId();
@@ -266,13 +356,4 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
   {
     return Mono.justOrEmpty(chatrooms[shard].get(id));
   }
-
-  Flux<ChatRoom> getChatRooms()
-  {
-    return Flux.fromStream(IntStream
-        .range(0, numShards)
-        .filter(shard -> isShardOwned[shard])
-        .mapToObj(shard -> Integer.valueOf(shard))
-        .flatMap(shard -> chatrooms[shard].values().stream()));
-  }
 }
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageTo.java
new file mode 100644 (file)
index 0000000..41ce00a
--- /dev/null
@@ -0,0 +1,45 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.Data;
+
+import java.time.LocalDateTime;
+
+
+@Data
+public class ChatMessageTo extends AbstractTo
+{
+  private String user;
+  private Long id;
+  private String text;
+
+
+  public ChatMessageTo()
+  {
+    super(ToType.MESSAGE_SENT);
+  }
+
+
+  public Message toMessage(long offset, LocalDateTime timestamp)
+  {
+    return new Message(Message.MessageKey.of(user, id), offset, timestamp, text);
+  }
+
+  public static ChatMessageTo from(Message message)
+  {
+    return ChatMessageTo.of(
+        message.getUsername(),
+        message.getId(),
+        message.getMessageText());
+  }
+
+
+  public static ChatMessageTo of(String user, Long id, String text)
+  {
+    ChatMessageTo to = new ChatMessageTo();
+    to.user = user;
+    to.id = id;
+    to.text = text;
+    return to;
+  }
+}
index 97ee988..1c6ae91 100644 (file)
@@ -10,11 +10,15 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.time.*;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
+import java.util.stream.IntStream;
 
 
 @RequiredArgsConstructor
@@ -22,51 +26,12 @@ import java.util.UUID;
 public class ChatRoomChannel implements Runnable
 {
   private final String topic;
-  private final Producer<Integer, ChatRoomTo> producer;
-  private final Consumer<Integer, ChatRoomTo> consumer;
-  private final ShardingStrategy shardingStrategy;
-  private final ChatMessageChannel chatMessageChannel;
-  private final Clock clock;
-  private final int bufferSize;
+  private final Consumer<String, AbstractTo> consumer;
+  private final Map<UUID, ChatRoomInfo> chatrooms = new HashMap<>();
 
   private boolean running;
 
 
-  Mono<ChatRoomInfo> sendCreateChatRoomRequest(
-      UUID chatRoomId,
-      String name)
-  {
-    int shard = this.shardingStrategy.selectShard(chatRoomId);
-    ChatRoomTo chatRoomTo = ChatRoomTo.of(chatRoomId.toString(), name, shard);
-    return Mono.create(sink ->
-    {
-      ProducerRecord<Integer, ChatRoomTo> record =
-          new ProducerRecord<>(
-              topic,
-              shard,
-              chatRoomTo);
-
-      producer.send(record, ((metadata, exception) ->
-      {
-        if (metadata != null)
-        {
-          log.info("Successfully send chreate-request for chat room: {}", chatRoomTo);
-          sink.success(chatRoomTo.toChatRoomInfo());
-        }
-        else
-        {
-          // On send-failure
-          log.error(
-              "Could not send create-request for chat room (id={}, name={}): {}",
-              chatRoomId,
-              name,
-              exception);
-          sink.error(exception);
-        }
-      }));
-    });
-  }
-
   @Override
   public void run()
   {
@@ -78,12 +43,24 @@ public class ChatRoomChannel implements Runnable
     {
       try
       {
-        ConsumerRecords<Integer, ChatRoomTo> records = consumer.poll(Duration.ofMinutes(5));
+        ConsumerRecords<String, AbstractTo> records = consumer.poll(Duration.ofMinutes(5));
         log.info("Fetched {} messages", records.count());
 
-        for (ConsumerRecord<Integer, ChatRoomTo> record : records)
+        for (ConsumerRecord<String, AbstractTo> record : records)
         {
-          createChatRoom(record.value().toChatRoomInfo());
+          switch (record.value().getType())
+          {
+            case CHATROOM_INFO:
+              createChatRoom((ChatRoomInfoTo) record.value());
+              break;
+
+            default:
+              log.debug(
+                  "Ignoring message for key {} with offset {}: {}",
+                  record.key(),
+                  record.offset(),
+                  record.value());
+          }
         }
       }
       catch (WakeupException e)
@@ -97,14 +74,14 @@ public class ChatRoomChannel implements Runnable
   }
 
 
-  void createChatRoom(ChatRoomInfo chatRoomInfo)
+  void createChatRoom(ChatRoomInfoTo chatRoomInfoTo)
+  {
+    ChatRoomInfo chatRoomInfo = chatRoomInfoTo.toChatRoomInfo();
+    chatrooms.put(chatRoomInfo.getId(), chatRoomInfo);
+  }
+
+  Flux<ChatRoomInfo> getChatRooms()
   {
-    UUID id = chatRoomInfo.getId();
-    String name = chatRoomInfo.getName();
-    int shard = chatRoomInfo.getShard();
-    log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
-    KafkaChatRoomService service = new KafkaChatRoomService(chatMessageChannel, id);
-    ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
-    chatMessageChannel.putChatRoom(chatRoom);
+    return Flux.fromIterable(chatrooms.values());
   }
 }
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomInfoTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomInfoTo.java
new file mode 100644 (file)
index 0000000..f232c78
--- /dev/null
@@ -0,0 +1,42 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import lombok.Data;
+
+import java.util.UUID;
+
+
+@Data
+public class ChatRoomInfoTo extends AbstractTo
+{
+  private String id;
+  private String name;
+  private int shard;
+
+
+  public ChatRoomInfoTo()
+  {
+    super(ToType.CHATROOM_INFO);
+  }
+
+
+  public ChatRoomInfo toChatRoomInfo()
+  {
+    return new ChatRoomInfo(UUID.fromString(id), name, shard);
+  }
+
+  public static ChatRoomInfoTo from(ChatRoom chatRoom)
+  {
+    return ChatRoomInfoTo.of(chatRoom.getId().toString(), chatRoom.getName(), chatRoom.getShard());
+  }
+
+  public static ChatRoomInfoTo of(String id, String name, int shard)
+  {
+    ChatRoomInfoTo to = new ChatRoomInfoTo();
+    to.id = id;
+    to.name = name;
+    to.shard = shard;
+    return to;
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomTo.java
deleted file mode 100644 (file)
index e564981..0000000
+++ /dev/null
@@ -1,30 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-import java.util.UUID;
-
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor(staticName = "of")
-public class ChatRoomTo
-{
-  private String id;
-  private String name;
-  private int shard;
-
-  public ChatRoomInfo toChatRoomInfo()
-  {
-    return new ChatRoomInfo(UUID.fromString(id), name, shard);
-  }
-
-  public static ChatRoomTo from(ChatRoom chatRoom)
-  {
-    return ChatRoomTo.of(chatRoom.getId().toString(), chatRoom.getName(), chatRoom.getShard());
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomRequestTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomRequestTo.java
new file mode 100644 (file)
index 0000000..b6ad38f
--- /dev/null
@@ -0,0 +1,28 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import lombok.Data;
+
+import java.util.UUID;
+
+
+@Data
+public class CreateChatRoomRequestTo extends AbstractTo
+{
+  private String name;
+
+
+  public CreateChatRoomRequestTo()
+  {
+    super(ToType.CREATE_CHATROOM_REQUEST);
+  }
+
+
+  public static CreateChatRoomRequestTo of(String name)
+  {
+    CreateChatRoomRequestTo to = new CreateChatRoomRequestTo();
+    to.name = name;
+    return to;
+  }
+}
index 88947a0..ac87aac 100644 (file)
@@ -2,7 +2,6 @@ package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatHome;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -17,6 +16,7 @@ import java.util.*;
 public class KafkaChatHome implements ChatHome
 {
   private final ShardingStrategy shardingStrategy;
+  private final ChatRoomChannel chatRoomChannel;
   private final ChatMessageChannel chatMessageChanel;
 
 
@@ -35,15 +35,8 @@ public class KafkaChatHome implements ChatHome
   }
 
   @Override
-  public Flux<ChatRoom> getChatRooms()
+  public Flux<ChatRoomInfo> getChatRooms()
   {
-    if (chatMessageChanel.isLoadInProgress())
-    {
-      throw new LoadInProgressException();
-    }
-    else
-    {
-      return chatMessageChanel.getChatRooms();
-    }
+      return chatRoomChannel.getChatRooms();
   }
 }
index 0986194..f802234 100644 (file)
@@ -28,7 +28,7 @@ public class KafkaChatRoomService implements ChatRoomService
     String text)
   {
     return chatMessageChannel
-        .sendMessage(chatRoomId, key, timestamp, text)
+        .sendChatMessage(chatRoomId, key, timestamp, text)
         .doOnSuccess(message -> persistMessage(message));
   }
 
index ee5834e..8a9e32e 100644 (file)
@@ -30,11 +30,11 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner
   @Autowired
   ChatRoomChannel chatRoomChannel;
   @Autowired
-  Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer;
+  Consumer<Integer, CreateChatRoomRequestTo> chatRoomChannelConsumer;
   @Autowired
   ChatMessageChannel chatMessageChannel;
   @Autowired
-  Consumer<String, MessageTo> chatMessageChannelConsumer;
+  Consumer<String, ChatMessageTo> chatMessageChannelConsumer;
 
   CompletableFuture<Void> chatRoomChannelConsumerJob;
   CompletableFuture<Void> chatMessageChannelConsumerJob;
index 4350779..9e1f75e 100644 (file)
@@ -51,8 +51,8 @@ public class KafkaServicesConfiguration
   @Bean
   ChatRoomChannel chatRoomChannel(
       ChatBackendProperties properties,
-      Producer<Integer, ChatRoomTo> chatRoomChannelProducer,
-      Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer,
+      Producer<Integer, CreateChatRoomRequestTo> chatRoomChannelProducer,
+      Consumer<Integer, CreateChatRoomRequestTo> chatRoomChannelConsumer,
       ShardingStrategy shardingStrategy,
       ChatMessageChannel chatMessageChannel,
       Clock clock)
@@ -68,11 +68,11 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  Producer<Integer, ChatRoomTo>  chatRoomChannelProducer(
+  Producer<Integer, CreateChatRoomRequestTo>  chatRoomChannelProducer(
       Properties defaultProducerProperties,
       ChatBackendProperties chatBackendProperties,
       IntegerSerializer integerSerializer,
-      JsonSerializer<ChatRoomTo> chatRoomSerializer)
+      JsonSerializer<CreateChatRoomRequestTo> chatRoomSerializer)
   {
     Map<String, Object> properties = new HashMap<>();
     defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
@@ -92,9 +92,9 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  JsonSerializer<ChatRoomTo> chatRoomSerializer()
+  JsonSerializer<CreateChatRoomRequestTo> chatRoomSerializer()
   {
-    JsonSerializer<ChatRoomTo> serializer = new JsonSerializer<>();
+    JsonSerializer<CreateChatRoomRequestTo> serializer = new JsonSerializer<>();
     serializer.configure(
         Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
         false);
@@ -102,11 +102,11 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  Consumer<Integer, ChatRoomTo>  chatRoomChannelConsumer(
+  Consumer<Integer, CreateChatRoomRequestTo>  chatRoomChannelConsumer(
       Properties defaultConsumerProperties,
       ChatBackendProperties chatBackendProperties,
       IntegerDeserializer integerDeserializer,
-      JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
+      JsonDeserializer<CreateChatRoomRequestTo> chatRoomDeserializer)
   {
     Map<String, Object> properties = new HashMap<>();
     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
@@ -129,13 +129,13 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  JsonDeserializer<ChatRoomTo> chatRoomDeserializer()
+  JsonDeserializer<CreateChatRoomRequestTo> chatRoomDeserializer()
   {
-    JsonDeserializer<ChatRoomTo> deserializer = new JsonDeserializer<>();
+    JsonDeserializer<CreateChatRoomRequestTo> deserializer = new JsonDeserializer<>();
     deserializer.configure(
         Map.of(
             JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
-            JsonDeserializer.VALUE_DEFAULT_TYPE, ChatRoomTo.class,
+            JsonDeserializer.VALUE_DEFAULT_TYPE, CreateChatRoomRequestTo.class,
             JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
         false );
     return deserializer;
@@ -150,8 +150,8 @@ public class KafkaServicesConfiguration
   @Bean
   ChatMessageChannel chatMessageChannel(
       ChatBackendProperties properties,
-      Producer<String, MessageTo> chatMessageChannelProducer,
-      Consumer<String, MessageTo> chatMessageChannelConsumer,
+      Producer<String, AbstractTo> chatMessageChannelProducer,
+      Consumer<String, AbstractTo> chatMessageChannelConsumer,
       ZoneId zoneId)
   {
     return new ChatMessageChannel(
@@ -163,11 +163,11 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  Producer<String, MessageTo>  chatMessageChannelProducer(
+  Producer<String, AbstractTo>  chatMessageChannelProducer(
       Properties defaultProducerProperties,
       ChatBackendProperties chatBackendProperties,
       StringSerializer stringSerializer,
-      JsonSerializer<MessageTo> messageSerializer)
+      JsonSerializer<AbstractTo> messageSerializer)
   {
     Map<String, Object> properties = new HashMap<>();
     defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
@@ -187,21 +187,23 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  JsonSerializer<MessageTo> chatMessageSerializer()
+  JsonSerializer<AbstractTo> chatMessageSerializer()
   {
-    JsonSerializer<MessageTo> serializer = new JsonSerializer<>();
+    JsonSerializer<AbstractTo> serializer = new JsonSerializer<>();
     serializer.configure(
-        Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
+        Map.of(JsonSerializer.TYPE_MAPPINGS,
+            "create:" + CreateChatRoomRequestTo.class.getCanonicalName() + "," +
+            "message:" + ChatMessageTo.class.getCanonicalName()),
         false);
     return serializer;
   }
 
   @Bean
-  Consumer<String, MessageTo>  chatMessageChannelConsumer(
+  Consumer<String, ChatMessageTo>  chatMessageChannelConsumer(
       Properties defaultConsumerProperties,
       ChatBackendProperties chatBackendProperties,
       StringDeserializer stringDeserializer,
-      JsonDeserializer<MessageTo> messageDeserializer)
+      JsonDeserializer<ChatMessageTo> messageDeserializer)
   {
     Map<String, Object> properties = new HashMap<>();
     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
@@ -224,13 +226,13 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  JsonDeserializer<MessageTo> chatMessageDeserializer()
+  JsonDeserializer<ChatMessageTo> chatMessageDeserializer()
   {
-    JsonDeserializer<MessageTo> deserializer = new JsonDeserializer<>();
+    JsonDeserializer<ChatMessageTo> deserializer = new JsonDeserializer<>();
     deserializer.configure(
         Map.of(
             JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
-            JsonDeserializer.VALUE_DEFAULT_TYPE, MessageTo.class,
+            JsonDeserializer.VALUE_DEFAULT_TYPE, ChatMessageTo.class,
             JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
         false );
     return deserializer;
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageTo.java
deleted file mode 100644 (file)
index 0a867f1..0000000
+++ /dev/null
@@ -1,33 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.Message;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-import java.time.LocalDateTime;
-
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor(staticName = "of")
-public class MessageTo
-{
-  private String user;
-  private Long id;
-  private String text;
-
-  public Message toMessage(long offset, LocalDateTime timestamp)
-  {
-    return new Message(Message.MessageKey.of(user, id), offset, timestamp, text);
-  }
-
-  public static MessageTo from(Message message)
-  {
-    return
-        new MessageTo(
-            message.getUsername(),
-            message.getId(),
-            message.getMessageText());
-  }
-}
index fc2b7c8..4e9ad23 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka.chat.backend;
 
 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.jupiter.api.BeforeAll;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
@@ -37,10 +38,17 @@ class KafkaConfigurationIT extends AbstractConfigurationIT
   {
     UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
     int shard = shardingStrategy.selectShard(chatRoomId);
-    chatRoomTemplate.send(CHATROOMS_TOPIC, null,"{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": " + shard + ", \"name\": \"FOO\" }");
-    messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }");
-    messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }");
-    messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }");
-    messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }");
+    send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": " + shard + ", \"name\": \"FOO\" }", "create");
+    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "message");
+    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "message");
+    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "message");
+    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "message");
+  }
+
+  static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
+  {
+    ProducerRecord<String, String> record = new ProducerRecord<>(MESSAGES_TOPIC, key, value);
+    record.headers().add("__TypeId__", typeId.getBytes());
+    kafkaTemplate.send(record);
   }
 }
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageToTest.java
new file mode 100644 (file)
index 0000000..4a6c1c3
--- /dev/null
@@ -0,0 +1,39 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+
+public class ChatMessageToTest
+{
+  final String json = """
+  {
+    "id": 1,
+    "text": "Hallo, ich heiße Peter!",
+    "user": "Peter"
+  }""";
+
+  ObjectMapper mapper;
+
+  @BeforeEach
+  public void setUp()
+  {
+    mapper = new ObjectMapper();
+    mapper.registerModule(new JavaTimeModule());
+    mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+  }
+
+  @Test
+  public void testDeserialization() throws Exception
+  {
+    ChatMessageTo message = mapper.readValue(json, ChatMessageTo.class);
+    assertThat(message.getId()).isEqualTo(1l);
+    assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!");
+    assertThat(message.getUser()).isEqualTo("Peter");
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomInfoToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomInfoToTest.java
new file mode 100644 (file)
index 0000000..6132eac
--- /dev/null
@@ -0,0 +1,39 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+
+public class ChatRoomInfoToTest
+{
+  final String json = """
+  {
+    "id": "5c73531c-6fc4-426c-adcb-afc5c140a0f7",
+    "name": "Foo-Room!",
+    "shard": 666
+  }""";
+
+  ObjectMapper mapper;
+
+  @BeforeEach
+  public void setUp()
+  {
+    mapper = new ObjectMapper();
+    mapper.registerModule(new JavaTimeModule());
+    mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+  }
+
+  @Test
+  public void testDeserialization() throws Exception
+  {
+    ChatRoomInfoTo message = mapper.readValue(json, ChatRoomInfoTo.class);
+    assertThat(message.getId()).isEqualTo("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
+    assertThat(message.getName()).isEqualTo("Foo-Room!");
+    assertThat(message.getShard()).isEqualTo(666);
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomRequestToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomRequestToTest.java
new file mode 100644 (file)
index 0000000..e7b749c
--- /dev/null
@@ -0,0 +1,39 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+
+public class CreateChatRoomRequestToTest
+{
+  final String json = """
+  {
+    "id": "5c73531c-6fc4-426c-adcb-afc5c140a0f7",
+    "name": "Foo-Room!",
+    "shard": 666
+  }""";
+
+  ObjectMapper mapper;
+
+  @BeforeEach
+  public void setUp()
+  {
+    mapper = new ObjectMapper();
+    mapper.registerModule(new JavaTimeModule());
+    mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+  }
+
+  @Test
+  public void testDeserialization() throws Exception
+  {
+    CreateChatRoomRequestTo message = mapper.readValue(json, CreateChatRoomRequestTo.class);
+    assertThat(message.getId()).isEqualTo("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
+    assertThat(message.getName()).isEqualTo("Foo-Room!");
+    assertThat(message.getShard()).isEqualTo(666);
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageToTest.java
deleted file mode 100644 (file)
index 0c4884b..0000000
+++ /dev/null
@@ -1,39 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
-
-
-public class MessageToTest
-{
-  final String json = """
-  {
-    "id": 1,
-    "text": "Hallo, ich heiße Peter!",
-    "user": "Peter"
-  }""";
-
-  ObjectMapper mapper;
-
-  @BeforeEach
-  public void setUp()
-  {
-    mapper = new ObjectMapper();
-    mapper.registerModule(new JavaTimeModule());
-    mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
-  }
-
-  @Test
-  public void testDeserialization() throws Exception
-  {
-    MessageTo message = mapper.readValue(json, MessageTo.class);
-    assertThat(message.getId()).isEqualTo(1l);
-    assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!");
-    assertThat(message.getUser()).isEqualTo("Peter");
-  }
-}