WIP
authorKai Moritz <kai@juplo.de>
Sun, 20 Aug 2023 08:15:02 +0000 (10:15 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 20 Aug 2023 08:48:28 +0000 (10:48 +0200)
15 files changed:
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/AbstractMessageTo.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageReceivedEventTo.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomCommandTo.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/AbstractMessageTo.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomTo.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedTo.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageReceivedEventToTest.java [deleted file]
src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomCommandToTest.java [deleted file]
src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomToTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedToTest.java [new file with mode: 0644]

diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/AbstractMessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/AbstractMessageTo.java
deleted file mode 100644 (file)
index 85a194e..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-
-
-@RequiredArgsConstructor
-public class AbstractMessageTo
-{
-  public enum ToType {
-    CREATE_CHATROOM_COMMAND,
-    CHATMESSAGE_EVENT,
-  }
-
-  @Getter
-  private final ToType type;
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageReceivedEventTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageReceivedEventTo.java
deleted file mode 100644 (file)
index 2bc7a03..0000000
+++ /dev/null
@@ -1,48 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.Message;
-import lombok.*;
-
-import java.time.LocalDateTime;
-
-
-@Getter
-@Setter
-@EqualsAndHashCode
-@ToString
-public class ChatMessageReceivedEventTo extends AbstractMessageTo
-{
-  private String user;
-  private Long id;
-  private String text;
-
-
-  public ChatMessageReceivedEventTo()
-  {
-    super(ToType.CHATMESSAGE_EVENT);
-  }
-
-
-  public Message toMessage(long offset, LocalDateTime timestamp)
-  {
-    return new Message(Message.MessageKey.of(user, id), offset, timestamp, text);
-  }
-
-  public static ChatMessageReceivedEventTo from(Message message)
-  {
-    return ChatMessageReceivedEventTo.of(
-        message.getUsername(),
-        message.getId(),
-        message.getMessageText());
-  }
-
-
-  public static ChatMessageReceivedEventTo of(String user, Long id, String text)
-  {
-    ChatMessageReceivedEventTo to = new ChatMessageReceivedEventTo();
-    to.user = user;
-    to.id = id;
-    to.text = text;
-    return to;
-  }
-}
index 275224d..c8bc41b 100644 (file)
@@ -3,6 +3,9 @@ package de.juplo.kafka.chat.backend.persistence.kafka;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
 import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -76,7 +79,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
       UUID chatRoomId,
       String name)
   {
-    CreateChatRoomCommandTo createChatRoomRequestTo = CreateChatRoomCommandTo.of(name);
+    CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name);
     return Mono.create(sink ->
     {
       ProducerRecord<String, AbstractMessageTo> record =
@@ -123,7 +126,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
               null,
               zdt.toEpochSecond(),
               chatRoomId.toString(),
-              ChatMessageReceivedEventTo.of(key.getUsername(), key.getMessageId(), text));
+              EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text));
 
       producer.send(record, ((metadata, exception) ->
       {
@@ -245,21 +248,21 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
 
       switch (record.value().getType())
       {
-        case CREATE_CHATROOM_COMMAND:
+        case COMMAND_CREATE_CHATROOM:
           createChatRoom(
               chatRoomId,
-              (CreateChatRoomCommandTo) record.value(),
+              (CommandCreateChatRoomTo) record.value(),
               record.partition());
           break;
 
-        case CHATMESSAGE_EVENT:
+        case EVENT_CHATMESSAGE_RECEIVED:
           Instant instant = Instant.ofEpochSecond(record.timestamp());
           LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
           loadChatMessage(
               chatRoomId,
               timestamp,
               record.offset(),
-              (ChatMessageReceivedEventTo) record.value(),
+              (EventChatMessageReceivedTo) record.value(),
               record.partition());
           break;
 
@@ -277,7 +280,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
 
   void createChatRoom(
       UUID chatRoomId,
-      CreateChatRoomCommandTo createChatRoomRequestTo,
+      CommandCreateChatRoomTo createChatRoomRequestTo,
       int partition)
   {
     log.info("Loading ChatRoom {} with buffer-size {}", chatRoomId, bufferSize);
@@ -308,7 +311,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
       UUID chatRoomId,
       LocalDateTime timestamp,
       long offset,
-      ChatMessageReceivedEventTo chatMessageTo,
+      EventChatMessageReceivedTo chatMessageTo,
       int partition)
   {
     Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomCommandTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomCommandTo.java
deleted file mode 100644 (file)
index b2601fb..0000000
+++ /dev/null
@@ -1,27 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import lombok.*;
-
-
-@Getter
-@Setter
-@EqualsAndHashCode
-@ToString
-public class CreateChatRoomCommandTo extends AbstractMessageTo
-{
-  private String name;
-
-
-  public CreateChatRoomCommandTo()
-  {
-    super(ToType.CREATE_CHATROOM_COMMAND);
-  }
-
-
-  public static CreateChatRoomCommandTo of(String name)
-  {
-    CreateChatRoomCommandTo to = new CreateChatRoomCommandTo();
-    to.name = name;
-    return to;
-  }
-}
index 26887a9..324e80b 100644 (file)
@@ -16,26 +16,26 @@ import java.util.*;
 public class KafkaChatHome implements ChatHome
 {
   private final KafkaLikeShardingStrategy shardingStrategy;
-  private final ChatRoomChannel chatMessageChanel;
+  private final ChatRoomChannel chatRoomChannel;
 
 
   @Override
   public Mono<ChatRoom> getChatRoom(UUID id)
   {
     int shard = shardingStrategy.selectShard(id);
-    if (chatMessageChanel.isLoadInProgress())
+    if (chatRoomChannel.isLoadInProgress())
     {
       throw new LoadInProgressException(shard);
     }
     else
     {
-      return chatMessageChanel.getChatRoom(shard, id);
+      return chatRoomChannel.getChatRoom(shard, id);
     }
   }
 
   @Override
   public Flux<ChatRoom> getChatRooms()
   {
-      return chatMessageChanel.getChatRooms();
+      return chatRoomChannel.getChatRooms();
   }
 }
index 825f16e..6a1dc78 100644 (file)
@@ -18,7 +18,7 @@ public class KafkaChatRoomFactory implements ChatRoomFactory
   @Override
   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
   {
-    log.info("Sending create-request for chat rooom: id={}, name={}");
+    log.info("Sending create-command for chat rooom: id={}, name={}");
     return chatRoomChannel.sendCreateChatRoomRequest(id, name);
   }
 }
index fec48b0..5c7e0d8 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka.chat.backend.persistence.kafka;
 
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
 import jakarta.annotation.PreDestroy;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
index e8c3f0d..dda8748 100644 (file)
@@ -3,6 +3,9 @@ package de.juplo.kafka.chat.backend.persistence.kafka;
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.domain.ChatHome;
 import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo;
+import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -80,7 +83,7 @@ public class KafkaServicesConfiguration
     defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
     properties.put(
         ProducerConfig.CLIENT_ID_CONFIG,
-        chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_PRODUCER");
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER");
     return new KafkaProducer<>(
         properties,
         stringSerializer,
@@ -115,7 +118,7 @@ public class KafkaServicesConfiguration
     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
     properties.put(
         ConsumerConfig.CLIENT_ID_CONFIG,
-        chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_CONSUMER");
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER");
     properties.put(
         ConsumerConfig.GROUP_ID_CONFIG,
         "chat_message_channel");
@@ -147,8 +150,8 @@ public class KafkaServicesConfiguration
   String typeMappings ()
   {
     return
-        "create:" +  CreateChatRoomCommandTo.class.getCanonicalName() + "," +
-        "message:" + ChatMessageReceivedEventTo.class.getCanonicalName();
+        "create:" +  CommandCreateChatRoomTo.class.getCanonicalName() + "," +
+        "message:" + EventChatMessageReceivedTo.class.getCanonicalName();
   }
 
   @Bean
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/AbstractMessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/AbstractMessageTo.java
new file mode 100644 (file)
index 0000000..7cc7541
--- /dev/null
@@ -0,0 +1,18 @@
+package de.juplo.kafka.chat.backend.persistence.kafka.messages;
+
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+
+@RequiredArgsConstructor
+public class AbstractMessageTo
+{
+  public enum ToType {
+    COMMAND_CREATE_CHATROOM,
+    EVENT_CHATMESSAGE_RECEIVED,
+  }
+
+  @Getter
+  private final ToType type;
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomTo.java
new file mode 100644 (file)
index 0000000..1a134f3
--- /dev/null
@@ -0,0 +1,27 @@
+package de.juplo.kafka.chat.backend.persistence.kafka.messages;
+
+import lombok.*;
+
+
+@Getter
+@Setter
+@EqualsAndHashCode
+@ToString
+public class CommandCreateChatRoomTo extends AbstractMessageTo
+{
+  private String name;
+
+
+  public CommandCreateChatRoomTo()
+  {
+    super(ToType.COMMAND_CREATE_CHATROOM);
+  }
+
+
+  public static CommandCreateChatRoomTo of(String name)
+  {
+    CommandCreateChatRoomTo to = new CommandCreateChatRoomTo();
+    to.name = name;
+    return to;
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedTo.java
new file mode 100644 (file)
index 0000000..2297b94
--- /dev/null
@@ -0,0 +1,48 @@
+package de.juplo.kafka.chat.backend.persistence.kafka.messages;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.*;
+
+import java.time.LocalDateTime;
+
+
+@Getter
+@Setter
+@EqualsAndHashCode
+@ToString
+public class EventChatMessageReceivedTo extends AbstractMessageTo
+{
+  private String user;
+  private Long id;
+  private String text;
+
+
+  public EventChatMessageReceivedTo()
+  {
+    super(ToType.EVENT_CHATMESSAGE_RECEIVED);
+  }
+
+
+  public Message toMessage(long offset, LocalDateTime timestamp)
+  {
+    return new Message(Message.MessageKey.of(user, id), offset, timestamp, text);
+  }
+
+  public static EventChatMessageReceivedTo from(Message message)
+  {
+    return EventChatMessageReceivedTo.of(
+        message.getUsername(),
+        message.getId(),
+        message.getMessageText());
+  }
+
+
+  public static EventChatMessageReceivedTo of(String user, Long id, String text)
+  {
+    EventChatMessageReceivedTo to = new EventChatMessageReceivedTo();
+    to.user = user;
+    to.id = id;
+    to.text = text;
+    return to;
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageReceivedEventToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageReceivedEventToTest.java
deleted file mode 100644 (file)
index be612c8..0000000
+++ /dev/null
@@ -1,39 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
-
-
-public class ChatMessageReceivedEventToTest
-{
-  final String json = """
-  {
-    "id": 1,
-    "text": "Hallo, ich heiße Peter!",
-    "user": "Peter"
-  }""";
-
-  ObjectMapper mapper;
-
-  @BeforeEach
-  public void setUp()
-  {
-    mapper = new ObjectMapper();
-    mapper.registerModule(new JavaTimeModule());
-    mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
-  }
-
-  @Test
-  public void testDeserialization() throws Exception
-  {
-    ChatMessageReceivedEventTo message = mapper.readValue(json, ChatMessageReceivedEventTo.class);
-    assertThat(message.getId()).isEqualTo(1l);
-    assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!");
-    assertThat(message.getUser()).isEqualTo("Peter");
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomCommandToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomCommandToTest.java
deleted file mode 100644 (file)
index 71a2971..0000000
+++ /dev/null
@@ -1,35 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
-
-
-public class CreateChatRoomCommandToTest
-{
-  final String json = """
-  {
-    "name": "Foo-Room!"
-  }""";
-
-  ObjectMapper mapper;
-
-  @BeforeEach
-  public void setUp()
-  {
-    mapper = new ObjectMapper();
-    mapper.registerModule(new JavaTimeModule());
-    mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
-  }
-
-  @Test
-  public void testDeserialization() throws Exception
-  {
-    CreateChatRoomCommandTo message = mapper.readValue(json, CreateChatRoomCommandTo.class);
-    assertThat(message.getName()).isEqualTo("Foo-Room!");
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomToTest.java
new file mode 100644 (file)
index 0000000..5ef12ef
--- /dev/null
@@ -0,0 +1,35 @@
+package de.juplo.kafka.chat.backend.persistence.kafka.messages;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+
+public class CommandCreateChatRoomToTest
+{
+  final String json = """
+  {
+    "name": "Foo-Room!"
+  }""";
+
+  ObjectMapper mapper;
+
+  @BeforeEach
+  public void setUp()
+  {
+    mapper = new ObjectMapper();
+    mapper.registerModule(new JavaTimeModule());
+    mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+  }
+
+  @Test
+  public void testDeserialization() throws Exception
+  {
+    CommandCreateChatRoomTo message = mapper.readValue(json, CommandCreateChatRoomTo.class);
+    assertThat(message.getName()).isEqualTo("Foo-Room!");
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedToTest.java
new file mode 100644 (file)
index 0000000..33a7827
--- /dev/null
@@ -0,0 +1,39 @@
+package de.juplo.kafka.chat.backend.persistence.kafka.messages;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+
+public class EventChatMessageReceivedToTest
+{
+  final String json = """
+  {
+    "id": 1,
+    "text": "Hallo, ich heiße Peter!",
+    "user": "Peter"
+  }""";
+
+  ObjectMapper mapper;
+
+  @BeforeEach
+  public void setUp()
+  {
+    mapper = new ObjectMapper();
+    mapper.registerModule(new JavaTimeModule());
+    mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+  }
+
+  @Test
+  public void testDeserialization() throws Exception
+  {
+    EventChatMessageReceivedTo message = mapper.readValue(json, EventChatMessageReceivedTo.class);
+    assertThat(message.getId()).isEqualTo(1l);
+    assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!");
+    assertThat(message.getUser()).isEqualTo("Peter");
+  }
+}