From 1416ccc8a9eae999201dbf7c77c4d4906fc9fc24 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 22 Apr 2023 11:29:39 +0200 Subject: [PATCH] NG --- .../backend/persistence/kafka/AbstractTo.java | 15 ++++ .../persistence/kafka/ChatMessageChannel.java | 76 ++++++++++++------- .../persistence/kafka/ChatMessageTo.java | 45 +++++++++++ .../persistence/kafka/ChatRoomChannel.java | 18 ++--- .../backend/persistence/kafka/ChatRoomTo.java | 30 -------- .../kafka/CreateChatRoomRequestTo.java | 42 ++++++++++ .../kafka/KafkaServicesApplicationRunner.java | 4 +- .../kafka/KafkaServicesConfiguration.java | 48 ++++++------ .../backend/persistence/kafka/MessageTo.java | 33 -------- .../chat/backend/KafkaConfigurationIT.java | 18 +++-- ...sageToTest.java => ChatMessageToTest.java} | 4 +- .../kafka/CreateChatRoomRequestToTest.java | 39 ++++++++++ 12 files changed, 242 insertions(+), 130 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/AbstractTo.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageTo.java delete mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomTo.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomRequestTo.java delete mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageTo.java rename src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/{MessageToTest.java => ChatMessageToTest.java} (90%) create mode 100644 src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomRequestToTest.java diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/AbstractTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/AbstractTo.java new file mode 100644 index 00000000..216ff2e7 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/AbstractTo.java @@ -0,0 +1,15 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + + +@RequiredArgsConstructor +public class AbstractTo +{ + public enum ToType { MESSAGE_SENT, CREATE_CHATROOM_REQUEST }; + + @Getter + private final ToType type; +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java index 138d9a7b..8294316f 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java @@ -25,8 +25,8 @@ import java.util.stream.IntStream; public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener { private final String topic; - private final Producer producer; - private final Consumer consumer; + private final Producer producer; + private final Consumer consumer; private final ZoneId zoneId; private final int numShards; private final boolean[] isShardOwned; @@ -42,8 +42,8 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener public ChatMessageChannel( String topic, - Producer producer, - Consumer consumer, + Producer producer, + Consumer consumer, ZoneId zoneId, int numShards) { @@ -78,13 +78,13 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId); return Mono.create(sink -> { - ProducerRecord record = + ProducerRecord record = new ProducerRecord<>( tp.topic(), tp.partition(), zdt.toEpochSecond(), chatRoomId.toString(), - MessageTo.of(key.getUsername(), key.getMessageId(), text)); + ChatMessageTo.of(key.getUsername(), key.getMessageId(), text)); producer.send(record, ((metadata, exception) -> { @@ -165,12 +165,12 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener { try { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); + ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); log.info("Fetched {} messages", records.count()); if (loadInProgress) { - loadMessages(records); + loadChatRoom(records); if (isLoadingCompleted()) { @@ -198,31 +198,55 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener log.info("Exiting normally"); } - void loadMessages(ConsumerRecords records) + void loadChatRoom(ConsumerRecords records) { - for (ConsumerRecord record : records) + for (ConsumerRecord record : records) { - nextOffset[record.partition()] = record.offset() + 1; - UUID chatRoomId = UUID.fromString(record.key()); - MessageTo messageTo = record.value(); + switch (record.value().getType()) + { + case CREATE_CHATROOM_REQUEST: + createChatRoom((CreateChatRoomRequestTo) record.value()); + break; + + case MESSAGE_SENT: + UUID chatRoomId = UUID.fromString(record.key()); + Instant instant = Instant.ofEpochSecond(record.timestamp()); + LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); + loadChatMessage( + chatRoomId, + timestamp, + record.offset(), + (ChatMessageTo) record.value(), + record.partition()); + break; + } - Message.MessageKey key = Message.MessageKey.of(messageTo.getUser(), messageTo.getId()); + nextOffset[record.partition()] = record.offset() + 1; + } + } - Instant instant = Instant.ofEpochSecond(record.timestamp()); - LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); + void createChatRoom( + CreateChatRoomRequestTo createChatRoomRequestTo, + int partition) + { + chatrooms[partition].put + } - Message message = new Message(key, record.offset(), timestamp, messageTo.getText()); + void loadChatMessage( + UUID chatRoomId, + LocalDateTime timestamp, + long offset, + ChatMessageTo chatMessageTo, + int partition) + { + Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); + Message message = new Message(key, offset, timestamp, chatMessageTo.getText()); - ChatRoom chatRoom = chatrooms[record.partition()].get(chatRoomId); - if (chatRoom == null) - { - // TODO: Alles pausieren und erst von putChatRoom wieder resumen lassen! - } - KafkaChatRoomService kafkaChatRoomService = - (KafkaChatRoomService) chatRoom.getChatRoomService(); + ChatRoom chatRoom = chatrooms[partition].get(chatRoomId); + KafkaChatRoomService kafkaChatRoomService = + (KafkaChatRoomService) chatRoom.getChatRoomService(); - kafkaChatRoomService.persistMessage(message); - } + kafkaChatRoomService.persistMessage(message); } boolean isLoadingCompleted() diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageTo.java new file mode 100644 index 00000000..41ce00a4 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageTo.java @@ -0,0 +1,45 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.Message; +import lombok.Data; + +import java.time.LocalDateTime; + + +@Data +public class ChatMessageTo extends AbstractTo +{ + private String user; + private Long id; + private String text; + + + public ChatMessageTo() + { + super(ToType.MESSAGE_SENT); + } + + + public Message toMessage(long offset, LocalDateTime timestamp) + { + return new Message(Message.MessageKey.of(user, id), offset, timestamp, text); + } + + public static ChatMessageTo from(Message message) + { + return ChatMessageTo.of( + message.getUsername(), + message.getId(), + message.getMessageText()); + } + + + public static ChatMessageTo of(String user, Long id, String text) + { + ChatMessageTo to = new ChatMessageTo(); + to.user = user; + to.id = id; + to.text = text; + return to; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java index 97ee9885..57deaf27 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java @@ -22,8 +22,8 @@ import java.util.UUID; public class ChatRoomChannel implements Runnable { private final String topic; - private final Producer producer; - private final Consumer consumer; + private final Producer producer; + private final Consumer consumer; private final ShardingStrategy shardingStrategy; private final ChatMessageChannel chatMessageChannel; private final Clock clock; @@ -37,21 +37,21 @@ public class ChatRoomChannel implements Runnable String name) { int shard = this.shardingStrategy.selectShard(chatRoomId); - ChatRoomTo chatRoomTo = ChatRoomTo.of(chatRoomId.toString(), name, shard); + CreateChatRoomRequestTo createChatRoomRequestTo = CreateChatRoomRequestTo.of(chatRoomId.toString(), name, shard); return Mono.create(sink -> { - ProducerRecord record = + ProducerRecord record = new ProducerRecord<>( topic, shard, - chatRoomTo); + createChatRoomRequestTo); producer.send(record, ((metadata, exception) -> { if (metadata != null) { - log.info("Successfully send chreate-request for chat room: {}", chatRoomTo); - sink.success(chatRoomTo.toChatRoomInfo()); + log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo); + sink.success(createChatRoomRequestTo.toChatRoomInfo()); } else { @@ -78,10 +78,10 @@ public class ChatRoomChannel implements Runnable { try { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); + ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); log.info("Fetched {} messages", records.count()); - for (ConsumerRecord record : records) + for (ConsumerRecord record : records) { createChatRoom(record.value().toChatRoomInfo()); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomTo.java deleted file mode 100644 index e5649816..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomTo.java +++ /dev/null @@ -1,30 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import de.juplo.kafka.chat.backend.domain.ChatRoom; -import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - -import java.util.UUID; - - -@Data -@NoArgsConstructor -@AllArgsConstructor(staticName = "of") -public class ChatRoomTo -{ - private String id; - private String name; - private int shard; - - public ChatRoomInfo toChatRoomInfo() - { - return new ChatRoomInfo(UUID.fromString(id), name, shard); - } - - public static ChatRoomTo from(ChatRoom chatRoom) - { - return ChatRoomTo.of(chatRoom.getId().toString(), chatRoom.getName(), chatRoom.getShard()); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomRequestTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomRequestTo.java new file mode 100644 index 00000000..eb573929 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomRequestTo.java @@ -0,0 +1,42 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import lombok.Data; + +import java.util.UUID; + + +@Data +public class CreateChatRoomRequestTo extends AbstractTo +{ + private String id; + private String name; + private int shard; + + + public CreateChatRoomRequestTo() + { + super(ToType.CREATE_CHATROOM_REQUEST); + } + + + public ChatRoomInfo toChatRoomInfo() + { + return new ChatRoomInfo(UUID.fromString(id), name, shard); + } + + public static CreateChatRoomRequestTo from(ChatRoom chatRoom) + { + return CreateChatRoomRequestTo.of(chatRoom.getId().toString(), chatRoom.getName(), chatRoom.getShard()); + } + + public static CreateChatRoomRequestTo of(String id, String name, int shard) + { + CreateChatRoomRequestTo to = new CreateChatRoomRequestTo(); + to.id = id; + to.name = name; + to.shard = shard; + return to; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java index ee5834e5..8a9e32ed 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java @@ -30,11 +30,11 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner @Autowired ChatRoomChannel chatRoomChannel; @Autowired - Consumer chatRoomChannelConsumer; + Consumer chatRoomChannelConsumer; @Autowired ChatMessageChannel chatMessageChannel; @Autowired - Consumer chatMessageChannelConsumer; + Consumer chatMessageChannelConsumer; CompletableFuture chatRoomChannelConsumerJob; CompletableFuture chatMessageChannelConsumerJob; diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java index 43507792..9e1f75e7 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -51,8 +51,8 @@ public class KafkaServicesConfiguration @Bean ChatRoomChannel chatRoomChannel( ChatBackendProperties properties, - Producer chatRoomChannelProducer, - Consumer chatRoomChannelConsumer, + Producer chatRoomChannelProducer, + Consumer chatRoomChannelConsumer, ShardingStrategy shardingStrategy, ChatMessageChannel chatMessageChannel, Clock clock) @@ -68,11 +68,11 @@ public class KafkaServicesConfiguration } @Bean - Producer chatRoomChannelProducer( + Producer chatRoomChannelProducer( Properties defaultProducerProperties, ChatBackendProperties chatBackendProperties, IntegerSerializer integerSerializer, - JsonSerializer chatRoomSerializer) + JsonSerializer chatRoomSerializer) { Map properties = new HashMap<>(); defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); @@ -92,9 +92,9 @@ public class KafkaServicesConfiguration } @Bean - JsonSerializer chatRoomSerializer() + JsonSerializer chatRoomSerializer() { - JsonSerializer serializer = new JsonSerializer<>(); + JsonSerializer serializer = new JsonSerializer<>(); serializer.configure( Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false), false); @@ -102,11 +102,11 @@ public class KafkaServicesConfiguration } @Bean - Consumer chatRoomChannelConsumer( + Consumer chatRoomChannelConsumer( Properties defaultConsumerProperties, ChatBackendProperties chatBackendProperties, IntegerDeserializer integerDeserializer, - JsonDeserializer chatRoomDeserializer) + JsonDeserializer chatRoomDeserializer) { Map properties = new HashMap<>(); defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); @@ -129,13 +129,13 @@ public class KafkaServicesConfiguration } @Bean - JsonDeserializer chatRoomDeserializer() + JsonDeserializer chatRoomDeserializer() { - JsonDeserializer deserializer = new JsonDeserializer<>(); + JsonDeserializer deserializer = new JsonDeserializer<>(); deserializer.configure( Map.of( JsonDeserializer.USE_TYPE_INFO_HEADERS, false, - JsonDeserializer.VALUE_DEFAULT_TYPE, ChatRoomTo.class, + JsonDeserializer.VALUE_DEFAULT_TYPE, CreateChatRoomRequestTo.class, JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()), false ); return deserializer; @@ -150,8 +150,8 @@ public class KafkaServicesConfiguration @Bean ChatMessageChannel chatMessageChannel( ChatBackendProperties properties, - Producer chatMessageChannelProducer, - Consumer chatMessageChannelConsumer, + Producer chatMessageChannelProducer, + Consumer chatMessageChannelConsumer, ZoneId zoneId) { return new ChatMessageChannel( @@ -163,11 +163,11 @@ public class KafkaServicesConfiguration } @Bean - Producer chatMessageChannelProducer( + Producer chatMessageChannelProducer( Properties defaultProducerProperties, ChatBackendProperties chatBackendProperties, StringSerializer stringSerializer, - JsonSerializer messageSerializer) + JsonSerializer messageSerializer) { Map properties = new HashMap<>(); defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); @@ -187,21 +187,23 @@ public class KafkaServicesConfiguration } @Bean - JsonSerializer chatMessageSerializer() + JsonSerializer chatMessageSerializer() { - JsonSerializer serializer = new JsonSerializer<>(); + JsonSerializer serializer = new JsonSerializer<>(); serializer.configure( - Map.of(JsonSerializer.ADD_TYPE_INFO_HEADERS, false), + Map.of(JsonSerializer.TYPE_MAPPINGS, + "create:" + CreateChatRoomRequestTo.class.getCanonicalName() + "," + + "message:" + ChatMessageTo.class.getCanonicalName()), false); return serializer; } @Bean - Consumer chatMessageChannelConsumer( + Consumer chatMessageChannelConsumer( Properties defaultConsumerProperties, ChatBackendProperties chatBackendProperties, StringDeserializer stringDeserializer, - JsonDeserializer messageDeserializer) + JsonDeserializer messageDeserializer) { Map properties = new HashMap<>(); defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); @@ -224,13 +226,13 @@ public class KafkaServicesConfiguration } @Bean - JsonDeserializer chatMessageDeserializer() + JsonDeserializer chatMessageDeserializer() { - JsonDeserializer deserializer = new JsonDeserializer<>(); + JsonDeserializer deserializer = new JsonDeserializer<>(); deserializer.configure( Map.of( JsonDeserializer.USE_TYPE_INFO_HEADERS, false, - JsonDeserializer.VALUE_DEFAULT_TYPE, MessageTo.class, + JsonDeserializer.VALUE_DEFAULT_TYPE, ChatMessageTo.class, JsonDeserializer.TRUSTED_PACKAGES, getClass().getPackageName()), false ); return deserializer; diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageTo.java deleted file mode 100644 index 0a867f16..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageTo.java +++ /dev/null @@ -1,33 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import de.juplo.kafka.chat.backend.domain.Message; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - -import java.time.LocalDateTime; - - -@Data -@NoArgsConstructor -@AllArgsConstructor(staticName = "of") -public class MessageTo -{ - private String user; - private Long id; - private String text; - - public Message toMessage(long offset, LocalDateTime timestamp) - { - return new Message(Message.MessageKey.of(user, id), offset, timestamp, text); - } - - public static MessageTo from(Message message) - { - return - new MessageTo( - message.getUsername(), - message.getId(), - message.getMessageText()); - } -} diff --git a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java index fc2b7c89..4e9ad239 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java @@ -1,6 +1,7 @@ package de.juplo.kafka.chat.backend; import de.juplo.kafka.chat.backend.domain.ShardingStrategy; +import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.BeforeAll; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @@ -37,10 +38,17 @@ class KafkaConfigurationIT extends AbstractConfigurationIT { UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7"); int shard = shardingStrategy.selectShard(chatRoomId); - chatRoomTemplate.send(CHATROOMS_TOPIC, null,"{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": " + shard + ", \"name\": \"FOO\" }"); - messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }"); - messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }"); - messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }"); - messageTemplate.send(MESSAGES_TOPIC,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }"); + send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": " + shard + ", \"name\": \"FOO\" }", "create"); + send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "message"); + send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "message"); + send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "message"); + send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "message"); + } + + static void send(KafkaTemplate kafkaTemplate, String key, String value, String typeId) + { + ProducerRecord record = new ProducerRecord<>(MESSAGES_TOPIC, key, value); + record.headers().add("__TypeId__", typeId.getBytes()); + kafkaTemplate.send(record); } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageToTest.java similarity index 90% rename from src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageToTest.java rename to src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageToTest.java index 0c4884bf..4a6c1c32 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageToTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageToTest.java @@ -9,7 +9,7 @@ import org.junit.jupiter.api.Test; import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; -public class MessageToTest +public class ChatMessageToTest { final String json = """ { @@ -31,7 +31,7 @@ public class MessageToTest @Test public void testDeserialization() throws Exception { - MessageTo message = mapper.readValue(json, MessageTo.class); + ChatMessageTo message = mapper.readValue(json, ChatMessageTo.class); assertThat(message.getId()).isEqualTo(1l); assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!"); assertThat(message.getUser()).isEqualTo("Peter"); diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomRequestToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomRequestToTest.java new file mode 100644 index 00000000..e7b749c4 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomRequestToTest.java @@ -0,0 +1,39 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; + + +public class CreateChatRoomRequestToTest +{ + final String json = """ + { + "id": "5c73531c-6fc4-426c-adcb-afc5c140a0f7", + "name": "Foo-Room!", + "shard": 666 + }"""; + + ObjectMapper mapper; + + @BeforeEach + public void setUp() + { + mapper = new ObjectMapper(); + mapper.registerModule(new JavaTimeModule()); + mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + } + + @Test + public void testDeserialization() throws Exception + { + CreateChatRoomRequestTo message = mapper.readValue(json, CreateChatRoomRequestTo.class); + assertThat(message.getId()).isEqualTo("5c73531c-6fc4-426c-adcb-afc5c140a0f7"); + assertThat(message.getName()).isEqualTo("Foo-Room!"); + assertThat(message.getShard()).isEqualTo(666); + } +} -- 2.20.1