From bb766d0f862774f18f89f23756072b232623dde9 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 13 Sep 2023 20:35:51 +0200 Subject: [PATCH] WIP --- .../chat/backend/domain/ChatRoomInfo.java | 6 +++ .../implementation/kafka/InfoChannel.java | 16 +++++--- .../kafka/KafkaChatHomeService.java | 5 +-- .../kafka/KafkaServicesApplicationRunner.java | 38 ++++++++++++++++--- .../kafka/KafkaServicesConfiguration.java | 1 - .../mongodb/MongoDbStorageStrategy.java | 13 +------ 6 files changed, 50 insertions(+), 29 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java index 33c522d1..e91b28c5 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java @@ -19,4 +19,10 @@ public class ChatRoomInfo private final String name; @Getter private final Integer shard; + + + public ChatRoomInfo(UUID id, String name) + { + this(id, name, null); + } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index 22c4668d..5a1d186d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -30,6 +30,7 @@ public class InfoChannel implements Runnable private final long[] startOffset; private final long[] currentOffset; private final Map chatRoomInfo; + private final DataChannel dataChannel; private boolean running; @@ -37,7 +38,8 @@ public class InfoChannel implements Runnable public InfoChannel( String topic, Producer producer, - Consumer infoChannelConsumer) + Consumer infoChannelConsumer, + DataChannel dataChannel) { log.debug( "Creating InfoChannel for topic {}", @@ -60,15 +62,16 @@ public class InfoChannel implements Runnable .entrySet() .stream() .forEach(entry -> this.startOffset[entry.getKey().partition()] = entry.getValue()); + + this.dataChannel = dataChannel; } - Mono loadInProgress() + boolean loadInProgress() { - return Mono - .fromSupplier(() -> IntStream - .range(0, numShards) - .anyMatch(partition -> currentOffset[partition] < startOffset[partition])); + return IntStream + .range(0, numShards) + .anyMatch(partition -> currentOffset[partition] < startOffset[partition]); } Mono sendChatRoomCreatedEvent( @@ -176,6 +179,7 @@ public class InfoChannel implements Runnable chatRoomId); this.chatRoomInfo.put(chatRoomId, chatRoomInfo); + this.dataChannel.createChatRoom(chatRoomInfo); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java index d0b8eff7..9832519d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java @@ -40,10 +40,7 @@ public class KafkaChatHomeService implements ChatHomeService { return infoChannel .getChatRoomInfo(id) - .switchIfEmpty(Mono.error(() -> new UnknownChatroomException( - id, - shard, - dataChannel.getOwnedShards()))); + .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); } @Override diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java index a35cee02..125227a0 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java @@ -6,6 +6,7 @@ import jakarta.annotation.PreDestroy; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.TopicPartition; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; @@ -30,19 +31,44 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner private final Consumer chatRoomChannelConsumer; private final WorkAssignor workAssignor; - CompletableFuture chatRoomChannelConsumerJob; + CompletableFuture infoChannelConsumerJob; + CompletableFuture dataChannelConsumerJob; @Override public void run(ApplicationArguments args) throws Exception { + String infoTopic = properties.getKafka().getInfoChannelTopic(); + List partitions = infoChannelConsumer + .partitionsFor(infoTopic) + .stream() + .map(partitionInfo -> new TopicPartition( + infoTopic, + partitionInfo.partition())) + .toList(); + infoChannelConsumer.assign(partitions); + log.info("Starting the consumer for the InfoChannel"); + infoChannelConsumerJob = taskExecutor + .submitCompletable(infoChannel) + .exceptionally(e -> + { + log.error("The consumer for the InfoChannel exited abnormally!", e); + return null; + }); + + while (infoChannel.loadInProgress()) + { + log.info("InfoChannel is still loading..."); + Thread.sleep(1000); + } + workAssignor.assignWork(chatRoomChannelConsumer); - log.info("Starting the consumer for the ChatRoomChannel"); - chatRoomChannelConsumerJob = taskExecutor + log.info("Starting the consumer for the DataChannel"); + dataChannelConsumerJob = taskExecutor .submitCompletable(dataChannel) .exceptionally(e -> { - log.error("The consumer for the ChatRoomChannel exited abnormally!", e); + log.error("The consumer for the DataChannel exited abnormally!", e); return null; }); } @@ -51,9 +77,9 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner public void joinChatRoomChannelConsumerJob() { log.info("Signaling the consumer of the CahtRoomChannel to quit its work"); - chatRoomChannelConsumer.wakeup(); + infoChannelConsumer.wakeup(); log.info("Waiting for the consumer of the ChatRoomChannel to finish its work"); - chatRoomChannelConsumerJob.join(); + dataChannelConsumerJob.join(); log.info("Joined the consumer of the ChatRoomChannel"); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index ed1ccddc..018dc653 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -3,7 +3,6 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.domain.ChatHomeService; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; -import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; diff --git a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java index c87036c9..3eb90960 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java @@ -36,18 +36,7 @@ public class MongoDbStorageStrategy implements StorageStrategy .map(chatRoomTo -> { UUID chatRoomId = UUID.fromString(chatRoomTo.getId()); - int shard = shardingStrategy.selectShard(chatRoomId); - - log.info( - "{} - old shard: {}, new shard: {}", - chatRoomId, - chatRoomTo.getShard(), - shard); - - return new ChatRoomInfo( - chatRoomId, - chatRoomTo.getName(), - shard); + return new ChatRoomInfo(chatRoomId, chatRoomTo.getName()); }); } -- 2.20.1