From: Kai Moritz Date: Sun, 20 Aug 2023 08:15:02 +0000 (+0200) Subject: WIP X-Git-Tag: rebase--2023-08-20~11 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=c48f3c96e4553419a9ae14d3966ba11ceacdd668;p=demos%2Fkafka%2Fchat WIP --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/AbstractMessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/AbstractMessageTo.java deleted file mode 100644 index 85a194e9..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/AbstractMessageTo.java +++ /dev/null @@ -1,18 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - - -import lombok.Getter; -import lombok.RequiredArgsConstructor; - - -@RequiredArgsConstructor -public class AbstractMessageTo -{ - public enum ToType { - CREATE_CHATROOM_COMMAND, - CHATMESSAGE_EVENT, - } - - @Getter - private final ToType type; -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageReceivedEventTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageReceivedEventTo.java deleted file mode 100644 index 2bc7a034..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageReceivedEventTo.java +++ /dev/null @@ -1,48 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import de.juplo.kafka.chat.backend.domain.Message; -import lombok.*; - -import java.time.LocalDateTime; - - -@Getter -@Setter -@EqualsAndHashCode -@ToString -public class ChatMessageReceivedEventTo extends AbstractMessageTo -{ - private String user; - private Long id; - private String text; - - - public ChatMessageReceivedEventTo() - { - super(ToType.CHATMESSAGE_EVENT); - } - - - public Message toMessage(long offset, LocalDateTime timestamp) - { - return new Message(Message.MessageKey.of(user, id), offset, timestamp, text); - } - - public static ChatMessageReceivedEventTo from(Message message) - { - return ChatMessageReceivedEventTo.of( - message.getUsername(), - message.getId(), - message.getMessageText()); - } - - - public static ChatMessageReceivedEventTo of(String user, Long id, String text) - { - ChatMessageReceivedEventTo to = new ChatMessageReceivedEventTo(); - to.user = user; - to.id = id; - to.text = text; - return to; - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java index 275224d4..c8bc41b0 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java @@ -3,6 +3,9 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.domain.Message; +import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo; +import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo; +import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; @@ -76,7 +79,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener UUID chatRoomId, String name) { - CreateChatRoomCommandTo createChatRoomRequestTo = CreateChatRoomCommandTo.of(name); + CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name); return Mono.create(sink -> { ProducerRecord record = @@ -123,7 +126,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener null, zdt.toEpochSecond(), chatRoomId.toString(), - ChatMessageReceivedEventTo.of(key.getUsername(), key.getMessageId(), text)); + EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text)); producer.send(record, ((metadata, exception) -> { @@ -245,21 +248,21 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener switch (record.value().getType()) { - case CREATE_CHATROOM_COMMAND: + case COMMAND_CREATE_CHATROOM: createChatRoom( chatRoomId, - (CreateChatRoomCommandTo) record.value(), + (CommandCreateChatRoomTo) record.value(), record.partition()); break; - case CHATMESSAGE_EVENT: + case EVENT_CHATMESSAGE_RECEIVED: Instant instant = Instant.ofEpochSecond(record.timestamp()); LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); loadChatMessage( chatRoomId, timestamp, record.offset(), - (ChatMessageReceivedEventTo) record.value(), + (EventChatMessageReceivedTo) record.value(), record.partition()); break; @@ -277,7 +280,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener void createChatRoom( UUID chatRoomId, - CreateChatRoomCommandTo createChatRoomRequestTo, + CommandCreateChatRoomTo createChatRoomRequestTo, int partition) { log.info("Loading ChatRoom {} with buffer-size {}", chatRoomId, bufferSize); @@ -308,7 +311,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener UUID chatRoomId, LocalDateTime timestamp, long offset, - ChatMessageReceivedEventTo chatMessageTo, + EventChatMessageReceivedTo chatMessageTo, int partition) { Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomCommandTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomCommandTo.java deleted file mode 100644 index b2601fb8..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomCommandTo.java +++ /dev/null @@ -1,27 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import lombok.*; - - -@Getter -@Setter -@EqualsAndHashCode -@ToString -public class CreateChatRoomCommandTo extends AbstractMessageTo -{ - private String name; - - - public CreateChatRoomCommandTo() - { - super(ToType.CREATE_CHATROOM_COMMAND); - } - - - public static CreateChatRoomCommandTo of(String name) - { - CreateChatRoomCommandTo to = new CreateChatRoomCommandTo(); - to.name = name; - return to; - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java index 26887a95..324e80bf 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java @@ -16,26 +16,26 @@ import java.util.*; public class KafkaChatHome implements ChatHome { private final KafkaLikeShardingStrategy shardingStrategy; - private final ChatRoomChannel chatMessageChanel; + private final ChatRoomChannel chatRoomChannel; @Override public Mono getChatRoom(UUID id) { int shard = shardingStrategy.selectShard(id); - if (chatMessageChanel.isLoadInProgress()) + if (chatRoomChannel.isLoadInProgress()) { throw new LoadInProgressException(shard); } else { - return chatMessageChanel.getChatRoom(shard, id); + return chatRoomChannel.getChatRoom(shard, id); } } @Override public Flux getChatRooms() { - return chatMessageChanel.getChatRooms(); + return chatRoomChannel.getChatRooms(); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java index 825f16eb..6a1dc78a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java @@ -18,7 +18,7 @@ public class KafkaChatRoomFactory implements ChatRoomFactory @Override public Mono createChatRoom(UUID id, String name) { - log.info("Sending create-request for chat rooom: id={}, name={}"); + log.info("Sending create-command for chat rooom: id={}, name={}"); return chatRoomChannel.sendCreateChatRoomRequest(id, name); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java index fec48b0c..5c7e0d8c 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java @@ -1,5 +1,6 @@ package de.juplo.kafka.chat.backend.persistence.kafka; +import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java index e8c3f0d8..dda8748e 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -3,6 +3,9 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.domain.ChatHome; import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; +import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo; +import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo; +import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -80,7 +83,7 @@ public class KafkaServicesConfiguration defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); properties.put( ProducerConfig.CLIENT_ID_CONFIG, - chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_PRODUCER"); + chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER"); return new KafkaProducer<>( properties, stringSerializer, @@ -115,7 +118,7 @@ public class KafkaServicesConfiguration defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); properties.put( ConsumerConfig.CLIENT_ID_CONFIG, - chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_CONSUMER"); + chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER"); properties.put( ConsumerConfig.GROUP_ID_CONFIG, "chat_message_channel"); @@ -147,8 +150,8 @@ public class KafkaServicesConfiguration String typeMappings () { return - "create:" + CreateChatRoomCommandTo.class.getCanonicalName() + "," + - "message:" + ChatMessageReceivedEventTo.class.getCanonicalName(); + "create:" + CommandCreateChatRoomTo.class.getCanonicalName() + "," + + "message:" + EventChatMessageReceivedTo.class.getCanonicalName(); } @Bean diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/AbstractMessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/AbstractMessageTo.java new file mode 100644 index 00000000..7cc7541e --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/AbstractMessageTo.java @@ -0,0 +1,18 @@ +package de.juplo.kafka.chat.backend.persistence.kafka.messages; + + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + + +@RequiredArgsConstructor +public class AbstractMessageTo +{ + public enum ToType { + COMMAND_CREATE_CHATROOM, + EVENT_CHATMESSAGE_RECEIVED, + } + + @Getter + private final ToType type; +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomTo.java new file mode 100644 index 00000000..1a134f31 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomTo.java @@ -0,0 +1,27 @@ +package de.juplo.kafka.chat.backend.persistence.kafka.messages; + +import lombok.*; + + +@Getter +@Setter +@EqualsAndHashCode +@ToString +public class CommandCreateChatRoomTo extends AbstractMessageTo +{ + private String name; + + + public CommandCreateChatRoomTo() + { + super(ToType.COMMAND_CREATE_CHATROOM); + } + + + public static CommandCreateChatRoomTo of(String name) + { + CommandCreateChatRoomTo to = new CommandCreateChatRoomTo(); + to.name = name; + return to; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedTo.java new file mode 100644 index 00000000..2297b949 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedTo.java @@ -0,0 +1,48 @@ +package de.juplo.kafka.chat.backend.persistence.kafka.messages; + +import de.juplo.kafka.chat.backend.domain.Message; +import lombok.*; + +import java.time.LocalDateTime; + + +@Getter +@Setter +@EqualsAndHashCode +@ToString +public class EventChatMessageReceivedTo extends AbstractMessageTo +{ + private String user; + private Long id; + private String text; + + + public EventChatMessageReceivedTo() + { + super(ToType.EVENT_CHATMESSAGE_RECEIVED); + } + + + public Message toMessage(long offset, LocalDateTime timestamp) + { + return new Message(Message.MessageKey.of(user, id), offset, timestamp, text); + } + + public static EventChatMessageReceivedTo from(Message message) + { + return EventChatMessageReceivedTo.of( + message.getUsername(), + message.getId(), + message.getMessageText()); + } + + + public static EventChatMessageReceivedTo of(String user, Long id, String text) + { + EventChatMessageReceivedTo to = new EventChatMessageReceivedTo(); + to.user = user; + to.id = id; + to.text = text; + return to; + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageReceivedEventToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageReceivedEventToTest.java deleted file mode 100644 index be612c8b..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageReceivedEventToTest.java +++ /dev/null @@ -1,39 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; - - -public class ChatMessageReceivedEventToTest -{ - final String json = """ - { - "id": 1, - "text": "Hallo, ich heiße Peter!", - "user": "Peter" - }"""; - - ObjectMapper mapper; - - @BeforeEach - public void setUp() - { - mapper = new ObjectMapper(); - mapper.registerModule(new JavaTimeModule()); - mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); - } - - @Test - public void testDeserialization() throws Exception - { - ChatMessageReceivedEventTo message = mapper.readValue(json, ChatMessageReceivedEventTo.class); - assertThat(message.getId()).isEqualTo(1l); - assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!"); - assertThat(message.getUser()).isEqualTo("Peter"); - } -} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomCommandToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomCommandToTest.java deleted file mode 100644 index 71a29715..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomCommandToTest.java +++ /dev/null @@ -1,35 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.SerializationFeature; -import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; - - -public class CreateChatRoomCommandToTest -{ - final String json = """ - { - "name": "Foo-Room!" - }"""; - - ObjectMapper mapper; - - @BeforeEach - public void setUp() - { - mapper = new ObjectMapper(); - mapper.registerModule(new JavaTimeModule()); - mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); - } - - @Test - public void testDeserialization() throws Exception - { - CreateChatRoomCommandTo message = mapper.readValue(json, CreateChatRoomCommandTo.class); - assertThat(message.getName()).isEqualTo("Foo-Room!"); - } -} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomToTest.java new file mode 100644 index 00000000..5ef12efe --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomToTest.java @@ -0,0 +1,35 @@ +package de.juplo.kafka.chat.backend.persistence.kafka.messages; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; + + +public class CommandCreateChatRoomToTest +{ + final String json = """ + { + "name": "Foo-Room!" + }"""; + + ObjectMapper mapper; + + @BeforeEach + public void setUp() + { + mapper = new ObjectMapper(); + mapper.registerModule(new JavaTimeModule()); + mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + } + + @Test + public void testDeserialization() throws Exception + { + CommandCreateChatRoomTo message = mapper.readValue(json, CommandCreateChatRoomTo.class); + assertThat(message.getName()).isEqualTo("Foo-Room!"); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedToTest.java new file mode 100644 index 00000000..33a78277 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedToTest.java @@ -0,0 +1,39 @@ +package de.juplo.kafka.chat.backend.persistence.kafka.messages; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; + + +public class EventChatMessageReceivedToTest +{ + final String json = """ + { + "id": 1, + "text": "Hallo, ich heiße Peter!", + "user": "Peter" + }"""; + + ObjectMapper mapper; + + @BeforeEach + public void setUp() + { + mapper = new ObjectMapper(); + mapper.registerModule(new JavaTimeModule()); + mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + } + + @Test + public void testDeserialization() throws Exception + { + EventChatMessageReceivedTo message = mapper.readValue(json, EventChatMessageReceivedTo.class); + assertThat(message.getId()).isEqualTo(1l); + assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!"); + assertThat(message.getUser()).isEqualTo("Peter"); + } +}