From 576aa2838f4ca284049731128aa0f3fe9751b75b Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 13 Sep 2023 20:35:51 +0200 Subject: [PATCH] WIP --- .../chat/backend/domain/ChatRoomInfo.java | 6 ++++ .../implementation/kafka/InfoChannel.java | 16 ++++++---- .../kafka/KafkaChatHomeService.java | 5 +-- .../kafka/KafkaServicesApplicationRunner.java | 32 +++++++++++++++---- .../kafka/KafkaServicesConfiguration.java | 1 - .../mongodb/MongoDbStorageStrategy.java | 13 +------- 6 files changed, 44 insertions(+), 29 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java index 33c522d1..e91b28c5 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java @@ -19,4 +19,10 @@ public class ChatRoomInfo private final String name; @Getter private final Integer shard; + + + public ChatRoomInfo(UUID id, String name) + { + this(id, name, null); + } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index 22c4668d..5a1d186d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -30,6 +30,7 @@ public class InfoChannel implements Runnable private final long[] startOffset; private final long[] currentOffset; private final Map chatRoomInfo; + private final DataChannel dataChannel; private boolean running; @@ -37,7 +38,8 @@ public class InfoChannel implements Runnable public InfoChannel( String topic, Producer producer, - Consumer infoChannelConsumer) + Consumer infoChannelConsumer, + DataChannel dataChannel) { log.debug( "Creating InfoChannel for topic {}", @@ -60,15 +62,16 @@ public class InfoChannel implements Runnable .entrySet() .stream() .forEach(entry -> this.startOffset[entry.getKey().partition()] = entry.getValue()); + + this.dataChannel = dataChannel; } - Mono loadInProgress() + boolean loadInProgress() { - return Mono - .fromSupplier(() -> IntStream - .range(0, numShards) - .anyMatch(partition -> currentOffset[partition] < startOffset[partition])); + return IntStream + .range(0, numShards) + .anyMatch(partition -> currentOffset[partition] < startOffset[partition]); } Mono sendChatRoomCreatedEvent( @@ -176,6 +179,7 @@ public class InfoChannel implements Runnable chatRoomId); this.chatRoomInfo.put(chatRoomId, chatRoomInfo); + this.dataChannel.createChatRoom(chatRoomInfo); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java index d0b8eff7..9832519d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java @@ -40,10 +40,7 @@ public class KafkaChatHomeService implements ChatHomeService { return infoChannel .getChatRoomInfo(id) - .switchIfEmpty(Mono.error(() -> new UnknownChatroomException( - id, - shard, - dataChannel.getOwnedShards()))); + .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); } @Override diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java index 97cfa699..f6e39b35 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java @@ -5,6 +5,7 @@ import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessage import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.PartitionInfo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; @@ -33,21 +34,40 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner @Autowired ConfigurableApplicationContext context; + @Autowired + InfoChannel infoChannel; @Autowired DataChannel dataChannel; @Autowired - Consumer chatRoomChannelConsumer; + Consumer infoChannelConsumer; + @Autowired + Consumer dataChannelConsumer; - CompletableFuture chatRoomChannelConsumerJob; + CompletableFuture infoChannelConsumerJob; + CompletableFuture dataChannelConsumerJob; @Override public void run(ApplicationArguments args) throws Exception { + String infoTopic = properties.getKafka().getInfoChannelTopic(); + List< PartitionInfo> partitions = + infoChannelConsumer.partitionsFor(infoTopic); + infoChannelConsumer.assignment(partitions); + log.info("Starting the consumer for the InfoChannel"); + infoChannelConsumerJob = taskExecutor + .submitCompletable(infoChannel) + .exceptionally(e -> + { + log.error("The consumer for the InfoChannel exited abnormally!", e); + return null; + }); + + while () List topics = List.of(properties.getKafka().getDataChannelTopic()); - chatRoomChannelConsumer.subscribe(topics, dataChannel); + dataChannelConsumer.subscribe(topics, dataChannel); log.info("Starting the consumer for the ChatRoomChannel"); - chatRoomChannelConsumerJob = taskExecutor + dataChannelConsumerJob = taskExecutor .submitCompletable(dataChannel) .exceptionally(e -> { @@ -60,9 +80,9 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner public void joinChatRoomChannelConsumerJob() { log.info("Signaling the consumer of the CahtRoomChannel to quit its work"); - chatRoomChannelConsumer.wakeup(); + infoChannelConsumer.wakeup(); log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); - chatRoomChannelConsumerJob.join(); + dataChannelConsumerJob.join(); log.info("Joined the consumer of the ChatRoomChannel"); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index 0aeefea4..caa9228a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -3,7 +3,6 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.domain.ChatHomeService; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java index c87036c9..3eb90960 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java @@ -36,18 +36,7 @@ public class MongoDbStorageStrategy implements StorageStrategy .map(chatRoomTo -> { UUID chatRoomId = UUID.fromString(chatRoomTo.getId()); - int shard = shardingStrategy.selectShard(chatRoomId); - - log.info( - "{} - old shard: {}, new shard: {}", - chatRoomId, - chatRoomTo.getShard(), - shard); - - return new ChatRoomInfo( - chatRoomId, - chatRoomTo.getName(), - shard); + return new ChatRoomInfo(chatRoomId, chatRoomTo.getName()); }); } -- 2.20.1