WIP:ALIGN
authorKai Moritz <kai@juplo.de>
Mon, 11 Sep 2023 17:07:00 +0000 (19:07 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 12 Sep 2023 21:35:20 +0000 (23:35 +0200)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/AbstractMessageTo.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/CommandCreateChatRoomTo.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedTo.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java

index 241ea9b..738901a 100644 (file)
@@ -3,7 +3,7 @@ package de.juplo.kafka.chat.backend.implementation.kafka;
 import de.juplo.kafka.chat.backend.domain.*;
 import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
 import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.AbstractDataMessageTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
 import lombok.Getter;
@@ -27,8 +27,8 @@ import java.util.stream.IntStream;
 public class DataChannel implements Runnable, ConsumerRebalanceListener
 {
   private final String topic;
-  private final Producer<String, AbstractDataMessageTo> producer;
-  private final Consumer<String, AbstractDataMessageTo> consumer;
+  private final Producer<String, AbstractMessageTo> producer;
+  private final Consumer<String, AbstractMessageTo> consumer;
   private final ZoneId zoneId;
   private final int numShards;
   private final int bufferSize;
@@ -45,8 +45,8 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
 
   public DataChannel(
     String topic,
-    Producer<String, AbstractDataMessageTo> producer,
-    Consumer<String, AbstractDataMessageTo> consumer,
+    Producer<String, AbstractMessageTo> producer,
+    Consumer<String, AbstractMessageTo> dataChannelConsumer,
     ZoneId zoneId,
     int numShards,
     int bufferSize,
@@ -57,7 +57,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
         topic,
         numShards);
     this.topic = topic;
-    this.consumer = consumer;
+    this.consumer = dataChannelConsumer;
     this.producer = producer;
     this.zoneId = zoneId;
     this.numShards = numShards;
@@ -84,7 +84,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name);
     return Mono.create(sink ->
     {
-      ProducerRecord<String, AbstractDataMessageTo> record =
+      ProducerRecord<String, AbstractMessageTo> record =
           new ProducerRecord<>(
               topic,
               chatRoomId.toString(),
@@ -122,7 +122,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
     return Mono.create(sink ->
     {
-      ProducerRecord<String, AbstractDataMessageTo> record =
+      ProducerRecord<String, AbstractMessageTo> record =
           new ProducerRecord<>(
               topic,
               null,
@@ -207,7 +207,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     {
       try
       {
-        ConsumerRecords<String, AbstractDataMessageTo> records = consumer.poll(Duration.ofMinutes(5));
+        ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(5));
         log.info("Fetched {} messages", records.count());
 
         if (loadInProgress)
@@ -240,9 +240,9 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     log.info("Exiting normally");
   }
 
-  private void loadChatRoom(ConsumerRecords<String, AbstractDataMessageTo> records)
+  private void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
   {
-    for (ConsumerRecord<String, AbstractDataMessageTo> record : records)
+    for (ConsumerRecord<String, AbstractMessageTo> record : records)
     {
       UUID chatRoomId = UUID.fromString(record.key());
 
index 2edf1f0..77f0cbb 100644 (file)
@@ -5,7 +5,7 @@ import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
 import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
 import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.AbstractDataMessageTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
 import lombok.extern.slf4j.Slf4j;
@@ -20,7 +20,6 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.time.*;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -32,8 +31,8 @@ public class InfoChannel implements Runnable
 {
   private final String infoTopic;
   private final String dataTopic;
-  private final Producer<String, AbstractDataMessageTo> producer;
-  private final Consumer<String, AbstractDataMessageTo> consumer;
+  private final Producer<String, AbstractMessageTo> producer;
+  private final Consumer<String, AbstractMessageTo> consumer;
   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
 
   private boolean running;
@@ -42,16 +41,15 @@ public class InfoChannel implements Runnable
   public InfoChannel(
     String infoTopic,
     String dataTopic,
-    Producer<String, AbstractDataMessageTo> producer,
-    Consumer<String, AbstractDataMessageTo> consumer,
-    ZoneId zoneId)
+    Producer<String, AbstractMessageTo> producer,
+    Consumer<String, AbstractMessageTo> infoChannelConsumer)
   {
     log.debug(
         "Creating InfoChannel for topic {}",
         infoTopic);
     this.infoTopic = infoTopic;
     this.dataTopic = dataTopic;
-    this.consumer = consumer;
+    this.consumer = infoChannelConsumer;
     this.producer = producer;
     this.chatRoomInfo = new HashMap<>();
   }
@@ -65,7 +63,7 @@ public class InfoChannel implements Runnable
     CommandCreateChatRoomTo to = CommandCreateChatRoomTo.of(name);
     return Mono.create(sink ->
     {
-      ProducerRecord<String, AbstractDataMessageTo> record =
+      ProducerRecord<String, AbstractMessageTo> record =
           new ProducerRecord<>(
               dataTopic,
               chatRoomId.toString(),
@@ -104,7 +102,7 @@ public class InfoChannel implements Runnable
     {
       try
       {
-        ConsumerRecords<String, AbstractDataMessageTo> records = consumer.poll(Duration.ofMinutes(5));
+        ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(5));
         log.info("Fetched {} messages", records.count());
 
         if (loadInProgress)
index 2135a7d..97cfa69 100644 (file)
@@ -1,7 +1,7 @@
 package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.AbstractDataMessageTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import jakarta.annotation.PreDestroy;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -36,7 +36,7 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner
   @Autowired
   DataChannel dataChannel;
   @Autowired
-  Consumer<String, AbstractDataMessageTo> chatRoomChannelConsumer;
+  Consumer<String, AbstractMessageTo> chatRoomChannelConsumer;
 
   CompletableFuture<Void> chatRoomChannelConsumerJob;
 
index c7d5115..0aeefea 100644 (file)
@@ -2,7 +2,7 @@ package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.AbstractDataMessageTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -46,8 +46,8 @@ public class KafkaServicesConfiguration
   @Bean
   DataChannel chatRoomChannel(
       ChatBackendProperties properties,
-      Producer<String, AbstractDataMessageTo> chatRoomChannelProducer,
-      Consumer<String, AbstractDataMessageTo> chatRoomChannelConsumer,
+      Producer<String, AbstractMessageTo> chatRoomChannelProducer,
+      Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
       ZoneId zoneId,
       Clock clock)
   {
@@ -62,11 +62,11 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  Producer<String, AbstractDataMessageTo>  chatRoomChannelProducer(
+  Producer<String, AbstractMessageTo>  chatRoomChannelProducer(
       Properties defaultProducerProperties,
       ChatBackendProperties chatBackendProperties,
       StringSerializer stringSerializer,
-      JsonSerializer<AbstractDataMessageTo> messageSerializer)
+      JsonSerializer<AbstractMessageTo> messageSerializer)
   {
     Map<String, Object> properties = new HashMap<>();
     defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
@@ -86,9 +86,9 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  JsonSerializer<AbstractDataMessageTo> chatMessageSerializer(String typeMappings)
+  JsonSerializer<AbstractMessageTo> chatMessageSerializer(String typeMappings)
   {
-    JsonSerializer<AbstractDataMessageTo> serializer = new JsonSerializer<>();
+    JsonSerializer<AbstractMessageTo> serializer = new JsonSerializer<>();
     serializer.configure(
         Map.of(
             JsonSerializer.TYPE_MAPPINGS, typeMappings),
@@ -97,11 +97,11 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  Consumer<String, AbstractDataMessageTo>  chatRoomChannelConsumer(
+  Consumer<String, AbstractMessageTo>  chatRoomChannelConsumer(
       Properties defaultConsumerProperties,
       ChatBackendProperties chatBackendProperties,
       StringDeserializer stringDeserializer,
-      JsonDeserializer<AbstractDataMessageTo> messageDeserializer)
+      JsonDeserializer<AbstractMessageTo> messageDeserializer)
   {
     Map<String, Object> properties = new HashMap<>();
     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
@@ -124,9 +124,9 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  JsonDeserializer<AbstractDataMessageTo> chatMessageDeserializer(String typeMappings)
+  JsonDeserializer<AbstractMessageTo> chatMessageDeserializer(String typeMappings)
   {
-    JsonDeserializer<AbstractDataMessageTo> deserializer = new JsonDeserializer<>();
+    JsonDeserializer<AbstractMessageTo> deserializer = new JsonDeserializer<>();
     deserializer.configure(
         Map.of(
             JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName(),
index c96f43b..1aef3fd 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.chat.backend.implementation.kafka.messages.data;
+package de.juplo.kafka.chat.backend.implementation.kafka.messages;
 
 
 import lombok.Getter;
@@ -6,11 +6,12 @@ import lombok.RequiredArgsConstructor;
 
 
 @RequiredArgsConstructor
-public class AbstractDataMessageTo
+public class AbstractMessageTo
 {
   public enum ToType {
     COMMAND_CREATE_CHATROOM,
     EVENT_CHATMESSAGE_RECEIVED,
+    EVENT_CHATROOM_CREATED,
   }
 
   @Getter
index 00c8b94..9c8bd75 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka.chat.backend.implementation.kafka.messages.data;
 
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import lombok.*;
 
 
@@ -7,7 +8,7 @@ import lombok.*;
 @Setter
 @EqualsAndHashCode
 @ToString
-public class CommandCreateChatRoomTo extends AbstractDataMessageTo
+public class CommandCreateChatRoomTo extends AbstractMessageTo
 {
   private String name;
 
index 9709011..d4f6508 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka.chat.backend.implementation.kafka.messages.data;
 
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import lombok.*;
 
 
@@ -7,7 +8,7 @@ import lombok.*;
 @Setter
 @EqualsAndHashCode
 @ToString
-public class EventChatMessageReceivedTo extends AbstractDataMessageTo
+public class EventChatMessageReceivedTo extends AbstractMessageTo
 {
   private String user;
   private Long id;
index 27df2aa..75907e1 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka.chat.backend.implementation.kafka.messages.info;
 
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.Setter;
@@ -10,7 +11,7 @@ import lombok.ToString;
 @Setter
 @EqualsAndHashCode
 @ToString
-public class EventChatRoomCreated extends AbstractInfoMessageTo
+public class EventChatRoomCreated extends AbstractMessageTo
 {
   private String id;
   private String name;