WIP:ALIGN
authorKai Moritz <kai@juplo.de>
Mon, 11 Sep 2023 16:36:13 +0000 (18:36 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 12 Sep 2023 21:35:20 +0000 (23:35 +0200)
16 files changed:
src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/AbstractDataMessageTo.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/CommandCreateChatRoomToData.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatDataMessageReceivedTo.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java
src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java
src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/CommandCreateChatRoomToTest.java
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedToTest.java

index 9c80f5d..381c6c6 100644 (file)
@@ -36,7 +36,8 @@ public class ChatBackendProperties
   {
     private String clientIdPrefix;
     private String bootstrapServers = ":9092";
-    private String chatRoomChannelTopic = "message_channel";
+    private String infoChannelTopic = "info_channel";
+    private String dataChannelTopic = "data_channel";
     private int numPartitions = 2;
   }
 
index 3f4faa5..c1067bc 100644 (file)
@@ -3,9 +3,9 @@ package de.juplo.kafka.chat.backend.implementation.kafka;
 import de.juplo.kafka.chat.backend.domain.*;
 import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
 import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.AbstractDataMessageTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomToData;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatDataMessageReceivedTo;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -16,7 +16,6 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.time.*;
@@ -28,8 +27,8 @@ import java.util.stream.IntStream;
 public class DataChannel implements Runnable, ConsumerRebalanceListener
 {
   private final String topic;
-  private final Producer<String, AbstractMessageTo> producer;
-  private final Consumer<String, AbstractMessageTo> consumer;
+  private final Producer<String, AbstractDataMessageTo> producer;
+  private final Consumer<String, AbstractDataMessageTo> consumer;
   private final ZoneId zoneId;
   private final int numShards;
   private final int bufferSize;
@@ -46,15 +45,15 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
 
   public DataChannel(
     String topic,
-    Producer<String, AbstractMessageTo> producer,
-    Consumer<String, AbstractMessageTo> consumer,
+    Producer<String, AbstractDataMessageTo> producer,
+    Consumer<String, AbstractDataMessageTo> consumer,
     ZoneId zoneId,
     int numShards,
     int bufferSize,
     Clock clock)
   {
     log.debug(
-        "Creating ChatRoomChannel for topic {} with {} partitions",
+        "Creating DataChannel for topic {} with {} partitions",
         topic,
         numShards);
     this.topic = topic;
@@ -82,10 +81,10 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
       UUID chatRoomId,
       String name)
   {
-    CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name);
+    CommandCreateChatRoomToData createChatRoomRequestTo = CommandCreateChatRoomToData.of(name);
     return Mono.create(sink ->
     {
-      ProducerRecord<String, AbstractMessageTo> record =
+      ProducerRecord<String, AbstractDataMessageTo> record =
           new ProducerRecord<>(
               topic,
               chatRoomId.toString(),
@@ -123,13 +122,13 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
     return Mono.create(sink ->
     {
-      ProducerRecord<String, AbstractMessageTo> record =
+      ProducerRecord<String, AbstractDataMessageTo> record =
           new ProducerRecord<>(
               topic,
               null,
               zdt.toEpochSecond(),
               chatRoomId.toString(),
-              EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text));
+              EventChatDataMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text));
 
       producer.send(record, ((metadata, exception) ->
       {
@@ -208,7 +207,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     {
       try
       {
-        ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(5));
+        ConsumerRecords<String, AbstractDataMessageTo> records = consumer.poll(Duration.ofMinutes(5));
         log.info("Fetched {} messages", records.count());
 
         if (loadInProgress)
@@ -241,9 +240,9 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     log.info("Exiting normally");
   }
 
-  private void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
+  private void loadChatRoom(ConsumerRecords<String, AbstractDataMessageTo> records)
   {
-    for (ConsumerRecord<String, AbstractMessageTo> record : records)
+    for (ConsumerRecord<String, AbstractDataMessageTo> record : records)
     {
       UUID chatRoomId = UUID.fromString(record.key());
 
@@ -252,7 +251,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
         case COMMAND_CREATE_CHATROOM:
           createChatRoom(
               chatRoomId,
-              (CommandCreateChatRoomTo) record.value(),
+              (CommandCreateChatRoomToData) record.value(),
               record.partition());
           break;
 
@@ -263,7 +262,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
               chatRoomId,
               timestamp,
               record.offset(),
-              (EventChatMessageReceivedTo) record.value(),
+              (EventChatDataMessageReceivedTo) record.value(),
               record.partition());
           break;
 
@@ -281,7 +280,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
 
   private void createChatRoom(
       UUID chatRoomId,
-      CommandCreateChatRoomTo createChatRoomRequestTo,
+      CommandCreateChatRoomToData createChatRoomRequestTo,
       Integer partition)
   {
     log.info(
@@ -319,7 +318,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
       UUID chatRoomId,
       LocalDateTime timestamp,
       long offset,
-      EventChatMessageReceivedTo chatMessageTo,
+      EventChatDataMessageReceivedTo chatMessageTo,
       int partition)
   {
     Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
@@ -356,7 +355,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
       Integer partition,
       ChatRoomData chatRoomData)
   {
-    if (this.chatRoomInfo[partition].containsKey(chatRoomId))
+    if (this.chatRoomData[partition].containsKey(chatRoomId))
     {
       log.warn(
           "Ignoring existing chat-room for {}: {}",
@@ -370,9 +369,6 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
           partition,
           chatRoomData);
 
-      this.chatRoomInfo[partition].put(
-          chatRoomId,
-          new ChatRoomInfo(chatRoomId, name, partition));
       this.chatRoomData[partition].put(chatRoomId, chatRoomData);
     }
   }
index a3a5b43..2823d9d 100644 (file)
@@ -5,13 +5,11 @@ import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
 import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
 import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo;
-import lombok.Getter;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.AbstractDataMessageTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomToData;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatDataMessageReceivedTo;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.Producer;
@@ -30,58 +28,43 @@ import java.util.stream.IntStream;
 
 
 @Slf4j
-public class InfoChannel implements Runnable, ConsumerRebalanceListener
+public class InfoChannel implements Runnable
 {
-  private final String topic;
-  private final Producer<String, AbstractMessageTo> producer;
-  private final Consumer<String, AbstractMessageTo> consumer;
+  private final DataChannel dataChannel;
+  private final String infoTopic;
+  private final String dataTopic;
+  private final Producer<String, AbstractDataMessageTo> producer;
+  private final Consumer<String, AbstractDataMessageTo> consumer;
   private final ZoneId zoneId;
-  private final int numShards;
   private final int bufferSize;
   private final Clock clock;
-  private final boolean[] isShardOwned;
-  private final long[] currentOffset;
-  private final long[] nextOffset;
-  private final Map<UUID, ChatRoomInfo>[] chatRoomInfo;
-  private final Map<UUID, ChatRoomData>[] chatRoomData;
+  private final Map<UUID, ChatRoomInfo> chatRoomInfo;
 
   private boolean running;
-  @Getter
-  private volatile boolean loadInProgress;
 
 
   public InfoChannel(
-    String topic,
-    Producer<String, AbstractMessageTo> producer,
-    Consumer<String, AbstractMessageTo> consumer,
+    DataChannel dataChannel,
+    String infoTopic,
+    String dataTopic,
+    Producer<String, AbstractDataMessageTo> producer,
+    Consumer<String, AbstractDataMessageTo> consumer,
     ZoneId zoneId,
-    int numShards,
     int bufferSize,
     Clock clock)
   {
     log.debug(
-        "Creating ChatRoomChannel for topic {} with {} partitions",
-        topic,
-        numShards);
-    this.topic = topic;
+        "Creating InfoChannel for topic {}",
+        infoTopic);
+    this.dataChannel = dataChannel;
+    this.infoTopic = infoTopic;
+    this.dataTopic = dataTopic;
     this.consumer = consumer;
     this.producer = producer;
     this.zoneId = zoneId;
-    this.numShards = numShards;
     this.bufferSize = bufferSize;
     this.clock = clock;
-    this.isShardOwned = new boolean[numShards];
-    this.currentOffset = new long[numShards];
-    this.nextOffset = new long[numShards];
-    this.chatRoomInfo = new Map[numShards];
-    this.chatRoomData = new Map[numShards];
-    IntStream
-        .range(0, numShards)
-        .forEach(shard ->
-        {
-          this.chatRoomInfo[shard] = new HashMap<>();
-          this.chatRoomData[shard] = new HashMap<>();
-        });
+    this.chatRoomInfo = new HashMap<>();
   }
 
 
@@ -90,12 +73,12 @@ public class InfoChannel implements Runnable, ConsumerRebalanceListener
       UUID chatRoomId,
       String name)
   {
-    CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name);
+    CommandCreateChatRoomToData createChatRoomRequestTo = CommandCreateChatRoomToData.of(name);
     return Mono.create(sink ->
     {
-      ProducerRecord<String, AbstractMessageTo> record =
+      ProducerRecord<String, AbstractDataMessageTo> record =
           new ProducerRecord<>(
-              topic,
+              dataTopic,
               chatRoomId.toString(),
               createChatRoomRequestTo);
 
@@ -131,13 +114,13 @@ public class InfoChannel implements Runnable, ConsumerRebalanceListener
     ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
     return Mono.create(sink ->
     {
-      ProducerRecord<String, AbstractMessageTo> record =
+      ProducerRecord<String, AbstractDataMessageTo> record =
           new ProducerRecord<>(
-              topic,
+              infoTopic,
               null,
               zdt.toEpochSecond(),
               chatRoomId.toString(),
-              EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text));
+              EventChatDataMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text));
 
       producer.send(record, ((metadata, exception) ->
       {
@@ -216,7 +199,7 @@ public class InfoChannel implements Runnable, ConsumerRebalanceListener
     {
       try
       {
-        ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(5));
+        ConsumerRecords<String, AbstractDataMessageTo> records = consumer.poll(Duration.ofMinutes(5));
         log.info("Fetched {} messages", records.count());
 
         if (loadInProgress)
@@ -249,9 +232,9 @@ public class InfoChannel implements Runnable, ConsumerRebalanceListener
     log.info("Exiting normally");
   }
 
-  private void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
+  private void loadChatRoom(ConsumerRecords<String, AbstractDataMessageTo> records)
   {
-    for (ConsumerRecord<String, AbstractMessageTo> record : records)
+    for (ConsumerRecord<String, AbstractDataMessageTo> record : records)
     {
       UUID chatRoomId = UUID.fromString(record.key());
 
@@ -260,7 +243,7 @@ public class InfoChannel implements Runnable, ConsumerRebalanceListener
         case COMMAND_CREATE_CHATROOM:
           createChatRoom(
               chatRoomId,
-              (CommandCreateChatRoomTo) record.value(),
+              (CommandCreateChatRoomToData) record.value(),
               record.partition());
           break;
 
@@ -271,7 +254,7 @@ public class InfoChannel implements Runnable, ConsumerRebalanceListener
               chatRoomId,
               timestamp,
               record.offset(),
-              (EventChatMessageReceivedTo) record.value(),
+              (EventChatDataMessageReceivedTo) record.value(),
               record.partition());
           break;
 
@@ -289,7 +272,7 @@ public class InfoChannel implements Runnable, ConsumerRebalanceListener
 
   private void createChatRoom(
       UUID chatRoomId,
-      CommandCreateChatRoomTo createChatRoomRequestTo,
+      CommandCreateChatRoomToData createChatRoomRequestTo,
       Integer partition)
   {
     log.info(
@@ -327,7 +310,7 @@ public class InfoChannel implements Runnable, ConsumerRebalanceListener
       UUID chatRoomId,
       LocalDateTime timestamp,
       long offset,
-      EventChatMessageReceivedTo chatMessageTo,
+      EventChatDataMessageReceivedTo chatMessageTo,
       int partition)
   {
     Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
@@ -353,7 +336,7 @@ public class InfoChannel implements Runnable, ConsumerRebalanceListener
     consumer.pause(IntStream
         .range(0, numShards)
         .filter(shard -> isShardOwned[shard])
-        .mapToObj(shard -> new TopicPartition(topic, shard))
+        .mapToObj(shard -> new TopicPartition(infoTopic, shard))
         .toList());
   }
 
index 5019ed2..73990e6 100644 (file)
@@ -18,7 +18,8 @@ import java.util.*;
 public class KafkaChatHomeService implements ChatHomeService
 {
   private final int numPartitions;
-  private final ChatRoomChannel chatRoomChannel;
+  private final InfoChannel infoChannel;
+  private final DataChannel dataChannel;
 
 
 
@@ -26,37 +27,37 @@ public class KafkaChatHomeService implements ChatHomeService
   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
   {
     log.info("Sending create-command for chat rooom: id={}, name={}");
-    return chatRoomChannel.sendCreateChatRoomRequest(id, name);
+    return infoChannel.sendCreateChatRoomRequest(id, name);
   }
 
   @Override
   public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
   {
     int shard = selectShard(id);
-    return chatRoomChannel
+    return infoChannel
         .getChatRoomInfo(shard, id)
         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
             id,
             shard,
-            chatRoomChannel.getOwnedShards())));
+            dataChannel.getOwnedShards())));
   }
 
   @Override
   public Flux<ChatRoomInfo> getChatRoomInfo()
   {
-    return chatRoomChannel.getChatRoomInfo();
+    return infoChannel.getChatRoomInfo();
   }
 
   @Override
   public Mono<ChatRoomData> getChatRoomData(UUID id)
   {
     int shard = selectShard(id);
-    return chatRoomChannel
+    return dataChannel
         .getChatRoomData(shard, id)
         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
             id,
             shard,
-            chatRoomChannel.getOwnedShards())));
+            dataChannel.getOwnedShards())));
   }
 
   int selectShard(UUID chatRoomId)
index df9ee73..8ab50f1 100644 (file)
@@ -15,7 +15,7 @@ import java.util.UUID;
 @Slf4j
 public class KafkaChatMessageService implements ChatMessageService
 {
-  private final ChatRoomChannel chatRoomChannel;
+  private final DataChannel dataChannel;
   private final UUID chatRoomId;
 
   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
@@ -27,7 +27,7 @@ public class KafkaChatMessageService implements ChatMessageService
     LocalDateTime timestamp,
     String text)
   {
-    return chatRoomChannel
+    return dataChannel
         .sendChatMessage(chatRoomId, key, timestamp, text)
         .doOnSuccess(message -> persistMessage(message));
   }
index 70e41e1..2135a7d 100644 (file)
@@ -1,7 +1,7 @@
 package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.AbstractDataMessageTo;
 import jakarta.annotation.PreDestroy;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -34,9 +34,9 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner
   ConfigurableApplicationContext context;
 
   @Autowired
-  ChatRoomChannel chatRoomChannel;
+  DataChannel dataChannel;
   @Autowired
-  Consumer<String, AbstractMessageTo> chatRoomChannelConsumer;
+  Consumer<String, AbstractDataMessageTo> chatRoomChannelConsumer;
 
   CompletableFuture<Void> chatRoomChannelConsumerJob;
 
@@ -44,11 +44,11 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner
   @Override
   public void run(ApplicationArguments args) throws Exception
   {
-    List<String> topics = List.of(properties.getKafka().getChatRoomChannelTopic());
-    chatRoomChannelConsumer.subscribe(topics, chatRoomChannel);
+    List<String> topics = List.of(properties.getKafka().getDataChannelTopic());
+    chatRoomChannelConsumer.subscribe(topics, dataChannel);
     log.info("Starting the consumer for the ChatRoomChannel");
     chatRoomChannelConsumerJob = taskExecutor
-        .submitCompletable(chatRoomChannel)
+        .submitCompletable(dataChannel)
         .exceptionally(e ->
         {
           log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
index d63111a..ecd4419 100644 (file)
@@ -2,9 +2,9 @@ package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.AbstractDataMessageTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomToData;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatDataMessageReceivedTo;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -36,23 +36,23 @@ public class KafkaServicesConfiguration
   @Bean
   ChatHomeService kafkaChatHome(
       ChatBackendProperties properties,
-      ChatRoomChannel chatRoomChannel)
+      DataChannel dataChannel)
   {
     return new KafkaChatHomeService(
         properties.getKafka().getNumPartitions(),
-        chatRoomChannel);
+        dataChannel);
   }
 
   @Bean
-  ChatRoomChannel chatRoomChannel(
+  DataChannel chatRoomChannel(
       ChatBackendProperties properties,
-      Producer<String, AbstractMessageTo> chatRoomChannelProducer,
-      Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
+      Producer<String, AbstractDataMessageTo> chatRoomChannelProducer,
+      Consumer<String, AbstractDataMessageTo> chatRoomChannelConsumer,
       ZoneId zoneId,
       Clock clock)
   {
-    return new ChatRoomChannel(
-        properties.getKafka().getChatRoomChannelTopic(),
+    return new DataChannel(
+        properties.getKafka().getDataChannelTopic(),
         chatRoomChannelProducer,
         chatRoomChannelConsumer,
         zoneId,
@@ -62,11 +62,11 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  Producer<String, AbstractMessageTo>  chatRoomChannelProducer(
+  Producer<String, AbstractDataMessageTo>  chatRoomChannelProducer(
       Properties defaultProducerProperties,
       ChatBackendProperties chatBackendProperties,
       StringSerializer stringSerializer,
-      JsonSerializer<AbstractMessageTo> messageSerializer)
+      JsonSerializer<AbstractDataMessageTo> messageSerializer)
   {
     Map<String, Object> properties = new HashMap<>();
     defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
@@ -86,9 +86,9 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
+  JsonSerializer<AbstractDataMessageTo> chatMessageSerializer(String typeMappings)
   {
-    JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
+    JsonSerializer<AbstractDataMessageTo> serializer = new JsonSerializer<>();
     serializer.configure(
         Map.of(
             JsonSerializer.TYPE_MAPPINGS, typeMappings),
@@ -97,11 +97,11 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  Consumer<String, AbstractMessageTo>  chatRoomChannelConsumer(
+  Consumer<String, AbstractDataMessageTo>  chatRoomChannelConsumer(
       Properties defaultConsumerProperties,
       ChatBackendProperties chatBackendProperties,
       StringDeserializer stringDeserializer,
-      JsonDeserializer<AbstractMessageTo> messageDeserializer)
+      JsonDeserializer<AbstractDataMessageTo> messageDeserializer)
   {
     Map<String, Object> properties = new HashMap<>();
     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
@@ -124,9 +124,9 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
+  JsonDeserializer<AbstractDataMessageTo> chatMessageDeserializer(String typeMappings)
   {
-    JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
+    JsonDeserializer<AbstractDataMessageTo> deserializer = new JsonDeserializer<>();
     deserializer.configure(
         Map.of(
             JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
@@ -139,8 +139,8 @@ public class KafkaServicesConfiguration
   String typeMappings ()
   {
     return
-        "command_create_chatroom:" +  CommandCreateChatRoomTo.class.getCanonicalName() + "," +
-        "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
+        "command_create_chatroom:" +  CommandCreateChatRoomToData.class.getCanonicalName() + "," +
+        "event_chatmessage_received:" + EventChatDataMessageReceivedTo.class.getCanonicalName();
   }
 
   @Bean
index 6f61592..c96f43b 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.chat.backend.implementation.kafka.messages;
+package de.juplo.kafka.chat.backend.implementation.kafka.messages.data;
 
 
 import lombok.Getter;
@@ -6,7 +6,7 @@ import lombok.RequiredArgsConstructor;
 
 
 @RequiredArgsConstructor
-public class AbstractMessageTo
+public class AbstractDataMessageTo
 {
   public enum ToType {
     COMMAND_CREATE_CHATROOM,
index 29ba77c..ee5fe0f 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.chat.backend.implementation.kafka.messages;
+package de.juplo.kafka.chat.backend.implementation.kafka.messages.data;
 
 import lombok.*;
 
@@ -7,20 +7,20 @@ import lombok.*;
 @Setter
 @EqualsAndHashCode
 @ToString
-public class CommandCreateChatRoomTo extends AbstractMessageTo
+public class CommandCreateChatRoomToData extends AbstractDataMessageTo
 {
   private String name;
 
 
-  public CommandCreateChatRoomTo()
+  public CommandCreateChatRoomToData()
   {
     super(ToType.COMMAND_CREATE_CHATROOM);
   }
 
 
-  public static CommandCreateChatRoomTo of(String name)
+  public static CommandCreateChatRoomToData of(String name)
   {
-    CommandCreateChatRoomTo to = new CommandCreateChatRoomTo();
+    CommandCreateChatRoomToData to = new CommandCreateChatRoomToData();
     to.name = name;
     return to;
   }
index 17d3a39..20a0f5c 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.chat.backend.implementation.kafka.messages;
+package de.juplo.kafka.chat.backend.implementation.kafka.messages.data;
 
 import lombok.*;
 
@@ -7,22 +7,22 @@ import lombok.*;
 @Setter
 @EqualsAndHashCode
 @ToString
-public class EventChatMessageReceivedTo extends AbstractMessageTo
+public class EventChatDataMessageReceivedTo extends AbstractDataMessageTo
 {
   private String user;
   private Long id;
   private String text;
 
 
-  public EventChatMessageReceivedTo()
+  public EventChatDataMessageReceivedTo()
   {
     super(ToType.EVENT_CHATMESSAGE_RECEIVED);
   }
 
 
-  public static EventChatMessageReceivedTo of(String user, Long id, String text)
+  public static EventChatDataMessageReceivedTo of(String user, Long id, String text)
   {
-    EventChatMessageReceivedTo to = new EventChatMessageReceivedTo();
+    EventChatDataMessageReceivedTo to = new EventChatDataMessageReceivedTo();
     to.user = user;
     to.id = id;
     to.text = text;
index 0cf4232..c2311de 100644 (file)
@@ -1,2 +1,24 @@
-package de.juplo.kafka.chat.backend.implementation.kafka.messages.info;public class EventChatRoomCreated {
+package de.juplo.kafka.chat.backend.implementation.kafka.messages.info;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+
+@Getter
+@Setter
+@EqualsAndHashCode
+@ToString
+public class EventChatRoomCreated extends AbstractInfoMessageTo
+{
+  private String id;
+  private String name;
+  private Integer shard;
+
+
+  public EventChatRoomCreated(ToType type)
+  {
+    super(type);
+  }
 }
index 8ea9cc2..853ee1c 100644 (file)
@@ -11,20 +11,18 @@ import org.springframework.data.mongodb.core.mapping.Document;
 @Getter(AccessLevel.PACKAGE)
 @Setter(AccessLevel.PACKAGE)
 @EqualsAndHashCode(of = { "id" })
-@ToString(of = { "id", "shard", "name" })
+@ToString(of = { "id", "name" })
 @Document
 public class ChatRoomTo
 {
   @Id
   private String id;
-  private Integer shard;
   private String name;
 
   public static ChatRoomTo from(ChatRoomInfo chatRoomInfo)
   {
     return new ChatRoomTo(
         chatRoomInfo.getId().toString(),
-        chatRoomInfo.getShard(),
         chatRoomInfo.getName());
   }
 }
index f257d5e..630dc63 100644 (file)
@@ -1,6 +1,6 @@
 package de.juplo.kafka.chat.backend;
 
-import de.juplo.kafka.chat.backend.implementation.kafka.ChatRoomChannel;
+import de.juplo.kafka.chat.backend.implementation.kafka.DataChannel;
 import de.juplo.kafka.chat.backend.implementation.kafka.KafkaServicesApplicationRunner;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -19,7 +19,8 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
-import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.TOPIC;
+import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.DATA_TOPIC;
+import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.INFO_TOPIC;
 
 
 @SpringBootTest(
@@ -29,14 +30,18 @@ import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.TOPIC;
         "chat.backend.kafka.client-id-PREFIX=TEST",
         "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
         "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
-        "chat.backend.kafka.chatroom-channel-topic=" + TOPIC,
+        "chat.backend.kafka.info-topic=" + INFO_TOPIC,
+        "chat.backend.kafka.data-topic=" + DATA_TOPIC,
         "chat.backend.kafka.num-partitions=10",
         })
-@EmbeddedKafka(topics = { TOPIC }, partitions = 10)
+@EmbeddedKafka(
+    topics = { INFO_TOPIC, DATA_TOPIC },
+    partitions = 10)
 @Slf4j
 class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
 {
-  final static String TOPIC = "KAFKA_CONFIGURATION_IT";
+  final static String INFO_TOPIC = "KAFKA_CONFIGURATION_IT_INFO_CHANNEL";
+  final static String DATA_TOPIC = "KAFKA_CONFIGURATION_IT_DATA_CHANNEL";
 
   static CompletableFuture<Void> CONSUMER_JOB;
 
@@ -48,19 +53,19 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
       @Autowired KafkaTemplate<String, String> messageTemplate,
       @Autowired Consumer chatRoomChannelConsumer,
       @Autowired ThreadPoolTaskExecutor taskExecutor,
-      @Autowired ChatRoomChannel chatRoomChannel)
+      @Autowired DataChannel dataChannel)
   {
-    send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom");
+    send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"name\": \"FOO\" }", "command_create_chatroom");
     send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
     send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
     send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
     send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
 
-    List<TopicPartition> assignedPartitions = List.of(new TopicPartition(TOPIC, 2));
+    List<TopicPartition> assignedPartitions = List.of(new TopicPartition(INFO_TOPIC, 2));
     chatRoomChannelConsumer.assign(assignedPartitions);
-    chatRoomChannel.onPartitionsAssigned(assignedPartitions);
+    dataChannel.onPartitionsAssigned(assignedPartitions);
     CONSUMER_JOB = taskExecutor
-        .submitCompletable(chatRoomChannel)
+        .submitCompletable(dataChannel)
         .exceptionally(e ->
         {
           log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
@@ -70,7 +75,7 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
 
   static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
   {
-    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
+    ProducerRecord<String, String> record = new ProducerRecord<>(INFO_TOPIC, key, value);
     record.headers().add("__TypeId__", typeId.getBytes());
     SendResult<String, String> result = kafkaTemplate.send(record).join();
     log.info(
index c120e46..a07ffd1 100644 (file)
@@ -69,7 +69,7 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
       @Autowired KafkaTemplate<String, String> messageTemplate,
       @Autowired Consumer chatRoomChannelConsumer,
       @Autowired ThreadPoolTaskExecutor taskExecutor,
-      @Autowired ChatRoomChannel chatRoomChannel)
+      @Autowired DataChannel dataChannel)
   {
     send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom");
     send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
@@ -79,9 +79,9 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
 
     List<TopicPartition> assignedPartitions = List.of(new TopicPartition(TOPIC, 2));
     chatRoomChannelConsumer.assign(assignedPartitions);
-    chatRoomChannel.onPartitionsAssigned(assignedPartitions);
+    dataChannel.onPartitionsAssigned(assignedPartitions);
     CONSUMER_JOB = taskExecutor
-        .submitCompletable(chatRoomChannel)
+        .submitCompletable(dataChannel)
         .exceptionally(e ->
         {
           log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
index b031112..6499575 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.chat.backend.implementation.kafka.messages;
+package de.juplo.kafka.chat.backend.implementation.kafka.messages.data;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
@@ -29,7 +29,7 @@ public class CommandCreateChatRoomToTest
   @Test
   public void testDeserialization() throws Exception
   {
-    CommandCreateChatRoomTo message = mapper.readValue(json, CommandCreateChatRoomTo.class);
+    CommandCreateChatRoomToData message = mapper.readValue(json, CommandCreateChatRoomToData.class);
     assertThat(message.getName()).isEqualTo("Foo-Room!");
   }
 }
index d9d5a08..41d7bc1 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.chat.backend.implementation.kafka.messages;
+package de.juplo.kafka.chat.backend.implementation.kafka.messages.data;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
@@ -31,7 +31,7 @@ public class EventChatMessageReceivedToTest
   @Test
   public void testDeserialization() throws Exception
   {
-    EventChatMessageReceivedTo message = mapper.readValue(json, EventChatMessageReceivedTo.class);
+    EventChatDataMessageReceivedTo message = mapper.readValue(json, EventChatDataMessageReceivedTo.class);
     assertThat(message.getId()).isEqualTo(1l);
     assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!");
     assertThat(message.getUser()).isEqualTo("Peter");