FIX
authorKai Moritz <kai@juplo.de>
Thu, 14 Sep 2023 23:31:38 +0000 (01:31 +0200)
committerKai Moritz <kai@juplo.de>
Thu, 14 Sep 2023 23:31:38 +0000 (01:31 +0200)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java

index b505ffb..0f43300 100644 (file)
@@ -11,18 +11,9 @@ public class ConsumerTaskRunner
   private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor;
   private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor;
 
-  private final InfoChannel infoChannel;
-
-  void run() throws InterruptedException
+  void run()
   {
     infoChannelConsumerTaskExecutor.executeConsumerTask();
-
-    while (infoChannel.loadInProgress())
-    {
-      log.info("InfoChannel is still loading...");
-      Thread.sleep(1000);
-    }
-
     dataChannelConsumerTaskExecutor.executeConsumerTask();
   }
 
index f20f7c2..4d5a141 100644 (file)
@@ -234,25 +234,6 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     }
   }
 
-  void createChatRoom(ChatRoomInfo chatRoomInfo)
-  {
-    if (!isShardOwned[chatRoomInfo.getShard()])
-    {
-      log.debug("Ignoring not owned chat-room {}", chatRoomInfo);
-      return;
-    }
-
-    UUID id = chatRoomInfo.getId();
-    log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
-    KafkaChatMessageService service = new KafkaChatMessageService(this, id);
-    ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
-    putChatRoom(
-        chatRoomInfo.getId(),
-        chatRoomInfo.getName(),
-        chatRoomInfo.getShard(),
-        chatRoomData);
-  }
-
   private void loadChatMessage(
       UUID chatRoomId,
       LocalDateTime timestamp,
@@ -263,7 +244,14 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
     Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
 
-    ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId);
+    ChatRoomData chatRoomData = this.chatRoomData[partition].computeIfAbsent(
+        chatRoomId,
+        (id) ->
+        {
+          log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
+          KafkaChatMessageService service = new KafkaChatMessageService(this, id);
+          return new ChatRoomData(clock, service, bufferSize);
+        });
     KafkaChatMessageService kafkaChatRoomService =
         (KafkaChatMessageService) chatRoomData.getChatRoomService();
 
@@ -288,30 +276,6 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
   }
 
 
-  private void putChatRoom(
-      UUID chatRoomId,
-      String name,
-      Integer partition,
-      ChatRoomData chatRoomData)
-  {
-    if (this.chatRoomData[partition].containsKey(chatRoomId))
-    {
-      log.warn(
-          "Ignoring existing chat-room for {}: {}",
-          partition,
-          chatRoomId);
-    }
-    else
-    {
-      log.info(
-          "Adding new chat-room to partition {}: {}",
-          partition,
-          chatRoomData);
-
-      this.chatRoomData[partition].put(chatRoomId, chatRoomData);
-    }
-  }
-
   int[] getOwnedShards()
   {
     return IntStream
index c7cc6ad..7d12bca 100644 (file)
@@ -30,7 +30,6 @@ public class InfoChannel implements Runnable
   private final long[] startOffset;
   private final long[] nextOffset;
   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
-  private final DataChannel dataChannel;
 
   private boolean running;
 
@@ -38,8 +37,7 @@ public class InfoChannel implements Runnable
   public InfoChannel(
     String topic,
     Producer<String, AbstractMessageTo> producer,
-    Consumer<String, AbstractMessageTo> infoChannelConsumer,
-    DataChannel dataChannel)
+    Consumer<String, AbstractMessageTo> infoChannelConsumer)
   {
     log.debug(
         "Creating InfoChannel for topic {}",
@@ -57,8 +55,6 @@ public class InfoChannel implements Runnable
     IntStream
         .range(0, numShards)
         .forEach(partition -> this.nextOffset[partition] = -1l);
-
-    this.dataChannel = dataChannel;
   }
 
 
@@ -182,7 +178,6 @@ public class InfoChannel implements Runnable
           chatRoomId);
 
       this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
-      this.dataChannel.createChatRoom(chatRoomInfo);
     }
   }
 
index 64cf455..1f9594d 100644 (file)
@@ -39,13 +39,11 @@ public class KafkaServicesConfiguration
   @Bean
   ConsumerTaskRunner consumerTaskRunner(
       ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
-      ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
-      InfoChannel infoChannel)
+      ConsumerTaskExecutor dataChannelConsumerTaskExecutor)
   {
     return new ConsumerTaskRunner(
         infoChannelConsumerTaskExecutor,
-        dataChannelConsumerTaskExecutor,
-        infoChannel);
+        dataChannelConsumerTaskExecutor);
   }
 
   @Bean
@@ -122,14 +120,12 @@ public class KafkaServicesConfiguration
   InfoChannel infoChannel(
       ChatBackendProperties properties,
       Producer<String, AbstractMessageTo> producer,
-      Consumer<String, AbstractMessageTo> infoChannelConsumer,
-      DataChannel dataChannel)
+      Consumer<String, AbstractMessageTo> infoChannelConsumer)
   {
     return new InfoChannel(
         properties.getKafka().getInfoChannelTopic(),
         producer,
-        infoChannelConsumer,
-        dataChannel);
+        infoChannelConsumer);
   }
 
   @Bean