From: Kai Moritz Date: Sat, 22 Apr 2023 09:51:48 +0000 (+0200) Subject: NG X-Git-Tag: rebase--2023-08-18~2 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=28a98bfdbed0cf56697bece5efbe6eb52f331611;p=demos%2Fkafka%2Fchat NG --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/AbstractTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/AbstractTo.java index 216ff2e7..5c08aa2a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/AbstractTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/AbstractTo.java @@ -8,7 +8,11 @@ import lombok.RequiredArgsConstructor; @RequiredArgsConstructor public class AbstractTo { - public enum ToType { MESSAGE_SENT, CREATE_CHATROOM_REQUEST }; + public enum ToType { + CREATE_CHATROOM_REQUEST, + MESSAGE_SENT, + CHATROOM_INFO + } @Getter private final ToType type; diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java index 8294316f..8a53d3c9 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java @@ -1,6 +1,7 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.domain.Message; import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; import lombok.Getter; @@ -67,7 +68,44 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener } - Mono sendMessage( + + Mono sendCreateChatRoomRequest( + UUID chatRoomId, + String name) + { + CreateChatRoomRequestTo createChatRoomRequestTo = CreateChatRoomRequestTo.of(name); + return Mono.create(sink -> + { + ProducerRecord record = + new ProducerRecord<>( + topic, + chatRoomId.toString(), + createChatRoomRequestTo); + + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) + { + log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo); + ChatRoomInfo chatRoomInfo = ChatRoomInfo.of(chatRoomId, name, record.partition()); + createChatRoom(chatRoomInfo); + sink.success(chatRoomInfo); + } + else + { + // On send-failure + log.error( + "Could not send create-request for chat room (id={}, name={}): {}", + chatRoomId, + name, + exception); + sink.error(exception); + } + })); + }); + } + + Mono sendChatMessage( UUID chatRoomId, Message.MessageKey key, LocalDateTime timestamp, @@ -202,14 +240,18 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener { for (ConsumerRecord record : records) { + UUID chatRoomId = UUID.fromString(record.key()); + switch (record.value().getType()) { case CREATE_CHATROOM_REQUEST: - createChatRoom((CreateChatRoomRequestTo) record.value()); + createChatRoom( + chatRoomId, + (CreateChatRoomRequestTo) record.value(), + record.partition()); break; case MESSAGE_SENT: - UUID chatRoomId = UUID.fromString(record.key()); Instant instant = Instant.ofEpochSecond(record.timestamp()); LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); loadChatMessage( @@ -226,10 +268,26 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener } void createChatRoom( + UUID chatRoomId, CreateChatRoomRequestTo createChatRoomRequestTo, int partition) { - chatrooms[partition].put + putChatRoom(ChatRoomInfo.of( + chatRoomId, + createChatRoomRequestTo.getName(), + partition)); + } + + + void createChatRoom(ChatRoomInfo chatRoomInfo) + { + UUID id = chatRoomInfo.getId(); + String name = chatRoomInfo.getName(); + int shard = chatRoomInfo.getShard(); + log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); + KafkaChatRoomService service = new KafkaChatRoomService(this, id); + ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize); + putChatRoom(chatRoom); } void loadChatMessage( @@ -267,7 +325,7 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener } - void putChatRoom(ChatRoom chatRoom) + private void putChatRoom(ChatRoom chatRoom) { Integer partition = chatRoom.getShard(); UUID chatRoomId = chatRoom.getId(); diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java index 57deaf27..8bbc82ec 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java @@ -32,41 +32,6 @@ public class ChatRoomChannel implements Runnable private boolean running; - Mono sendCreateChatRoomRequest( - UUID chatRoomId, - String name) - { - int shard = this.shardingStrategy.selectShard(chatRoomId); - CreateChatRoomRequestTo createChatRoomRequestTo = CreateChatRoomRequestTo.of(chatRoomId.toString(), name, shard); - return Mono.create(sink -> - { - ProducerRecord record = - new ProducerRecord<>( - topic, - shard, - createChatRoomRequestTo); - - producer.send(record, ((metadata, exception) -> - { - if (metadata != null) - { - log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo); - sink.success(createChatRoomRequestTo.toChatRoomInfo()); - } - else - { - // On send-failure - log.error( - "Could not send create-request for chat room (id={}, name={}): {}", - chatRoomId, - name, - exception); - sink.error(exception); - } - })); - }); - } - @Override public void run() { diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomRequestTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomRequestTo.java index eb573929..b6ad38fb 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomRequestTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomRequestTo.java @@ -10,9 +10,7 @@ import java.util.UUID; @Data public class CreateChatRoomRequestTo extends AbstractTo { - private String id; private String name; - private int shard; public CreateChatRoomRequestTo() @@ -21,22 +19,10 @@ public class CreateChatRoomRequestTo extends AbstractTo } - public ChatRoomInfo toChatRoomInfo() - { - return new ChatRoomInfo(UUID.fromString(id), name, shard); - } - - public static CreateChatRoomRequestTo from(ChatRoom chatRoom) - { - return CreateChatRoomRequestTo.of(chatRoom.getId().toString(), chatRoom.getName(), chatRoom.getShard()); - } - - public static CreateChatRoomRequestTo of(String id, String name, int shard) + public static CreateChatRoomRequestTo of(String name) { CreateChatRoomRequestTo to = new CreateChatRoomRequestTo(); - to.id = id; to.name = name; - to.shard = shard; return to; } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java index 09861946..f8022348 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java @@ -28,7 +28,7 @@ public class KafkaChatRoomService implements ChatRoomService String text) { return chatMessageChannel - .sendMessage(chatRoomId, key, timestamp, text) + .sendChatMessage(chatRoomId, key, timestamp, text) .doOnSuccess(message -> persistMessage(message)); } diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomInfoTo.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomInfoTo.java new file mode 100644 index 00000000..f232c782 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomInfoTo.java @@ -0,0 +1,42 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import lombok.Data; + +import java.util.UUID; + + +@Data +public class ChatRoomInfoTo extends AbstractTo +{ + private String id; + private String name; + private int shard; + + + public ChatRoomInfoTo() + { + super(ToType.CHATROOM_INFO); + } + + + public ChatRoomInfo toChatRoomInfo() + { + return new ChatRoomInfo(UUID.fromString(id), name, shard); + } + + public static ChatRoomInfoTo from(ChatRoom chatRoom) + { + return ChatRoomInfoTo.of(chatRoom.getId().toString(), chatRoom.getName(), chatRoom.getShard()); + } + + public static ChatRoomInfoTo of(String id, String name, int shard) + { + ChatRoomInfoTo to = new ChatRoomInfoTo(); + to.id = id; + to.name = name; + to.shard = shard; + return to; + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomInfoToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomInfoToTest.java new file mode 100644 index 00000000..6132eace --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomInfoToTest.java @@ -0,0 +1,39 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; + + +public class ChatRoomInfoToTest +{ + final String json = """ + { + "id": "5c73531c-6fc4-426c-adcb-afc5c140a0f7", + "name": "Foo-Room!", + "shard": 666 + }"""; + + ObjectMapper mapper; + + @BeforeEach + public void setUp() + { + mapper = new ObjectMapper(); + mapper.registerModule(new JavaTimeModule()); + mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + } + + @Test + public void testDeserialization() throws Exception + { + ChatRoomInfoTo message = mapper.readValue(json, ChatRoomInfoTo.class); + assertThat(message.getId()).isEqualTo("5c73531c-6fc4-426c-adcb-afc5c140a0f7"); + assertThat(message.getName()).isEqualTo("Foo-Room!"); + assertThat(message.getShard()).isEqualTo(666); + } +}