From 08e679fa4ec48797df9750637831bacec7826ea3 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 20 Aug 2023 10:08:04 +0200 Subject: [PATCH] WIP --- ...{ChatMessageChannel.java => ChatRoomChannel.java} | 4 ++-- .../backend/persistence/kafka/KafkaChatHome.java | 4 +--- .../persistence/kafka/KafkaChatRoomFactory.java | 4 ++-- .../persistence/kafka/KafkaChatRoomService.java | 4 ++-- .../kafka/KafkaServicesApplicationRunner.java | 4 ++-- .../kafka/KafkaServicesConfiguration.java | 12 ++++++------ 6 files changed, 15 insertions(+), 17 deletions(-) rename src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/{ChatMessageChannel.java => ChatRoomChannel.java} (99%) diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java similarity index 99% rename from src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java rename to src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java index 08975a9d..1308946e 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java @@ -22,7 +22,7 @@ import java.util.stream.IntStream; @Slf4j -public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener +public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener { private final String topic; private final Producer producer; @@ -41,7 +41,7 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener private volatile boolean loadInProgress; - public ChatMessageChannel( + public ChatRoomChannel( String topic, Producer producer, Consumer consumer, diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java index 77790bd6..26887a95 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java @@ -2,8 +2,6 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatHome; import de.juplo.kafka.chat.backend.domain.ChatRoom; -import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; -import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -18,7 +16,7 @@ import java.util.*; public class KafkaChatHome implements ChatHome { private final KafkaLikeShardingStrategy shardingStrategy; - private final ChatMessageChannel chatMessageChanel; + private final ChatRoomChannel chatMessageChanel; @Override diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java index c46529d8..825f16eb 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java @@ -13,12 +13,12 @@ import java.util.UUID; @Slf4j public class KafkaChatRoomFactory implements ChatRoomFactory { - private final ChatMessageChannel chatMessageChannel; + private final ChatRoomChannel chatRoomChannel; @Override public Mono createChatRoom(UUID id, String name) { log.info("Sending create-request for chat rooom: id={}, name={}"); - return chatMessageChannel.sendCreateChatRoomRequest(id, name); + return chatRoomChannel.sendCreateChatRoomRequest(id, name); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java index 77ecf1ca..7b2cc0b1 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java @@ -15,7 +15,7 @@ import java.util.UUID; @Slf4j public class KafkaChatRoomService implements ChatRoomService { - private final ChatMessageChannel chatMessageChannel; + private final ChatRoomChannel chatRoomChannel; private final UUID chatRoomId; private final LinkedHashMap messages = new LinkedHashMap<>(); @@ -27,7 +27,7 @@ public class KafkaChatRoomService implements ChatRoomService LocalDateTime timestamp, String text) { - return chatMessageChannel + return chatRoomChannel .sendChatMessage(chatRoomId, key, timestamp, text) .doOnSuccess(message -> persistMessage(message)); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java index f0dc3155..fac35825 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesApplicationRunner.java @@ -28,7 +28,7 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner ConfigurableApplicationContext context; @Autowired - ChatMessageChannel chatMessageChannel; + ChatRoomChannel chatRoomChannel; @Autowired Consumer chatMessageChannelConsumer; @@ -40,7 +40,7 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner { log.info("Starting the consumer for the ChatMessageChannel"); chatMessageChannelConsumerJob = taskExecutor - .submitCompletable(chatMessageChannel) + .submitCompletable(chatRoomChannel) .exceptionally(e -> { log.error("The consumer for the ChatMessageChannel exited abnormally!", e); diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java index 84919c71..724739bf 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -34,15 +34,15 @@ public class KafkaServicesConfiguration @Bean ChatHome kafkaChatHome( KafkaLikeShardingStrategy shardingStrategy, - ChatMessageChannel chatMessageChannel) + ChatRoomChannel chatRoomChannel) { - return new KafkaChatHome(shardingStrategy, chatMessageChannel); + return new KafkaChatHome(shardingStrategy, chatRoomChannel); } @Bean - KafkaChatRoomFactory chatRoomFactory(ChatMessageChannel chatMessageChannel) + KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel) { - return new KafkaChatRoomFactory(chatMessageChannel); + return new KafkaChatRoomFactory(chatRoomChannel); } @Bean @@ -52,14 +52,14 @@ public class KafkaServicesConfiguration } @Bean - ChatMessageChannel chatMessageChannel( + ChatRoomChannel chatMessageChannel( ChatBackendProperties properties, Producer chatMessageChannelProducer, Consumer chatMessageChannelConsumer, ZoneId zoneId, Clock clock) { - return new ChatMessageChannel( + return new ChatRoomChannel( properties.getKafka().getMessageChannelTopic(), chatMessageChannelProducer, chatMessageChannelConsumer, -- 2.20.1