From c48f3c96e4553419a9ae14d3966ba11ceacdd668 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 20 Aug 2023 10:15:02 +0200 Subject: [PATCH] WIP --- .../persistence/kafka/ChatRoomChannel.java | 19 +++++++------ .../kafka/CreateChatRoomCommandTo.java | 27 ------------------- .../persistence/kafka/KafkaChatHome.java | 8 +++--- .../kafka/KafkaChatRoomFactory.java | 2 +- .../kafka/KafkaServicesApplicationRunner.java | 1 + .../kafka/KafkaServicesConfiguration.java | 11 +++++--- .../{ => messages}/AbstractMessageTo.java | 6 ++--- .../messages/CommandCreateChatRoomTo.java | 27 +++++++++++++++++++ .../EventChatMessageReceivedTo.java} | 16 +++++------ .../CommandCreateChatRoomToTest.java} | 6 ++--- .../EventChatMessageReceivedToTest.java} | 6 ++--- 11 files changed, 68 insertions(+), 61 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomCommandTo.java rename src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/{ => messages}/AbstractMessageTo.java (60%) create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomTo.java rename src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/{ChatMessageReceivedEventTo.java => messages/EventChatMessageReceivedTo.java} (57%) rename src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/{CreateChatRoomCommandToTest.java => messages/CommandCreateChatRoomToTest.java} (78%) rename src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/{ChatMessageReceivedEventToTest.java => messages/EventChatMessageReceivedToTest.java} (81%) diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java index 275224d4..c8bc41b0 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java @@ -3,6 +3,9 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.domain.Message; +import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo; +import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo; +import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; @@ -76,7 +79,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener UUID chatRoomId, String name) { - CreateChatRoomCommandTo createChatRoomRequestTo = CreateChatRoomCommandTo.of(name); + CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name); return Mono.create(sink -> { ProducerRecord record = @@ -123,7 +126,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener null, zdt.toEpochSecond(), chatRoomId.toString(), - ChatMessageReceivedEventTo.of(key.getUsername(), key.getMessageId(), text)); + EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text)); producer.send(record, ((metadata, exception) -> { @@ -245,21 +248,21 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener switch (record.value().getType()) { - case CREATE_CHATROOM_COMMAND: + case COMMAND_CREATE_CHATROOM: createChatRoom( chatRoomId, - (CreateChatRoomCommandTo) record.value(), + (CommandCreateChatRoomTo) record.value(), record.partition()); break; - case CHATMESSAGE_EVENT: + case EVENT_CHATMESSAGE_RECEIVED: Instant instant = Instant.ofEpochSecond(record.timestamp()); LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); loadChatMessage( chatRoomId, timestamp, record.offset(), - (ChatMessageReceivedEventTo) record.value(), + (EventChatMessageReceivedTo) record.value(), record.partition()); break; @@ -277,7 +280,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener void createChatRoom( UUID chatRoomId, - CreateChatRoomCommandTo createChatRoomRequestTo, + CommandCreateChatRoomTo createChatRoomRequestTo, int partition) { log.info("Loading ChatRoom {} with buffer-size {}", chatRoomId, bufferSize); @@ -308,7 +311,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener UUID chatRoomId, LocalDateTime timestamp, long offset, - ChatMessageReceivedEventTo chatMessageTo, + EventChatMessageReceivedTo chatMessageTo, int partition) { Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomCommandTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomCommandTo.java deleted file mode 100644 index b2601fb8..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomCommandTo.java +++ /dev/null @@ -1,27 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import lombok.*; - - -@Getter -@Setter -@EqualsAndHashCode -@ToString -public class CreateChatRoomCommandTo extends AbstractMessageTo -{ - private String name; - - - public CreateChatRoomCommandTo() - { - super(ToType.CREATE_CHATROOM_COMMAND); - } - - - public static CreateChatRoomCommandTo of(String name) - { - CreateChatRoomCommandTo to = new CreateChatRoomCommandTo(); - to.name = name; - return to; - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java index 26887a95..324e80bf 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java @@ -16,26 +16,26 @@ import java.util.*; public class KafkaChatHome implements ChatHome { private final KafkaLikeShardingStrategy shardingStrategy; - private final ChatRoomChannel chatMessageChanel; + private final ChatRoomChannel chatRoomChannel; @Override public Mono getChatRoom(UUID id) { int shard = shardingStrategy.selectShard(id); - if (chatMessageChanel.isLoadInProgress()) + if (chatRoomChannel.isLoadInProgress()) { throw new LoadInProgressException(shard); } else { - return chatMessageChanel.getChatRoom(shard, id); + return chatRoomChannel.getChatRoom(shard, id); } } @Override public Flux getChatRooms() { - return chatMessageChanel.getChatRooms(); + return chatRoomChannel.getChatRooms(); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java index 825f16eb..6a1dc78a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java @@ -18,7 +18,7 @@ public class KafkaChatRoomFactory implements ChatRoomFactory @Override public Mono createChatRoom(UUID id, String name) { - log.info("Sending create-request for chat rooom: id={}, name={}"); + log.info("Sending create-command for chat rooom: id={}, name={}"); return chatRoomChannel.sendCreateChatRoomRequest(id, name); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java index fec48b0c..5c7e0d8c 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java @@ -1,5 +1,6 @@ package de.juplo.kafka.chat.backend.persistence.kafka; +import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java index e8c3f0d8..dda8748e 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -3,6 +3,9 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.domain.ChatHome; import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; +import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo; +import de.juplo.kafka.chat.backend.persistence.kafka.messages.CommandCreateChatRoomTo; +import de.juplo.kafka.chat.backend.persistence.kafka.messages.EventChatMessageReceivedTo; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -80,7 +83,7 @@ public class KafkaServicesConfiguration defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value)); properties.put( ProducerConfig.CLIENT_ID_CONFIG, - chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_PRODUCER"); + chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER"); return new KafkaProducer<>( properties, stringSerializer, @@ -115,7 +118,7 @@ public class KafkaServicesConfiguration defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value)); properties.put( ConsumerConfig.CLIENT_ID_CONFIG, - chatBackendProperties.getKafka().getClientIdPrefix() + "_MESSAGE_CHANNEL_CONSUMER"); + chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER"); properties.put( ConsumerConfig.GROUP_ID_CONFIG, "chat_message_channel"); @@ -147,8 +150,8 @@ public class KafkaServicesConfiguration String typeMappings () { return - "create:" + CreateChatRoomCommandTo.class.getCanonicalName() + "," + - "message:" + ChatMessageReceivedEventTo.class.getCanonicalName(); + "create:" + CommandCreateChatRoomTo.class.getCanonicalName() + "," + + "message:" + EventChatMessageReceivedTo.class.getCanonicalName(); } @Bean diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/AbstractMessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/AbstractMessageTo.java similarity index 60% rename from src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/AbstractMessageTo.java rename to src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/AbstractMessageTo.java index 85a194e9..7cc7541e 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/AbstractMessageTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/AbstractMessageTo.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; +package de.juplo.kafka.chat.backend.persistence.kafka.messages; import lombok.Getter; @@ -9,8 +9,8 @@ import lombok.RequiredArgsConstructor; public class AbstractMessageTo { public enum ToType { - CREATE_CHATROOM_COMMAND, - CHATMESSAGE_EVENT, + COMMAND_CREATE_CHATROOM, + EVENT_CHATMESSAGE_RECEIVED, } @Getter diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomTo.java new file mode 100644 index 00000000..1a134f31 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomTo.java @@ -0,0 +1,27 @@ +package de.juplo.kafka.chat.backend.persistence.kafka.messages; + +import lombok.*; + + +@Getter +@Setter +@EqualsAndHashCode +@ToString +public class CommandCreateChatRoomTo extends AbstractMessageTo +{ + private String name; + + + public CommandCreateChatRoomTo() + { + super(ToType.COMMAND_CREATE_CHATROOM); + } + + + public static CommandCreateChatRoomTo of(String name) + { + CommandCreateChatRoomTo to = new CommandCreateChatRoomTo(); + to.name = name; + return to; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageReceivedEventTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedTo.java similarity index 57% rename from src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageReceivedEventTo.java rename to src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedTo.java index 2bc7a034..2297b949 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageReceivedEventTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedTo.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; +package de.juplo.kafka.chat.backend.persistence.kafka.messages; import de.juplo.kafka.chat.backend.domain.Message; import lombok.*; @@ -10,16 +10,16 @@ import java.time.LocalDateTime; @Setter @EqualsAndHashCode @ToString -public class ChatMessageReceivedEventTo extends AbstractMessageTo +public class EventChatMessageReceivedTo extends AbstractMessageTo { private String user; private Long id; private String text; - public ChatMessageReceivedEventTo() + public EventChatMessageReceivedTo() { - super(ToType.CHATMESSAGE_EVENT); + super(ToType.EVENT_CHATMESSAGE_RECEIVED); } @@ -28,18 +28,18 @@ public class ChatMessageReceivedEventTo extends AbstractMessageTo return new Message(Message.MessageKey.of(user, id), offset, timestamp, text); } - public static ChatMessageReceivedEventTo from(Message message) + public static EventChatMessageReceivedTo from(Message message) { - return ChatMessageReceivedEventTo.of( + return EventChatMessageReceivedTo.of( message.getUsername(), message.getId(), message.getMessageText()); } - public static ChatMessageReceivedEventTo of(String user, Long id, String text) + public static EventChatMessageReceivedTo of(String user, Long id, String text) { - ChatMessageReceivedEventTo to = new ChatMessageReceivedEventTo(); + EventChatMessageReceivedTo to = new EventChatMessageReceivedTo(); to.user = user; to.id = id; to.text = text; diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomCommandToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomToTest.java similarity index 78% rename from src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomCommandToTest.java rename to src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomToTest.java index 71a29715..5ef12efe 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/CreateChatRoomCommandToTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/CommandCreateChatRoomToTest.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; +package de.juplo.kafka.chat.backend.persistence.kafka.messages; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; @@ -9,7 +9,7 @@ import org.junit.jupiter.api.Test; import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; -public class CreateChatRoomCommandToTest +public class CommandCreateChatRoomToTest { final String json = """ { @@ -29,7 +29,7 @@ public class CreateChatRoomCommandToTest @Test public void testDeserialization() throws Exception { - CreateChatRoomCommandTo message = mapper.readValue(json, CreateChatRoomCommandTo.class); + CommandCreateChatRoomTo message = mapper.readValue(json, CommandCreateChatRoomTo.class); assertThat(message.getName()).isEqualTo("Foo-Room!"); } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageReceivedEventToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedToTest.java similarity index 81% rename from src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageReceivedEventToTest.java rename to src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedToTest.java index be612c8b..33a78277 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageReceivedEventToTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/messages/EventChatMessageReceivedToTest.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; +package de.juplo.kafka.chat.backend.persistence.kafka.messages; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; @@ -9,7 +9,7 @@ import org.junit.jupiter.api.Test; import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; -public class ChatMessageReceivedEventToTest +public class EventChatMessageReceivedToTest { final String json = """ { @@ -31,7 +31,7 @@ public class ChatMessageReceivedEventToTest @Test public void testDeserialization() throws Exception { - ChatMessageReceivedEventTo message = mapper.readValue(json, ChatMessageReceivedEventTo.class); + EventChatMessageReceivedTo message = mapper.readValue(json, EventChatMessageReceivedTo.class); assertThat(message.getId()).isEqualTo(1l); assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!"); assertThat(message.getUser()).isEqualTo("Peter"); -- 2.20.1