WIP:ALIGN
authorKai Moritz <kai@juplo.de>
Mon, 11 Sep 2023 17:00:51 +0000 (19:00 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 13 Sep 2023 20:43:44 +0000 (22:43 +0200)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/CommandCreateChatRoomTo.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedTo.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/CommandCreateChatRoomToTest.java
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedToTest.java

index c1067bc..241ea9b 100644 (file)
@@ -4,8 +4,8 @@ import de.juplo.kafka.chat.backend.domain.*;
 import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
 import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.AbstractDataMessageTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomToData;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatDataMessageReceivedTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -81,7 +81,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
       UUID chatRoomId,
       String name)
   {
-    CommandCreateChatRoomToData createChatRoomRequestTo = CommandCreateChatRoomToData.of(name);
+    CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name);
     return Mono.create(sink ->
     {
       ProducerRecord<String, AbstractDataMessageTo> record =
@@ -128,7 +128,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
               null,
               zdt.toEpochSecond(),
               chatRoomId.toString(),
-              EventChatDataMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text));
+              EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text));
 
       producer.send(record, ((metadata, exception) ->
       {
@@ -251,7 +251,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
         case COMMAND_CREATE_CHATROOM:
           createChatRoom(
               chatRoomId,
-              (CommandCreateChatRoomToData) record.value(),
+              (CommandCreateChatRoomTo) record.value(),
               record.partition());
           break;
 
@@ -262,7 +262,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
               chatRoomId,
               timestamp,
               record.offset(),
-              (EventChatDataMessageReceivedTo) record.value(),
+              (EventChatMessageReceivedTo) record.value(),
               record.partition());
           break;
 
@@ -280,7 +280,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
 
   private void createChatRoom(
       UUID chatRoomId,
-      CommandCreateChatRoomToData createChatRoomRequestTo,
+      CommandCreateChatRoomTo createChatRoomRequestTo,
       Integer partition)
   {
     log.info(
@@ -318,7 +318,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
       UUID chatRoomId,
       LocalDateTime timestamp,
       long offset,
-      EventChatDataMessageReceivedTo chatMessageTo,
+      EventChatMessageReceivedTo chatMessageTo,
       int partition)
   {
     Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
index 2823d9d..2edf1f0 100644 (file)
@@ -6,8 +6,8 @@ import de.juplo.kafka.chat.backend.domain.Message;
 import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
 import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.AbstractDataMessageTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomToData;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatDataMessageReceivedTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -30,40 +30,29 @@ import java.util.stream.IntStream;
 @Slf4j
 public class InfoChannel implements Runnable
 {
-  private final DataChannel dataChannel;
   private final String infoTopic;
   private final String dataTopic;
   private final Producer<String, AbstractDataMessageTo> producer;
   private final Consumer<String, AbstractDataMessageTo> consumer;
-  private final ZoneId zoneId;
-  private final int bufferSize;
-  private final Clock clock;
   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
 
   private boolean running;
 
 
   public InfoChannel(
-    DataChannel dataChannel,
     String infoTopic,
     String dataTopic,
     Producer<String, AbstractDataMessageTo> producer,
     Consumer<String, AbstractDataMessageTo> consumer,
-    ZoneId zoneId,
-    int bufferSize,
-    Clock clock)
+    ZoneId zoneId)
   {
     log.debug(
         "Creating InfoChannel for topic {}",
         infoTopic);
-    this.dataChannel = dataChannel;
     this.infoTopic = infoTopic;
     this.dataTopic = dataTopic;
     this.consumer = consumer;
     this.producer = producer;
-    this.zoneId = zoneId;
-    this.bufferSize = bufferSize;
-    this.clock = clock;
     this.chatRoomInfo = new HashMap<>();
   }
 
@@ -73,20 +62,20 @@ public class InfoChannel implements Runnable
       UUID chatRoomId,
       String name)
   {
-    CommandCreateChatRoomToData createChatRoomRequestTo = CommandCreateChatRoomToData.of(name);
+    CommandCreateChatRoomTo to = CommandCreateChatRoomTo.of(name);
     return Mono.create(sink ->
     {
       ProducerRecord<String, AbstractDataMessageTo> record =
           new ProducerRecord<>(
               dataTopic,
               chatRoomId.toString(),
-              createChatRoomRequestTo);
+              to);
 
       producer.send(record, ((metadata, exception) ->
       {
         if (metadata != null)
         {
-          log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
+          log.info("Successfully send chreate-request for chat room: {}", to);
           ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition());
           createChatRoom(chatRoomInfo);
           sink.success(chatRoomInfo);
@@ -105,90 +94,6 @@ public class InfoChannel implements Runnable
     });
   }
 
-  Mono<Message> sendChatMessage(
-      UUID chatRoomId,
-      Message.MessageKey key,
-      LocalDateTime timestamp,
-      String text)
-  {
-    ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
-    return Mono.create(sink ->
-    {
-      ProducerRecord<String, AbstractDataMessageTo> record =
-          new ProducerRecord<>(
-              infoTopic,
-              null,
-              zdt.toEpochSecond(),
-              chatRoomId.toString(),
-              EventChatDataMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text));
-
-      producer.send(record, ((metadata, exception) ->
-      {
-        if (metadata != null)
-        {
-          // On successful send
-          Message message = new Message(key, metadata.offset(), timestamp, text);
-          log.info("Successfully send message {}", message);
-          sink.success(message);
-        }
-        else
-        {
-          // On send-failure
-          log.error(
-              "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}",
-              chatRoomId,
-              key,
-              timestamp,
-              text,
-              exception);
-          sink.error(exception);
-        }
-      }));
-    });
-  }
-
-  @Override
-  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
-  {
-    log.info("Newly assigned partitions! Pausing normal operations...");
-    loadInProgress = true;
-
-    consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
-    {
-      int partition = topicPartition.partition();
-      isShardOwned[partition] =  true;
-      this.currentOffset[partition] = currentOffset;
-
-      log.info(
-          "Partition assigned: {} - loading messages: next={} -> current={}",
-          partition,
-          nextOffset[partition],
-          currentOffset);
-
-      consumer.seek(topicPartition, nextOffset[partition]);
-    });
-
-    consumer.resume(partitions);
-  }
-
-  @Override
-  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
-  {
-    partitions.forEach(topicPartition ->
-    {
-      int partition = topicPartition.partition();
-      isShardOwned[partition] = false;
-      log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
-    });
-  }
-
-  @Override
-  public void onPartitionsLost(Collection<TopicPartition> partitions)
-  {
-    log.warn("Lost partitions: {}, partitions");
-    // TODO: Muss auf den Verlust anders reagiert werden?
-    onPartitionsRevoked(partitions);
-  }
 
   @Override
   public void run()
@@ -232,9 +137,9 @@ public class InfoChannel implements Runnable
     log.info("Exiting normally");
   }
 
-  private void loadChatRoom(ConsumerRecords<String, AbstractDataMessageTo> records)
+  private void loadChatRoom(ConsumerRecords<String, AbstractInfoMessageTo> records)
   {
-    for (ConsumerRecord<String, AbstractDataMessageTo> record : records)
+    for (ConsumerRecord<String, AbstractInfoMessageTo> record : records)
     {
       UUID chatRoomId = UUID.fromString(record.key());
 
@@ -243,7 +148,7 @@ public class InfoChannel implements Runnable
         case COMMAND_CREATE_CHATROOM:
           createChatRoom(
               chatRoomId,
-              (CommandCreateChatRoomToData) record.value(),
+              (CommandCreateChatRoomTo) record.value(),
               record.partition());
           break;
 
@@ -254,7 +159,7 @@ public class InfoChannel implements Runnable
               chatRoomId,
               timestamp,
               record.offset(),
-              (EventChatDataMessageReceivedTo) record.value(),
+              (EventChatMessageReceivedTo) record.value(),
               record.partition());
           break;
 
@@ -272,7 +177,7 @@ public class InfoChannel implements Runnable
 
   private void createChatRoom(
       UUID chatRoomId,
-      CommandCreateChatRoomToData createChatRoomRequestTo,
+      CommandCreateChatRoomTo createChatRoomRequestTo,
       Integer partition)
   {
     log.info(
@@ -310,7 +215,7 @@ public class InfoChannel implements Runnable
       UUID chatRoomId,
       LocalDateTime timestamp,
       long offset,
-      EventChatDataMessageReceivedTo chatMessageTo,
+      EventChatMessageReceivedTo chatMessageTo,
       int partition)
   {
     Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
index 96f5b33..5fbaa7b 100644 (file)
@@ -3,8 +3,8 @@ package de.juplo.kafka.chat.backend.implementation.kafka;
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.AbstractDataMessageTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomToData;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatDataMessageReceivedTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -168,8 +168,8 @@ public class KafkaServicesConfiguration
   String typeMappings ()
   {
     return
-        "command_create_chatroom:" +  CommandCreateChatRoomToData.class.getCanonicalName() + "," +
-        "event_chatmessage_received:" + EventChatDataMessageReceivedTo.class.getCanonicalName();
+        "command_create_chatroom:" +  CommandCreateChatRoomTo.class.getCanonicalName() + "," +
+        "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
   }
 
   @Bean
index ee5fe0f..00c8b94 100644 (file)
@@ -7,20 +7,20 @@ import lombok.*;
 @Setter
 @EqualsAndHashCode
 @ToString
-public class CommandCreateChatRoomToData extends AbstractDataMessageTo
+public class CommandCreateChatRoomTo extends AbstractDataMessageTo
 {
   private String name;
 
 
-  public CommandCreateChatRoomToData()
+  public CommandCreateChatRoomTo()
   {
     super(ToType.COMMAND_CREATE_CHATROOM);
   }
 
 
-  public static CommandCreateChatRoomToData of(String name)
+  public static CommandCreateChatRoomTo of(String name)
   {
-    CommandCreateChatRoomToData to = new CommandCreateChatRoomToData();
+    CommandCreateChatRoomTo to = new CommandCreateChatRoomTo();
     to.name = name;
     return to;
   }
index 20a0f5c..9709011 100644 (file)
@@ -7,22 +7,22 @@ import lombok.*;
 @Setter
 @EqualsAndHashCode
 @ToString
-public class EventChatDataMessageReceivedTo extends AbstractDataMessageTo
+public class EventChatMessageReceivedTo extends AbstractDataMessageTo
 {
   private String user;
   private Long id;
   private String text;
 
 
-  public EventChatDataMessageReceivedTo()
+  public EventChatMessageReceivedTo()
   {
     super(ToType.EVENT_CHATMESSAGE_RECEIVED);
   }
 
 
-  public static EventChatDataMessageReceivedTo of(String user, Long id, String text)
+  public static EventChatMessageReceivedTo of(String user, Long id, String text)
   {
-    EventChatDataMessageReceivedTo to = new EventChatDataMessageReceivedTo();
+    EventChatMessageReceivedTo to = new EventChatMessageReceivedTo();
     to.user = user;
     to.id = id;
     to.text = text;
index c2311de..27df2aa 100644 (file)
@@ -17,8 +17,8 @@ public class EventChatRoomCreated extends AbstractInfoMessageTo
   private Integer shard;
 
 
-  public EventChatRoomCreated(ToType type)
+  public EventChatRoomCreated()
   {
-    super(type);
+    super(ToType.EVENT_CHATROOM_CREATED);
   }
 }
index 6499575..044bf1d 100644 (file)
@@ -29,7 +29,7 @@ public class CommandCreateChatRoomToTest
   @Test
   public void testDeserialization() throws Exception
   {
-    CommandCreateChatRoomToData message = mapper.readValue(json, CommandCreateChatRoomToData.class);
+    CommandCreateChatRoomTo message = mapper.readValue(json, CommandCreateChatRoomTo.class);
     assertThat(message.getName()).isEqualTo("Foo-Room!");
   }
 }
index 41d7bc1..72ade06 100644 (file)
@@ -31,7 +31,7 @@ public class EventChatMessageReceivedToTest
   @Test
   public void testDeserialization() throws Exception
   {
-    EventChatDataMessageReceivedTo message = mapper.readValue(json, EventChatDataMessageReceivedTo.class);
+    EventChatMessageReceivedTo message = mapper.readValue(json, EventChatMessageReceivedTo.class);
     assertThat(message.getId()).isEqualTo(1l);
     assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!");
     assertThat(message.getUser()).isEqualTo("Peter");