NG
authorKai Moritz <kai@juplo.de>
Sat, 22 Apr 2023 09:29:39 +0000 (11:29 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 22 Apr 2023 09:29:39 +0000 (11:29 +0200)
13 files changed:
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/AbstractTo.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageTo.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomTo.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomRequestTo.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageTo.java [deleted file]
src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java
src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageToTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomRequestToTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageToTest.java [deleted file]

diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/AbstractTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/AbstractTo.java
new file mode 100644 (file)
index 0000000..216ff2e
--- /dev/null
@@ -0,0 +1,15 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+
+@RequiredArgsConstructor
+public class AbstractTo
+{
+  public enum ToType { MESSAGE_SENT, CREATE_CHATROOM_REQUEST };
+
+  @Getter
+  private final ToType type;
+}
index 138d9a7..8294316 100644 (file)
@@ -25,8 +25,8 @@ import java.util.stream.IntStream;
 public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
 {
   private final String topic;
-  private final Producer<String, MessageTo> producer;
-  private final Consumer<String, MessageTo> consumer;
+  private final Producer<String, AbstractTo> producer;
+  private final Consumer<String, AbstractTo> consumer;
   private final ZoneId zoneId;
   private final int numShards;
   private final boolean[] isShardOwned;
@@ -42,8 +42,8 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
 
   public ChatMessageChannel(
     String topic,
-    Producer<String, MessageTo> producer,
-    Consumer<String, MessageTo> consumer,
+    Producer<String, AbstractTo> producer,
+    Consumer<String, AbstractTo> consumer,
     ZoneId zoneId,
     int numShards)
   {
@@ -78,13 +78,13 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
     ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
     return Mono.create(sink ->
     {
-      ProducerRecord<String, MessageTo> record =
+      ProducerRecord<String, ChatMessageTo> record =
           new ProducerRecord<>(
               tp.topic(),
               tp.partition(),
               zdt.toEpochSecond(),
               chatRoomId.toString(),
-              MessageTo.of(key.getUsername(), key.getMessageId(), text));
+              ChatMessageTo.of(key.getUsername(), key.getMessageId(), text));
 
       producer.send(record, ((metadata, exception) ->
       {
@@ -165,12 +165,12 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
     {
       try
       {
-        ConsumerRecords<String, MessageTo> records = consumer.poll(Duration.ofMinutes(5));
+        ConsumerRecords<String, AbstractTo> records = consumer.poll(Duration.ofMinutes(5));
         log.info("Fetched {} messages", records.count());
 
         if (loadInProgress)
         {
-          loadMessages(records);
+          loadChatRoom(records);
 
           if (isLoadingCompleted())
           {
@@ -198,31 +198,55 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
     log.info("Exiting normally");
   }
 
-  void loadMessages(ConsumerRecords<String, MessageTo> records)
+  void loadChatRoom(ConsumerRecords<String, AbstractTo> records)
   {
-    for (ConsumerRecord<String, MessageTo> record : records)
+    for (ConsumerRecord<String, AbstractTo> record : records)
     {
-      nextOffset[record.partition()] = record.offset() + 1;
-      UUID chatRoomId = UUID.fromString(record.key());
-      MessageTo messageTo = record.value();
+      switch (record.value().getType())
+      {
+        case CREATE_CHATROOM_REQUEST:
+          createChatRoom((CreateChatRoomRequestTo) record.value());
+          break;
+
+        case MESSAGE_SENT:
+          UUID chatRoomId = UUID.fromString(record.key());
+          Instant instant = Instant.ofEpochSecond(record.timestamp());
+          LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
+          loadChatMessage(
+              chatRoomId,
+              timestamp,
+              record.offset(),
+              (ChatMessageTo) record.value(),
+              record.partition());
+          break;
+      }
 
-      Message.MessageKey key = Message.MessageKey.of(messageTo.getUser(), messageTo.getId());
+      nextOffset[record.partition()] = record.offset() + 1;
+    }
+  }
 
-      Instant instant = Instant.ofEpochSecond(record.timestamp());
-      LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
+  void createChatRoom(
+      CreateChatRoomRequestTo createChatRoomRequestTo,
+      int partition)
+  {
+    chatrooms[partition].put
+  }
 
-      Message message = new Message(key, record.offset(), timestamp, messageTo.getText());
+  void loadChatMessage(
+      UUID chatRoomId,
+      LocalDateTime timestamp,
+      long offset,
+      ChatMessageTo chatMessageTo,
+      int partition)
+  {
+    Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
+    Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
 
-      ChatRoom chatRoom = chatrooms[record.partition()].get(chatRoomId);
-      if (chatRoom == null)
-      {
-        // TODO: Alles pausieren und erst von putChatRoom wieder resumen lassen!
-      }
-      KafkaChatRoomService kafkaChatRoomService =
-          (KafkaChatRoomService) chatRoom.getChatRoomService();
+    ChatRoom chatRoom = chatrooms[partition].get(chatRoomId);
+    KafkaChatRoomService kafkaChatRoomService =
+        (KafkaChatRoomService) chatRoom.getChatRoomService();
 
-      kafkaChatRoomService.persistMessage(message);
-    }
+    kafkaChatRoomService.persistMessage(message);
   }
 
   boolean isLoadingCompleted()
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageTo.java
new file mode 100644 (file)
index 0000000..41ce00a
--- /dev/null
@@ -0,0 +1,45 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.Data;
+
+import java.time.LocalDateTime;
+
+
+@Data
+public class ChatMessageTo extends AbstractTo
+{
+  private String user;
+  private Long id;
+  private String text;
+
+
+  public ChatMessageTo()
+  {
+    super(ToType.MESSAGE_SENT);
+  }
+
+
+  public Message toMessage(long offset, LocalDateTime timestamp)
+  {
+    return new Message(Message.MessageKey.of(user, id), offset, timestamp, text);
+  }
+
+  public static ChatMessageTo from(Message message)
+  {
+    return ChatMessageTo.of(
+        message.getUsername(),
+        message.getId(),
+        message.getMessageText());
+  }
+
+
+  public static ChatMessageTo of(String user, Long id, String text)
+  {
+    ChatMessageTo to = new ChatMessageTo();
+    to.user = user;
+    to.id = id;
+    to.text = text;
+    return to;
+  }
+}
index 97ee988..57deaf2 100644 (file)
@@ -22,8 +22,8 @@ import java.util.UUID;
 public class ChatRoomChannel implements Runnable
 {
   private final String topic;
-  private final Producer<Integer, ChatRoomTo> producer;
-  private final Consumer<Integer, ChatRoomTo> consumer;
+  private final Producer<Integer, CreateChatRoomRequestTo> producer;
+  private final Consumer<Integer, CreateChatRoomRequestTo> consumer;
   private final ShardingStrategy shardingStrategy;
   private final ChatMessageChannel chatMessageChannel;
   private final Clock clock;
@@ -37,21 +37,21 @@ public class ChatRoomChannel implements Runnable
       String name)
   {
     int shard = this.shardingStrategy.selectShard(chatRoomId);
-    ChatRoomTo chatRoomTo = ChatRoomTo.of(chatRoomId.toString(), name, shard);
+    CreateChatRoomRequestTo createChatRoomRequestTo = CreateChatRoomRequestTo.of(chatRoomId.toString(), name, shard);
     return Mono.create(sink ->
     {
-      ProducerRecord<Integer, ChatRoomTo> record =
+      ProducerRecord<Integer, CreateChatRoomRequestTo> record =
           new ProducerRecord<>(
               topic,
               shard,
-              chatRoomTo);
+              createChatRoomRequestTo);
 
       producer.send(record, ((metadata, exception) ->
       {
         if (metadata != null)
         {
-          log.info("Successfully send chreate-request for chat room: {}", chatRoomTo);
-          sink.success(chatRoomTo.toChatRoomInfo());
+          log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
+          sink.success(createChatRoomRequestTo.toChatRoomInfo());
         }
         else
         {
@@ -78,10 +78,10 @@ public class ChatRoomChannel implements Runnable
     {
       try
       {
-        ConsumerRecords<Integer, ChatRoomTo> records = consumer.poll(Duration.ofMinutes(5));
+        ConsumerRecords<Integer, CreateChatRoomRequestTo> records = consumer.poll(Duration.ofMinutes(5));
         log.info("Fetched {} messages", records.count());
 
-        for (ConsumerRecord<Integer, ChatRoomTo> record : records)
+        for (ConsumerRecord<Integer, CreateChatRoomRequestTo> record : records)
         {
           createChatRoom(record.value().toChatRoomInfo());
         }
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomTo.java
deleted file mode 100644 (file)
index e564981..0000000
+++ /dev/null
@@ -1,30 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-import java.util.UUID;
-
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor(staticName = "of")
-public class ChatRoomTo
-{
-  private String id;
-  private String name;
-  private int shard;
-
-  public ChatRoomInfo toChatRoomInfo()
-  {
-    return new ChatRoomInfo(UUID.fromString(id), name, shard);
-  }
-
-  public static ChatRoomTo from(ChatRoom chatRoom)
-  {
-    return ChatRoomTo.of(chatRoom.getId().toString(), chatRoom.getName(), chatRoom.getShard());
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomRequestTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomRequestTo.java
new file mode 100644 (file)
index 0000000..eb57392
--- /dev/null
@@ -0,0 +1,42 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import lombok.Data;
+
+import java.util.UUID;
+
+
+@Data
+public class CreateChatRoomRequestTo extends AbstractTo
+{
+  private String id;
+  private String name;
+  private int shard;
+
+
+  public CreateChatRoomRequestTo()
+  {
+    super(ToType.CREATE_CHATROOM_REQUEST);
+  }
+
+
+  public ChatRoomInfo toChatRoomInfo()
+  {
+    return new ChatRoomInfo(UUID.fromString(id), name, shard);
+  }
+
+  public static CreateChatRoomRequestTo from(ChatRoom chatRoom)
+  {
+    return CreateChatRoomRequestTo.of(chatRoom.getId().toString(), chatRoom.getName(), chatRoom.getShard());
+  }
+
+  public static CreateChatRoomRequestTo of(String id,  String name, int shard)
+  {
+    CreateChatRoomRequestTo to = new CreateChatRoomRequestTo();
+    to.id = id;
+    to.name = name;
+    to.shard = shard;
+    return to;
+  }
+}
index ee5834e..8a9e32e 100644 (file)
@@ -30,11 +30,11 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner
   @Autowired
   ChatRoomChannel chatRoomChannel;
   @Autowired
-  Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer;
+  Consumer<Integer, CreateChatRoomRequestTo> chatRoomChannelConsumer;
   @Autowired
   ChatMessageChannel chatMessageChannel;
   @Autowired
-  Consumer<String, MessageTo> chatMessageChannelConsumer;
+  Consumer<String, ChatMessageTo> chatMessageChannelConsumer;
 
   CompletableFuture<Void> chatRoomChannelConsumerJob;
   CompletableFuture<Void> chatMessageChannelConsumerJob;
index 4350779..9e1f75e 100644 (file)
@@ -51,8 +51,8 @@ public class KafkaServicesConfiguration
   @Bean
   ChatRoomChannel chatRoomChannel(
       ChatBackendProperties properties,
-      Producer<Integer, ChatRoomTo> chatRoomChannelProducer,
-      Consumer<Integer, ChatRoomTo> chatRoomChannelConsumer,
+      Producer<Integer, CreateChatRoomRequestTo> chatRoomChannelProducer,
+      Consumer<Integer, CreateChatRoomRequestTo> chatRoomChannelConsumer,
       ShardingStrategy shardingStrategy,
       ChatMessageChannel chatMessageChannel,
       Clock clock)
@@ -68,11 +68,11 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  Producer<Integer, ChatRoomTo>  chatRoomChannelProducer(
+  Producer<Integer, CreateChatRoomRequestTo>  chatRoomChannelProducer(
       Properties defaultProducerProperties,
       ChatBackendProperties chatBackendProperties,
       IntegerSerializer integerSerializer,
-      JsonSerializer<ChatRoomTo> chatRoomSerializer)
+      JsonSerializer<CreateChatRoomRequestTo> chatRoomSerializer)
   {
     Map<String, Object> properties = new HashMap<>();
     defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
@@ -92,9 +92,9 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  JsonSerializer<ChatRoomTo> chatRoomSerializer()
+  JsonSerializer<CreateChatRoomRequestTo> chatRoomSerializer()
   {
-    JsonSerializer<ChatRoomTo> serializer = new JsonSerializer<>();
+    JsonSerializer<CreateChatRoomRequestTo> serializer = new JsonSerializer<>();
     serializer.configure(
         Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
         false);
@@ -102,11 +102,11 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  Consumer<Integer, ChatRoomTo>  chatRoomChannelConsumer(
+  Consumer<Integer, CreateChatRoomRequestTo>  chatRoomChannelConsumer(
       Properties defaultConsumerProperties,
       ChatBackendProperties chatBackendProperties,
       IntegerDeserializer integerDeserializer,
-      JsonDeserializer<ChatRoomTo> chatRoomDeserializer)
+      JsonDeserializer<CreateChatRoomRequestTo> chatRoomDeserializer)
   {
     Map<String, Object> properties = new HashMap<>();
     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
@@ -129,13 +129,13 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  JsonDeserializer<ChatRoomTo> chatRoomDeserializer()
+  JsonDeserializer<CreateChatRoomRequestTo> chatRoomDeserializer()
   {
-    JsonDeserializer<ChatRoomTo> deserializer = new JsonDeserializer<>();
+    JsonDeserializer<CreateChatRoomRequestTo> deserializer = new JsonDeserializer<>();
     deserializer.configure(
         Map.of(
             JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
-            JsonDeserializer.VALUE_DEFAULT_TYPE, ChatRoomTo.class,
+            JsonDeserializer.VALUE_DEFAULT_TYPE, CreateChatRoomRequestTo.class,
             JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
         false );
     return deserializer;
@@ -150,8 +150,8 @@ public class KafkaServicesConfiguration
   @Bean
   ChatMessageChannel chatMessageChannel(
       ChatBackendProperties properties,
-      Producer<String, MessageTo> chatMessageChannelProducer,
-      Consumer<String, MessageTo> chatMessageChannelConsumer,
+      Producer<String, AbstractTo> chatMessageChannelProducer,
+      Consumer<String, AbstractTo> chatMessageChannelConsumer,
       ZoneId zoneId)
   {
     return new ChatMessageChannel(
@@ -163,11 +163,11 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  Producer<String, MessageTo>  chatMessageChannelProducer(
+  Producer<String, AbstractTo>  chatMessageChannelProducer(
       Properties defaultProducerProperties,
       ChatBackendProperties chatBackendProperties,
       StringSerializer stringSerializer,
-      JsonSerializer<MessageTo> messageSerializer)
+      JsonSerializer<AbstractTo> messageSerializer)
   {
     Map<String, Object> properties = new HashMap<>();
     defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
@@ -187,21 +187,23 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  JsonSerializer<MessageTo> chatMessageSerializer()
+  JsonSerializer<AbstractTo> chatMessageSerializer()
   {
-    JsonSerializer<MessageTo> serializer = new JsonSerializer<>();
+    JsonSerializer<AbstractTo> serializer = new JsonSerializer<>();
     serializer.configure(
-        Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false),
+        Map.of(JsonSerializer.TYPE_MAPPINGS,
+            "create:" + CreateChatRoomRequestTo.class.getCanonicalName() + "," +
+            "message:" + ChatMessageTo.class.getCanonicalName()),
         false);
     return serializer;
   }
 
   @Bean
-  Consumer<String, MessageTo>  chatMessageChannelConsumer(
+  Consumer<String, ChatMessageTo>  chatMessageChannelConsumer(
       Properties defaultConsumerProperties,
       ChatBackendProperties chatBackendProperties,
       StringDeserializer stringDeserializer,
-      JsonDeserializer<MessageTo> messageDeserializer)
+      JsonDeserializer<ChatMessageTo> messageDeserializer)
   {
     Map<String, Object> properties = new HashMap<>();
     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
@@ -224,13 +226,13 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  JsonDeserializer<MessageTo> chatMessageDeserializer()
+  JsonDeserializer<ChatMessageTo> chatMessageDeserializer()
   {
-    JsonDeserializer<MessageTo> deserializer = new JsonDeserializer<>();
+    JsonDeserializer<ChatMessageTo> deserializer = new JsonDeserializer<>();
     deserializer.configure(
         Map.of(
             JsonDeserializer.USE_TYPE_INFO_HEADERS, false,
-            JsonDeserializer.VALUE_DEFAULT_TYPE, MessageTo.class,
+            JsonDeserializer.VALUE_DEFAULT_TYPE, ChatMessageTo.class,
             JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()),
         false );
     return deserializer;
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageTo.java
deleted file mode 100644 (file)
index 0a867f1..0000000
+++ /dev/null
@@ -1,33 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.Message;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-import java.time.LocalDateTime;
-
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor(staticName = "of")
-public class MessageTo
-{
-  private String user;
-  private Long id;
-  private String text;
-
-  public Message toMessage(long offset, LocalDateTime timestamp)
-  {
-    return new Message(Message.MessageKey.of(user, id), offset, timestamp, text);
-  }
-
-  public static MessageTo from(Message message)
-  {
-    return
-        new MessageTo(
-            message.getUsername(),
-            message.getId(),
-            message.getMessageText());
-  }
-}
index fc2b7c8..4e9ad23 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka.chat.backend;
 
 import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.jupiter.api.BeforeAll;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
@@ -37,10 +38,17 @@ class KafkaConfigurationIT extends AbstractConfigurationIT
   {
     UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
     int shard = shardingStrategy.selectShard(chatRoomId);
-    chatRoomTemplate.send(CHATROOMS_TOPIC, null,"{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": " + shard + ", \"name\": \"FOO\" }");
-    messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }");
-    messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }");
-    messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }");
-    messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }");
+    send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": " + shard + ", \"name\": \"FOO\" }", "create");
+    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "message");
+    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "message");
+    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "message");
+    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "message");
+  }
+
+  static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
+  {
+    ProducerRecord<String, String> record = new ProducerRecord<>(MESSAGES_TOPIC, key, value);
+    record.headers().add("__TypeId__", typeId.getBytes());
+    kafkaTemplate.send(record);
   }
 }
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageToTest.java
new file mode 100644 (file)
index 0000000..4a6c1c3
--- /dev/null
@@ -0,0 +1,39 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+
+public class ChatMessageToTest
+{
+  final String json = """
+  {
+    "id": 1,
+    "text": "Hallo, ich heiße Peter!",
+    "user": "Peter"
+  }""";
+
+  ObjectMapper mapper;
+
+  @BeforeEach
+  public void setUp()
+  {
+    mapper = new ObjectMapper();
+    mapper.registerModule(new JavaTimeModule());
+    mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+  }
+
+  @Test
+  public void testDeserialization() throws Exception
+  {
+    ChatMessageTo message = mapper.readValue(json, ChatMessageTo.class);
+    assertThat(message.getId()).isEqualTo(1l);
+    assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!");
+    assertThat(message.getUser()).isEqualTo("Peter");
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomRequestToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomRequestToTest.java
new file mode 100644 (file)
index 0000000..e7b749c
--- /dev/null
@@ -0,0 +1,39 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+
+public class CreateChatRoomRequestToTest
+{
+  final String json = """
+  {
+    "id": "5c73531c-6fc4-426c-adcb-afc5c140a0f7",
+    "name": "Foo-Room!",
+    "shard": 666
+  }""";
+
+  ObjectMapper mapper;
+
+  @BeforeEach
+  public void setUp()
+  {
+    mapper = new ObjectMapper();
+    mapper.registerModule(new JavaTimeModule());
+    mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+  }
+
+  @Test
+  public void testDeserialization() throws Exception
+  {
+    CreateChatRoomRequestTo message = mapper.readValue(json, CreateChatRoomRequestTo.class);
+    assertThat(message.getId()).isEqualTo("5c73531c-6fc4-426c-adcb-afc5c140a0f7");
+    assertThat(message.getName()).isEqualTo("Foo-Room!");
+    assertThat(message.getShard()).isEqualTo(666);
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageToTest.java
deleted file mode 100644 (file)
index 0c4884b..0000000
+++ /dev/null
@@ -1,39 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
-
-
-public class MessageToTest
-{
-  final String json = """
-  {
-    "id": 1,
-    "text": "Hallo, ich heiße Peter!",
-    "user": "Peter"
-  }""";
-
-  ObjectMapper mapper;
-
-  @BeforeEach
-  public void setUp()
-  {
-    mapper = new ObjectMapper();
-    mapper.registerModule(new JavaTimeModule());
-    mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
-  }
-
-  @Test
-  public void testDeserialization() throws Exception
-  {
-    MessageTo message = mapper.readValue(json, MessageTo.class);
-    assertThat(message.getId()).isEqualTo(1l);
-    assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!");
-    assertThat(message.getUser()).isEqualTo("Peter");
-  }
-}