From 2ea82edbf9b71dc1fa8ccaa56dfa56982ae6e5c3 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 15 Sep 2023 01:31:38 +0200 Subject: [PATCH] FIX --- .../kafka/ConsumerTaskRunner.java | 11 +--- .../implementation/kafka/DataChannel.java | 52 +++---------------- .../implementation/kafka/InfoChannel.java | 7 +-- .../kafka/KafkaServicesConfiguration.java | 12 ++--- 4 files changed, 14 insertions(+), 68 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java index b505ffb7..0f433007 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java @@ -11,18 +11,9 @@ public class ConsumerTaskRunner private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor; private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor; - private final InfoChannel infoChannel; - - void run() throws InterruptedException + void run() { infoChannelConsumerTaskExecutor.executeConsumerTask(); - - while (infoChannel.loadInProgress()) - { - log.info("InfoChannel is still loading..."); - Thread.sleep(1000); - } - dataChannelConsumerTaskExecutor.executeConsumerTask(); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index f20f7c23..4d5a1412 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -234,25 +234,6 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener } } - void createChatRoom(ChatRoomInfo chatRoomInfo) - { - if (!isShardOwned[chatRoomInfo.getShard()]) - { - log.debug("Ignoring not owned chat-room {}", chatRoomInfo); - return; - } - - UUID id = chatRoomInfo.getId(); - log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); - KafkaChatMessageService service = new KafkaChatMessageService(this, id); - ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize); - putChatRoom( - chatRoomInfo.getId(), - chatRoomInfo.getName(), - chatRoomInfo.getShard(), - chatRoomData); - } - private void loadChatMessage( UUID chatRoomId, LocalDateTime timestamp, @@ -263,7 +244,14 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); Message message = new Message(key, offset, timestamp, chatMessageTo.getText()); - ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId); + ChatRoomData chatRoomData = this.chatRoomData[partition].computeIfAbsent( + chatRoomId, + (id) -> + { + log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); + KafkaChatMessageService service = new KafkaChatMessageService(this, id); + return new ChatRoomData(clock, service, bufferSize); + }); KafkaChatMessageService kafkaChatRoomService = (KafkaChatMessageService) chatRoomData.getChatRoomService(); @@ -288,30 +276,6 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener } - private void putChatRoom( - UUID chatRoomId, - String name, - Integer partition, - ChatRoomData chatRoomData) - { - if (this.chatRoomData[partition].containsKey(chatRoomId)) - { - log.warn( - "Ignoring existing chat-room for {}: {}", - partition, - chatRoomId); - } - else - { - log.info( - "Adding new chat-room to partition {}: {}", - partition, - chatRoomData); - - this.chatRoomData[partition].put(chatRoomId, chatRoomData); - } - } - int[] getOwnedShards() { return IntStream diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index c7cc6ad5..7d12bca5 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -30,7 +30,6 @@ public class InfoChannel implements Runnable private final long[] startOffset; private final long[] nextOffset; private final Map chatRoomInfo; - private final DataChannel dataChannel; private boolean running; @@ -38,8 +37,7 @@ public class InfoChannel implements Runnable public InfoChannel( String topic, Producer producer, - Consumer infoChannelConsumer, - DataChannel dataChannel) + Consumer infoChannelConsumer) { log.debug( "Creating InfoChannel for topic {}", @@ -57,8 +55,6 @@ public class InfoChannel implements Runnable IntStream .range(0, numShards) .forEach(partition -> this.nextOffset[partition] = -1l); - - this.dataChannel = dataChannel; } @@ -182,7 +178,6 @@ public class InfoChannel implements Runnable chatRoomId); this.chatRoomInfo.put(chatRoomId, chatRoomInfo); - this.dataChannel.createChatRoom(chatRoomInfo); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index 64cf455e..1f9594d7 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -39,13 +39,11 @@ public class KafkaServicesConfiguration @Bean ConsumerTaskRunner consumerTaskRunner( ConsumerTaskExecutor infoChannelConsumerTaskExecutor, - ConsumerTaskExecutor dataChannelConsumerTaskExecutor, - InfoChannel infoChannel) + ConsumerTaskExecutor dataChannelConsumerTaskExecutor) { return new ConsumerTaskRunner( infoChannelConsumerTaskExecutor, - dataChannelConsumerTaskExecutor, - infoChannel); + dataChannelConsumerTaskExecutor); } @Bean @@ -122,14 +120,12 @@ public class KafkaServicesConfiguration InfoChannel infoChannel( ChatBackendProperties properties, Producer producer, - Consumer infoChannelConsumer, - DataChannel dataChannel) + Consumer infoChannelConsumer) { return new InfoChannel( properties.getKafka().getInfoChannelTopic(), producer, - infoChannelConsumer, - dataChannel); + infoChannelConsumer); } @Bean -- 2.20.1