refactor: Separated channels for data and info -- refactored and aligned code
author     Kai Moritz <kai@juplo.de>
Tue, 12 Sep 2023 21:33:59 +0000 (23:33 +0200)
committer  Kai Moritz <kai@juplo.de>
Fri, 15 Sep 2023 11:17:27 +0000 (13:17 +0200)
* Split `ChatRoomChannel` into `InfoChannel` and `DataChannel` (see the sketch
  below)
** `DataChannel` manages only the data of the chat messages
** `InfoChannel` manages all info-data (at the moment only
   `EventChatRoomCreated`)
* Aligned the test setup for the Kafka-related tests
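
The sketch below condenses the resulting call paths in `KafkaChatHomeService`
(a simplified excerpt of the code changed in this commit, not the complete
class):

  // Chat-room metadata is created and looked up through the InfoChannel ...
  public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
  {
    int shard = selectShard(id);
    return infoChannel.sendChatRoomCreatedEvent(id, name, shard);
  }

  // ... while the message data is served by the shard-aware DataChannel.
  public Mono<ChatRoomData> getChatRoomData(UUID id)
  {
    int shard = selectShard(id);
    return dataChannel
        .getChatRoomData(shard, id)
        .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
            id,
            shard,
            dataChannel.getOwnedShards())));
  }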

18 files changed:
src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java
src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/AbstractMessageTo.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedTo.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java
src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java
src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java
src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedToTest.java

index 9c80f5d..381c6c6 100644 (file)
@@ -36,7 +36,8 @@ public class ChatBackendProperties
   {
     private String clientIdPrefix;
     private String bootstrapServers = ":9092";
-    private String chatRoomChannelTopic = "message_channel";
+    private String infoChannelTopic = "info_channel";
+    private String dataChannelTopic = "data_channel";
     private int numPartitions = 2;
   }
 
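With Spring's relaxed binding, the renamed properties above correspond to the
following configuration keys (the defaults shown are the ones set in the code;
the key names match the test properties used further down in this commit):

  chat.backend.kafka.info-channel-topic=info_channel
  chat.backend.kafka.data-channel-topic=data_channel
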
index 33c522d..e91b28c 100644 (file)
@@ -19,4 +19,10 @@ public class ChatRoomInfo
   private final String name;
   @Getter
   private final Integer shard;
+
+
+  public ChatRoomInfo(UUID id, String name)
+  {
+    this(id, name, null);
+  }
 }
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java
new file mode 100644 (file)
index 0000000..233e8f6
--- /dev/null
@@ -0,0 +1,25 @@
+package de.juplo.kafka.chat.backend.implementation.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class ConsumerTaskRunner
+{
+  private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor;
+  private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor;
+
+  public void run()
+  {
+    infoChannelConsumerTaskExecutor.executeConsumerTask();
+    dataChannelConsumerTaskExecutor.executeConsumerTask();
+  }
+
+  public void joinConsumerTasks()
+  {
+    dataChannelConsumerTaskExecutor.joinConsumerTaskJob();
+    infoChannelConsumerTaskExecutor.joinConsumerTaskJob();
+  }
+}
index d94bc65..4d5a141 100644 (file)
@@ -4,8 +4,7 @@ import de.juplo.kafka.chat.backend.domain.*;
 import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
 import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -16,7 +15,6 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.time.*;
@@ -25,7 +23,7 @@ import java.util.stream.IntStream;
 
 
 @Slf4j
-public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
+public class DataChannel implements Runnable, ConsumerRebalanceListener
 {
   private final String topic;
   private final Producer<String, AbstractMessageTo> producer;
@@ -37,7 +35,6 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
   private final boolean[] isShardOwned;
   private final long[] currentOffset;
   private final long[] nextOffset;
-  private final Map<UUID, ChatRoomInfo>[] chatRoomInfo;
   private final Map<UUID, ChatRoomData>[] chatRoomData;
 
   private boolean running;
@@ -45,21 +42,21 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
   private volatile boolean loadInProgress;
 
 
-  public ChatRoomChannel(
+  public DataChannel(
     String topic,
     Producer<String, AbstractMessageTo> producer,
-    Consumer<String, AbstractMessageTo> consumer,
+    Consumer<String, AbstractMessageTo> dataChannelConsumer,
     ZoneId zoneId,
     int numShards,
     int bufferSize,
     Clock clock)
   {
     log.debug(
-        "Creating ChatRoomChannel for topic {} with {} partitions",
+        "Creating DataChannel for topic {} with {} partitions",
         topic,
         numShards);
     this.topic = topic;
-    this.consumer = consumer;
+    this.consumer = dataChannelConsumer;
     this.producer = producer;
     this.zoneId = zoneId;
     this.numShards = numShards;
@@ -68,55 +65,17 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     this.isShardOwned = new boolean[numShards];
     this.currentOffset = new long[numShards];
     this.nextOffset = new long[numShards];
-    this.chatRoomInfo = new Map[numShards];
     this.chatRoomData = new Map[numShards];
     IntStream
         .range(0, numShards)
         .forEach(shard ->
         {
-          this.chatRoomInfo[shard] = new HashMap<>();
           this.chatRoomData[shard] = new HashMap<>();
         });
   }
 
 
 
-  Mono<ChatRoomInfo> sendCreateChatRoomRequest(
-      UUID chatRoomId,
-      String name)
-  {
-    CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name);
-    return Mono.create(sink ->
-    {
-      ProducerRecord<String, AbstractMessageTo> record =
-          new ProducerRecord<>(
-              topic,
-              chatRoomId.toString(),
-              createChatRoomRequestTo);
-
-      producer.send(record, ((metadata, exception) ->
-      {
-        if (metadata != null)
-        {
-          log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
-          ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition());
-          createChatRoom(chatRoomInfo);
-          sink.success(chatRoomInfo);
-        }
-        else
-        {
-          // On send-failure
-          log.error(
-              "Could not send create-request for chat room (id={}, name={}): {}",
-              chatRoomId,
-              name,
-              exception);
-          sink.error(exception);
-        }
-      }));
-    });
-  }
-
   Mono<Message> sendChatMessage(
       UUID chatRoomId,
       Message.MessageKey key,
@@ -211,12 +170,12 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     {
       try
       {
-        ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(5));
+        ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
         log.info("Fetched {} messages", records.count());
 
         if (loadInProgress)
         {
-          loadChatRoom(records);
+          loadChatRoomData(records);
 
           if (isLoadingCompleted())
           {
@@ -244,7 +203,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     log.info("Exiting normally");
   }
 
-  private void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
+  private void loadChatRoomData(ConsumerRecords<String, AbstractMessageTo> records)
   {
     for (ConsumerRecord<String, AbstractMessageTo> record : records)
     {
@@ -252,13 +211,6 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
 
       switch (record.value().getType())
       {
-        case COMMAND_CREATE_CHATROOM:
-          createChatRoom(
-              chatRoomId,
-              (CommandCreateChatRoomTo) record.value(),
-              record.partition());
-          break;
-
         case EVENT_CHATMESSAGE_RECEIVED:
           Instant instant = Instant.ofEpochSecond(record.timestamp());
           LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
@@ -282,42 +234,6 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     }
   }
 
-  private void createChatRoom(
-      UUID chatRoomId,
-      CommandCreateChatRoomTo createChatRoomRequestTo,
-      Integer partition)
-  {
-    log.info(
-        "Loading ChatRoom {} for shard {} with buffer-size {}",
-        chatRoomId,
-        partition,
-        bufferSize);
-    KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId);
-    ChatRoomData chatRoomData = new ChatRoomData(
-        clock,
-        service,
-        bufferSize);
-    putChatRoom(
-        chatRoomId,
-        createChatRoomRequestTo.getName(),
-        partition,
-        chatRoomData);
-  }
-
-
-  private void createChatRoom(ChatRoomInfo chatRoomInfo)
-  {
-    UUID id = chatRoomInfo.getId();
-    log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
-    KafkaChatMessageService service = new KafkaChatMessageService(this, id);
-    ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
-    putChatRoom(
-        chatRoomInfo.getId(),
-        chatRoomInfo.getName(),
-        chatRoomInfo.getShard(),
-        chatRoomData);
-  }
-
   private void loadChatMessage(
       UUID chatRoomId,
       LocalDateTime timestamp,
@@ -328,7 +244,14 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
     Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
 
-    ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId);
+    ChatRoomData chatRoomData = this.chatRoomData[partition].computeIfAbsent(
+        chatRoomId,
+        (id) ->
+        {
+          log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
+          KafkaChatMessageService service = new KafkaChatMessageService(this, id);
+          return new ChatRoomData(clock, service, bufferSize);
+        });
     KafkaChatMessageService kafkaChatRoomService =
         (KafkaChatMessageService) chatRoomData.getChatRoomService();
 
@@ -353,33 +276,6 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
   }
 
 
-  private void putChatRoom(
-      UUID chatRoomId,
-      String name,
-      Integer partition,
-      ChatRoomData chatRoomData)
-  {
-    if (this.chatRoomInfo[partition].containsKey(chatRoomId))
-    {
-      log.warn(
-          "Ignoring existing chat-room for {}: {}",
-          partition,
-          chatRoomId);
-    }
-    else
-    {
-      log.info(
-          "Adding new chat-room to partition {}: {}",
-          partition,
-          chatRoomData);
-
-      this.chatRoomInfo[partition].put(
-          chatRoomId,
-          new ChatRoomInfo(chatRoomId, name, partition));
-      this.chatRoomData[partition].put(chatRoomId, chatRoomData);
-    }
-  }
-
   int[] getOwnedShards()
   {
     return IntStream
@@ -402,27 +298,4 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
 
     return Mono.justOrEmpty(chatRoomData[shard].get(id));
   }
-
-  Flux<ChatRoomInfo> getChatRoomInfo()
-  {
-    return Flux
-        .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
-        .filter(shard -> isShardOwned[shard])
-        .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values()));
-  }
-
-  Mono<ChatRoomInfo> getChatRoomInfo(int shard, UUID id)
-  {
-    if (loadInProgress)
-    {
-      return Mono.error(new LoadInProgressException());
-    }
-
-    if (!isShardOwned[shard])
-    {
-      return Mono.error(new ShardNotOwnedException(shard));
-    }
-
-    return Mono.justOrEmpty(chatRoomInfo[shard].get(id));
-  }
 }
index d94bc65..ad03f0d 100644 (file)
 package de.juplo.kafka.chat.backend.implementation.kafka;
 
-import de.juplo.kafka.chat.backend.domain.*;
-import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
-import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo;
-import lombok.Getter;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.time.*;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
 import java.util.stream.IntStream;
 
 
 @Slf4j
-public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
+public class InfoChannel implements Runnable
 {
   private final String topic;
   private final Producer<String, AbstractMessageTo> producer;
   private final Consumer<String, AbstractMessageTo> consumer;
-  private final ZoneId zoneId;
   private final int numShards;
-  private final int bufferSize;
-  private final Clock clock;
-  private final boolean[] isShardOwned;
   private final long[] currentOffset;
   private final long[] nextOffset;
-  private final Map<UUID, ChatRoomInfo>[] chatRoomInfo;
-  private final Map<UUID, ChatRoomData>[] chatRoomData;
+  private final Map<UUID, ChatRoomInfo> chatRoomInfo;
 
   private boolean running;
-  @Getter
-  private volatile boolean loadInProgress;
 
 
-  public ChatRoomChannel(
+  public InfoChannel(
     String topic,
     Producer<String, AbstractMessageTo> producer,
-    Consumer<String, AbstractMessageTo> consumer,
-    ZoneId zoneId,
-    int numShards,
-    int bufferSize,
-    Clock clock)
+    Consumer<String, AbstractMessageTo> infoChannelConsumer)
   {
     log.debug(
-        "Creating ChatRoomChannel for topic {} with {} partitions",
-        topic,
-        numShards);
+        "Creating InfoChannel for topic {}",
+        topic);
     this.topic = topic;
-    this.consumer = consumer;
+    this.consumer = infoChannelConsumer;
     this.producer = producer;
-    this.zoneId = zoneId;
-    this.numShards = numShards;
-    this.bufferSize = bufferSize;
-    this.clock = clock;
-    this.isShardOwned = new boolean[numShards];
+    this.chatRoomInfo = new HashMap<>();
+
+    this.numShards = consumer
+        .partitionsFor(topic)
+        .size();
     this.currentOffset = new long[numShards];
     this.nextOffset = new long[numShards];
-    this.chatRoomInfo = new Map[numShards];
-    this.chatRoomData = new Map[numShards];
     IntStream
         .range(0, numShards)
-        .forEach(shard ->
-        {
-          this.chatRoomInfo[shard] = new HashMap<>();
-          this.chatRoomData[shard] = new HashMap<>();
-        });
+        .forEach(partition -> this.nextOffset[partition] = -1l);
   }
 
 
+  boolean loadInProgress()
+  {
+    return IntStream
+        .range(0, numShards)
+        .anyMatch(partition -> nextOffset[partition] < currentOffset[partition]);
+  }
 
-  Mono<ChatRoomInfo> sendCreateChatRoomRequest(
+  Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
       UUID chatRoomId,
-      String name)
+      String name,
+      int shard)
   {
-    CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name);
+    EventChatRoomCreated to = EventChatRoomCreated.of(chatRoomId, name, shard);
     return Mono.create(sink ->
     {
       ProducerRecord<String, AbstractMessageTo> record =
           new ProducerRecord<>(
               topic,
-              chatRoomId.toString(),
-              createChatRoomRequestTo);
+              Integer.toString(shard),
+              to);
 
       producer.send(record, ((metadata, exception) ->
       {
         if (metadata != null)
         {
-          log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
+          log.info("Successfully sent chreate-request for chat room: {}", to);
           ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition());
-          createChatRoom(chatRoomInfo);
           sink.success(chatRoomInfo);
         }
         else
@@ -117,122 +101,28 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     });
   }
 
-  Mono<Message> sendChatMessage(
-      UUID chatRoomId,
-      Message.MessageKey key,
-      LocalDateTime timestamp,
-      String text)
-  {
-    ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
-    return Mono.create(sink ->
-    {
-      ProducerRecord<String, AbstractMessageTo> record =
-          new ProducerRecord<>(
-              topic,
-              null,
-              zdt.toEpochSecond(),
-              chatRoomId.toString(),
-              EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text));
-
-      producer.send(record, ((metadata, exception) ->
-      {
-        if (metadata != null)
-        {
-          // On successful send
-          Message message = new Message(key, metadata.offset(), timestamp, text);
-          log.info("Successfully send message {}", message);
-          sink.success(message);
-        }
-        else
-        {
-          // On send-failure
-          log.error(
-              "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}",
-              chatRoomId,
-              key,
-              timestamp,
-              text,
-              exception);
-          sink.error(exception);
-        }
-      }));
-    });
-  }
-
-  @Override
-  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
-  {
-    log.info("Newly assigned partitions! Pausing normal operations...");
-    loadInProgress = true;
-
-    consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
-    {
-      int partition = topicPartition.partition();
-      isShardOwned[partition] =  true;
-      this.currentOffset[partition] = currentOffset;
-
-      log.info(
-          "Partition assigned: {} - loading messages: next={} -> current={}",
-          partition,
-          nextOffset[partition],
-          currentOffset);
-
-      consumer.seek(topicPartition, nextOffset[partition]);
-    });
-
-    consumer.resume(partitions);
-  }
-
-  @Override
-  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
-  {
-    partitions.forEach(topicPartition ->
-    {
-      int partition = topicPartition.partition();
-      isShardOwned[partition] = false;
-      log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
-    });
-  }
-
-  @Override
-  public void onPartitionsLost(Collection<TopicPartition> partitions)
-  {
-    log.warn("Lost partitions: {}, partitions");
-    // TODO: Muss auf den Verlust anders reagiert werden?
-    onPartitionsRevoked(partitions);
-  }
 
   @Override
   public void run()
   {
     running = true;
 
+    consumer
+        .endOffsets(consumer.assignment())
+        .entrySet()
+        .stream()
+        .forEach(entry -> this.currentOffset[entry.getKey().partition()] = entry.getValue());
+    IntStream
+        .range(0, numShards)
+        .forEach(partition -> this.nextOffset[partition] = 0l);
+
     while (running)
     {
       try
       {
-        ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(5));
-        log.info("Fetched {} messages", records.count());
-
-        if (loadInProgress)
-        {
-          loadChatRoom(records);
-
-          if (isLoadingCompleted())
-          {
-            log.info("Loading of messages completed! Pausing all owned partitions...");
-            pauseAllOwnedPartions();
-            log.info("Resuming normal operations...");
-            loadInProgress = false;
-          }
-        }
-        else
-        {
-          if (!records.isEmpty())
-          {
-            throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!");
-          }
-        }
+        ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
+        log.debug("Fetched {} messages", records.count());
+        handleMessages(records);
       }
       catch (WakeupException e)
       {
@@ -244,36 +134,22 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     log.info("Exiting normally");
   }
 
-  private void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
+  private void handleMessages(ConsumerRecords<String, AbstractMessageTo> records)
   {
     for (ConsumerRecord<String, AbstractMessageTo> record : records)
     {
-      UUID chatRoomId = UUID.fromString(record.key());
-
       switch (record.value().getType())
       {
-        case COMMAND_CREATE_CHATROOM:
-          createChatRoom(
-              chatRoomId,
-              (CommandCreateChatRoomTo) record.value(),
-              record.partition());
-          break;
-
-        case EVENT_CHATMESSAGE_RECEIVED:
-          Instant instant = Instant.ofEpochSecond(record.timestamp());
-          LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
-          loadChatMessage(
-              chatRoomId,
-              timestamp,
-              record.offset(),
-              (EventChatMessageReceivedTo) record.value(),
-              record.partition());
+        case EVENT_CHATROOM_CREATED:
+          EventChatRoomCreated eventChatRoomCreated =
+              (EventChatRoomCreated) record.value();
+          createChatRoom(eventChatRoomCreated.toChatRoomInfo());
           break;
 
         default:
           log.debug(
-              "Ignoring message for chat-room {} with offset {}: {}",
-              chatRoomId,
+              "Ignoring message for key={} with offset={}: {}",
+              record.key(),
               record.offset(),
               record.value());
       }
@@ -282,84 +158,12 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     }
   }
 
-  private void createChatRoom(
-      UUID chatRoomId,
-      CommandCreateChatRoomTo createChatRoomRequestTo,
-      Integer partition)
-  {
-    log.info(
-        "Loading ChatRoom {} for shard {} with buffer-size {}",
-        chatRoomId,
-        partition,
-        bufferSize);
-    KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId);
-    ChatRoomData chatRoomData = new ChatRoomData(
-        clock,
-        service,
-        bufferSize);
-    putChatRoom(
-        chatRoomId,
-        createChatRoomRequestTo.getName(),
-        partition,
-        chatRoomData);
-  }
-
-
   private void createChatRoom(ChatRoomInfo chatRoomInfo)
   {
-    UUID id = chatRoomInfo.getId();
-    log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
-    KafkaChatMessageService service = new KafkaChatMessageService(this, id);
-    ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
-    putChatRoom(
-        chatRoomInfo.getId(),
-        chatRoomInfo.getName(),
-        chatRoomInfo.getShard(),
-        chatRoomData);
-  }
-
-  private void loadChatMessage(
-      UUID chatRoomId,
-      LocalDateTime timestamp,
-      long offset,
-      EventChatMessageReceivedTo chatMessageTo,
-      int partition)
-  {
-    Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
-    Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
-
-    ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId);
-    KafkaChatMessageService kafkaChatRoomService =
-        (KafkaChatMessageService) chatRoomData.getChatRoomService();
-
-    kafkaChatRoomService.persistMessage(message);
-  }
-
-  private boolean isLoadingCompleted()
-  {
-    return IntStream
-        .range(0, numShards)
-        .filter(shard -> isShardOwned[shard])
-        .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]);
-  }
-
-  private void pauseAllOwnedPartions()
-  {
-    consumer.pause(IntStream
-        .range(0, numShards)
-        .filter(shard -> isShardOwned[shard])
-        .mapToObj(shard -> new TopicPartition(topic, shard))
-        .toList());
-  }
+    UUID chatRoomId = chatRoomInfo.getId();
+    Integer partition = chatRoomInfo.getShard();
 
-
-  private void putChatRoom(
-      UUID chatRoomId,
-      String name,
-      Integer partition,
-      ChatRoomData chatRoomData)
-  {
-    if (this.chatRoomInfo[partition].containsKey(chatRoomId))
+    if (this.chatRoomInfo.containsKey(chatRoomId))
     {
       log.warn(
           "Ignoring existing chat-room for {}: {}",
@@ -369,60 +173,21 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     else
     {
       log.info(
-          "Adding new chat-room to partition {}: {}",
+          "Adding new chat-room for partition {}: {}",
           partition,
-          chatRoomData);
-
-      this.chatRoomInfo[partition].put(
-          chatRoomId,
-          new ChatRoomInfo(chatRoomId, name, partition));
-      this.chatRoomData[partition].put(chatRoomId, chatRoomData);
-    }
-  }
-
-  int[] getOwnedShards()
-  {
-    return IntStream
-        .range(0, numShards)
-        .filter(shard -> isShardOwned[shard])
-        .toArray();
-  }
-
-  Mono<ChatRoomData> getChatRoomData(int shard, UUID id)
-  {
-    if (loadInProgress)
-    {
-      return Mono.error(new LoadInProgressException());
-    }
+          chatRoomId);
 
-    if (!isShardOwned[shard])
-    {
-      return Mono.error(new ShardNotOwnedException(shard));
+      this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
     }
-
-    return Mono.justOrEmpty(chatRoomData[shard].get(id));
   }
 
   Flux<ChatRoomInfo> getChatRoomInfo()
   {
-    return Flux
-        .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
-        .filter(shard -> isShardOwned[shard])
-        .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values()));
+    return Flux.fromIterable(chatRoomInfo.values());
   }
 
-  Mono<ChatRoomInfo> getChatRoomInfo(int shard, UUID id)
+  Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
   {
-    if (loadInProgress)
-    {
-      return Mono.error(new LoadInProgressException());
-    }
-
-    if (!isShardOwned[shard])
-    {
-      return Mono.error(new ShardNotOwnedException(shard));
-    }
-
-    return Mono.justOrEmpty(chatRoomInfo[shard].get(id));
+    return Mono.fromSupplier(() -> chatRoomInfo.get(id));
   }
 }
index 5019ed2..9832519 100644 (file)
@@ -18,45 +18,47 @@ import java.util.*;
 public class KafkaChatHomeService implements ChatHomeService
 {
   private final int numPartitions;
-  private final ChatRoomChannel chatRoomChannel;
+  private final InfoChannel infoChannel;
+  private final DataChannel dataChannel;
 
 
 
   @Override
   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
   {
-    log.info("Sending create-command for chat rooom: id={}, name={}");
-    return chatRoomChannel.sendCreateChatRoomRequest(id, name);
+    int shard = selectShard(id);
+    log.info(
+        "Sending create-command for chat rooom: id={}, name={}, shard={}",
+        id,
+        name,
+        shard);
+    return infoChannel.sendChatRoomCreatedEvent(id, name, shard);
   }
 
   @Override
   public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
   {
-    int shard = selectShard(id);
-    return chatRoomChannel
-        .getChatRoomInfo(shard, id)
-        .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
-            id,
-            shard,
-            chatRoomChannel.getOwnedShards())));
+    return infoChannel
+        .getChatRoomInfo(id)
+        .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
   }
 
   @Override
   public Flux<ChatRoomInfo> getChatRoomInfo()
   {
-    return chatRoomChannel.getChatRoomInfo();
+    return infoChannel.getChatRoomInfo();
   }
 
   @Override
   public Mono<ChatRoomData> getChatRoomData(UUID id)
   {
     int shard = selectShard(id);
-    return chatRoomChannel
+    return dataChannel
         .getChatRoomData(shard, id)
         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
             id,
             shard,
-            chatRoomChannel.getOwnedShards())));
+            dataChannel.getOwnedShards())));
   }
 
   int selectShard(UUID chatRoomId)
index df9ee73..8ab50f1 100644 (file)
@@ -15,7 +15,7 @@ import java.util.UUID;
 @Slf4j
 public class KafkaChatMessageService implements ChatMessageService
 {
-  private final ChatRoomChannel chatRoomChannel;
+  private final DataChannel dataChannel;
   private final UUID chatRoomId;
 
   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
@@ -27,7 +27,7 @@ public class KafkaChatMessageService implements ChatMessageService
     LocalDateTime timestamp,
     String text)
   {
-    return chatRoomChannel
+    return dataChannel
         .sendChatMessage(chatRoomId, key, timestamp, text)
         .doOnSuccess(message -> persistMessage(message));
   }
index da7ee75..d0151ca 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka.chat.backend.implementation.kafka;
 
+import jakarta.annotation.PreDestroy;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.boot.ApplicationArguments;
@@ -17,12 +18,18 @@ import org.springframework.stereotype.Component;
 @Slf4j
 public class KafkaServicesApplicationRunner implements ApplicationRunner
 {
-  private final ConsumerTaskExecutor chatRoomChannelTaskExecutor;
+  private final ConsumerTaskRunner consumerTaskRunner;
 
 
   @Override
   public void run(ApplicationArguments args) throws Exception
   {
-    chatRoomChannelTaskExecutor.executeConsumerTask();
+    consumerTaskRunner.run();
+  }
+
+  @PreDestroy
+  public void joinConsumerTasks()
+  {
+    consumerTaskRunner.joinConsumerTasks();
   }
 }
index cda0b94..5bde07c 100644 (file)
@@ -3,14 +3,15 @@ package de.juplo.kafka.chat.backend.implementation.kafka;
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -36,54 +37,109 @@ import java.util.Properties;
 public class KafkaServicesConfiguration
 {
   @Bean
-  ConsumerTaskExecutor chatRoomChannelTaskExecutor(
+  ConsumerTaskRunner consumerTaskRunner(
+      ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
+      ConsumerTaskExecutor dataChannelConsumerTaskExecutor)
+  {
+    return new ConsumerTaskRunner(
+        infoChannelConsumerTaskExecutor,
+        dataChannelConsumerTaskExecutor);
+  }
+
+  @Bean
+  ConsumerTaskExecutor infoChannelConsumerTaskExecutor(
       ThreadPoolTaskExecutor taskExecutor,
-      ChatRoomChannel chatRoomChannel,
-      Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
-      ConsumerTaskExecutor.WorkAssignor workAssignor)
+      InfoChannel infoChannel,
+      Consumer<String, AbstractMessageTo> infoChannelConsumer,
+      ConsumerTaskExecutor.WorkAssignor infoChannelWorkAssignor)
   {
     return new ConsumerTaskExecutor(
         taskExecutor,
-        chatRoomChannel,
-        chatRoomChannelConsumer,
-        workAssignor);
+        infoChannel,
+        infoChannelConsumer,
+        infoChannelWorkAssignor);
+  }
+
+  @Bean
+  ConsumerTaskExecutor.WorkAssignor infoChannelWorkAssignor(
+      ChatBackendProperties properties)
+  {
+    return consumer ->
+    {
+      String topic = properties.getKafka().getInfoChannelTopic();
+      List<TopicPartition> partitions = consumer
+          .partitionsFor(topic)
+          .stream()
+          .map(partitionInfo ->
+              new TopicPartition(topic, partitionInfo.partition()))
+          .toList();
+      consumer.assign(partitions);
+    };
   }
 
   @Bean
-  ConsumerTaskExecutor.WorkAssignor workAssignor(
+  ConsumerTaskExecutor dataChannelConsumerTaskExecutor(
+      ThreadPoolTaskExecutor taskExecutor,
+      DataChannel dataChannel,
+      Consumer<String, AbstractMessageTo> dataChannelConsumer,
+      ConsumerTaskExecutor.WorkAssignor dataChannelWorkAssignor)
+  {
+    return new ConsumerTaskExecutor(
+        taskExecutor,
+        dataChannel,
+        dataChannelConsumer,
+        dataChannelWorkAssignor);
+  }
+
+  @Bean
+  ConsumerTaskExecutor.WorkAssignor dataChannelWorkAssignor(
       ChatBackendProperties properties,
-      ChatRoomChannel chatRoomChannel)
+      DataChannel dataChannel)
   {
     return consumer ->
     {
       List<String> topics =
-          List.of(properties.getKafka().getChatRoomChannelTopic());
-      consumer.subscribe(topics, chatRoomChannel);
+          List.of(properties.getKafka().getDataChannelTopic());
+      consumer.subscribe(topics, dataChannel);
     };
   }
 
   @Bean
     ChatHomeService kafkaChatHome(
       ChatBackendProperties properties,
-      ChatRoomChannel chatRoomChannel)
+      InfoChannel infoChannel,
+      DataChannel dataChannel)
   {
     return new KafkaChatHomeService(
         properties.getKafka().getNumPartitions(),
-        chatRoomChannel);
+        infoChannel,
+        dataChannel);
   }
 
   @Bean
-  ChatRoomChannel chatRoomChannel(
+  InfoChannel infoChannel(
       ChatBackendProperties properties,
-      Producer<String, AbstractMessageTo> chatRoomChannelProducer,
-      Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
+      Producer<String, AbstractMessageTo> producer,
+      Consumer<String, AbstractMessageTo> infoChannelConsumer)
+  {
+    return new InfoChannel(
+        properties.getKafka().getInfoChannelTopic(),
+        producer,
+        infoChannelConsumer);
+  }
+
+  @Bean
+  DataChannel dataChannel(
+      ChatBackendProperties properties,
+      Producer<String, AbstractMessageTo> producer,
+      Consumer<String, AbstractMessageTo> dataChannelConsumer,
       ZoneId zoneId,
       Clock clock)
   {
-    return new ChatRoomChannel(
-        properties.getKafka().getChatRoomChannelTopic(),
-        chatRoomChannelProducer,
-        chatRoomChannelConsumer,
+    return new DataChannel(
+        properties.getKafka().getDataChannelTopic(),
+        producer,
+        dataChannelConsumer,
         zoneId,
         properties.getKafka().getNumPartitions(),
         properties.getChatroomBufferSize(),
@@ -91,7 +147,7 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  Producer<String, AbstractMessageTo>  chatRoomChannelProducer(
+  Producer<String, AbstractMessageTo>  producer(
       Properties defaultProducerProperties,
       ChatBackendProperties chatBackendProperties,
       StringSerializer stringSerializer,
@@ -101,7 +157,7 @@ public class KafkaServicesConfiguration
     defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
     properties.put(
         ProducerConfig.CLIENT_ID_CONFIG,
-        chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER");
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
     return new KafkaProducer<>(
         properties,
         stringSerializer,
@@ -126,7 +182,28 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  Consumer<String, AbstractMessageTo>  chatRoomChannelConsumer(
+  Consumer<String, AbstractMessageTo>  infoChannelConsumer(
+      Properties defaultConsumerProperties,
+      ChatBackendProperties chatBackendProperties,
+      StringDeserializer stringDeserializer,
+      JsonDeserializer<AbstractMessageTo> messageDeserializer)
+  {
+    Map<String, Object> properties = new HashMap<>();
+    defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+    properties.put(
+        ConsumerConfig.CLIENT_ID_CONFIG,
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
+    properties.put(
+        ConsumerConfig.GROUP_ID_CONFIG,
+        "info_channel");
+    return new KafkaConsumer<>(
+        properties,
+        stringDeserializer,
+        messageDeserializer);
+  }
+
+  @Bean
+  Consumer<String, AbstractMessageTo>  dataChannelConsumer(
       Properties defaultConsumerProperties,
       ChatBackendProperties chatBackendProperties,
       StringDeserializer stringDeserializer,
@@ -136,10 +213,10 @@ public class KafkaServicesConfiguration
     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
     properties.put(
         ConsumerConfig.CLIENT_ID_CONFIG,
-        chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER");
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
     properties.put(
         ConsumerConfig.GROUP_ID_CONFIG,
-        "chatroom_channel");
+        "data_channel");
     return new KafkaConsumer<>(
         properties,
         stringDeserializer,
@@ -168,7 +245,7 @@ public class KafkaServicesConfiguration
   String typeMappings ()
   {
     return
-        "command_create_chatroom:" +  CommandCreateChatRoomTo.class.getCanonicalName() + "," +
+        "event_chatroom_created:" +  EventChatRoomCreated.class.getCanonicalName() + "," +
         "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
   }
 
index 17d3a39..d4f6508 100644 (file)
@@ -1,5 +1,6 @@
-package de.juplo.kafka.chat.backend.implementation.kafka.messages;
+package de.juplo.kafka.chat.backend.implementation.kafka.messages.data;
 
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import lombok.*;
 
 
index 0cf4232..ae5a501 100644 (file)
@@ -1,2 +1,45 @@
-package de.juplo.kafka.chat.backend.implementation.kafka.messages.info;public class EventChatRoomCreated {
+package de.juplo.kafka.chat.backend.implementation.kafka.messages.info;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+import java.util.UUID;
+
+
+@Getter
+@Setter
+@EqualsAndHashCode
+@ToString
+public class EventChatRoomCreated extends AbstractMessageTo
+{
+  private String id;
+  private String name;
+  private Integer shard;
+
+
+  public EventChatRoomCreated()
+  {
+    super(ToType.EVENT_CHATROOM_CREATED);
+  }
+
+
+  public ChatRoomInfo toChatRoomInfo()
+  {
+    return new ChatRoomInfo(UUID.fromString(id), name, shard);
+  }
+
+  public static EventChatRoomCreated of(UUID id, String name, Integer shard)
+  {
+    EventChatRoomCreated event = new EventChatRoomCreated();
+
+    event.setId(id.toString());
+    event.setName(name);
+    event.setShard(shard);
+
+    return event;
+  }
 }
index 8ea9cc2..853ee1c 100644 (file)
@@ -11,20 +11,18 @@ import org.springframework.data.mongodb.core.mapping.Document;
 @Getter(AccessLevel.PACKAGE)
 @Setter(AccessLevel.PACKAGE)
 @EqualsAndHashCode(of = { "id" })
-@ToString(of = { "id", "shard", "name" })
+@ToString(of = { "id", "name" })
 @Document
 public class ChatRoomTo
 {
   @Id
   private String id;
-  private Integer shard;
   private String name;
 
   public static ChatRoomTo from(ChatRoomInfo chatRoomInfo)
   {
     return new ChatRoomTo(
         chatRoomInfo.getId().toString(),
-        chatRoomInfo.getShard(),
         chatRoomInfo.getName());
   }
 }
index c87036c..3eb9096 100644 (file)
@@ -36,18 +36,7 @@ public class MongoDbStorageStrategy implements StorageStrategy
         .map(chatRoomTo ->
         {
           UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
-          int shard = shardingStrategy.selectShard(chatRoomId);
-
-          log.info(
-              "{} - old shard: {}, new shard:  {}",
-              chatRoomId,
-              chatRoomTo.getShard(),
-              shard);
-
-          return new ChatRoomInfo(
-              chatRoomId,
-              chatRoomTo.getName(),
-              shard);
+          return new ChatRoomInfo(chatRoomId, chatRoomTo.getName());
         });
   }
 
index f257d5e..18f6c54 100644 (file)
@@ -1,44 +1,48 @@
 package de.juplo.kafka.chat.backend;
 
-import de.juplo.kafka.chat.backend.implementation.kafka.ChatRoomChannel;
-import de.juplo.kafka.chat.backend.implementation.kafka.KafkaServicesApplicationRunner;
+import de.juplo.kafka.chat.backend.implementation.kafka.*;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Import;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.support.SendResult;
 import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
 
-import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.TOPIC;
+import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.DATA_TOPIC;
+import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.INFO_TOPIC;
 
 
 @SpringBootTest(
     webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
     properties = {
+        "spring.main.allow-bean-definition-overriding=true",
         "chat.backend.services=kafka",
         "chat.backend.kafka.client-id-PREFIX=TEST",
         "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
         "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
-        "chat.backend.kafka.chatroom-channel-topic=" + TOPIC,
+        "chat.backend.kafka.info-channel-topic=" + INFO_TOPIC,
+        "chat.backend.kafka.data-channel-topic=" + DATA_TOPIC,
         "chat.backend.kafka.num-partitions=10",
         })
-@EmbeddedKafka(topics = { TOPIC }, partitions = 10)
+@EmbeddedKafka(
+    topics = { INFO_TOPIC, DATA_TOPIC },
+    partitions = 10)
 @Slf4j
 class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
 {
-  final static String TOPIC = "KAFKA_CONFIGURATION_IT";
-
-  static CompletableFuture<Void> CONSUMER_JOB;
+  final static String INFO_TOPIC = "KAFKA_CONFIGURATION_IT_INFO_CHANNEL";
+  final static String DATA_TOPIC = "KAFKA_CONFIGURATION_IT_DATA_CHANNEL";
 
   @MockBean
   KafkaServicesApplicationRunner kafkaServicesApplicationRunner;
@@ -46,31 +50,25 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
   @BeforeAll
   public static void sendAndLoadStoredData(
       @Autowired KafkaTemplate<String, String> messageTemplate,
-      @Autowired Consumer chatRoomChannelConsumer,
-      @Autowired ThreadPoolTaskExecutor taskExecutor,
-      @Autowired ChatRoomChannel chatRoomChannel)
+      @Autowired ConsumerTaskRunner consumerTaskRunner)
   {
-    send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom");
-    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
-    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
-    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
-    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
+    send(messageTemplate, INFO_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "event_chatroom_created");
+    send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
+    send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
+    send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
+    send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
 
-    List<TopicPartition> assignedPartitions = List.of(new TopicPartition(TOPIC, 2));
-    chatRoomChannelConsumer.assign(assignedPartitions);
-    chatRoomChannel.onPartitionsAssigned(assignedPartitions);
-    CONSUMER_JOB = taskExecutor
-        .submitCompletable(chatRoomChannel)
-        .exceptionally(e ->
-        {
-          log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
-          return null;
-        });
+    consumerTaskRunner.run();
   }
 
-  static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
+  static void send(
+      KafkaTemplate<String, String> kafkaTemplate,
+      String topic,
+      String key,
+      String value,
+      String typeId)
   {
-    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
+    ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
     record.headers().add("__TypeId__", typeId.getBytes());
     SendResult<String, String> result = kafkaTemplate.send(record).join();
     log.info(
@@ -79,14 +77,29 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
         value,
         new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
   }
-
   @AfterAll
-  static void joinConsumerJob(@Autowired Consumer chatRoomChannelConsumer)
+  static void joinConsumerTasks(@Autowired ConsumerTaskRunner consumerTaskRunner)
+  {
+    consumerTaskRunner.joinConsumerTasks();
+  }
+
+
+  @TestConfiguration
+  @EnableConfigurationProperties(ChatBackendProperties.class)
+  @Import(KafkaServicesConfiguration.class)
+  static class KafkaConfigurationITConfiguration
   {
-    log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
-    chatRoomChannelConsumer.wakeup();
-    log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
-    CONSUMER_JOB.join();
-    log.info("Joined the consumer of the ChatRoomChannel");
+    @Bean
+    ConsumerTaskExecutor.WorkAssignor dataChannelWorkAssignor(
+        DataChannel dataChannel)
+    {
+      return consumer ->
+      {
+        List<TopicPartition> assignedPartitions =
+            List.of(new TopicPartition(DATA_TOPIC, 2));
+        consumer.assign(assignedPartitions);
+        dataChannel.onPartitionsAssigned(assignedPartitions);
+      };
+    }
   }
 }
index 4aa362d..a614037 100644 (file)
@@ -23,7 +23,8 @@ import java.time.Clock;
 import java.util.List;
 
 import static de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest.NUM_SHARDS;
-import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServiceTest.TOPIC;
+import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServiceTest.DATA_TOPIC;
+import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServiceTest.INFO_TOPIC;
 
 
 @SpringBootTest(
@@ -38,14 +39,18 @@ import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServ
     "chat.backend.kafka.client-id-PREFIX=TEST",
     "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
     "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
-    "chat.backend.kafka.chatroom-channel-topic=" + TOPIC,
+    "chat.backend.kafka.info-channel-topic=" + INFO_TOPIC,
+    "chat.backend.kafka.data-channel-topic=" + DATA_TOPIC,
     "chat.backend.kafka.num-partitions=" + NUM_SHARDS,
 })
-@EmbeddedKafka(topics = { TOPIC }, partitions = 10)
+@EmbeddedKafka(
+    topics = { INFO_TOPIC, DATA_TOPIC },
+    partitions = 10)
 @Slf4j
 public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
 {
-  final static String TOPIC = "KAFKA_CHAT_HOME_TEST";
+  final static String INFO_TOPIC = "KAFKA_CHAT_HOME_TEST_INFO";
+  final static String DATA_TOPIC = "KAFKA_CHAT_HOME_TEST_DATA";
 
 
   @TestConfiguration
@@ -54,15 +59,15 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
   static class KafkaChatHomeTestConfiguration
   {
     @Bean
-    ConsumerTaskExecutor.WorkAssignor workAssignor(
-        ChatRoomChannel chatRoomChannel)
+    ConsumerTaskExecutor.WorkAssignor dataChannelWorkAssignor(
+        DataChannel dataChannel)
     {
       return consumer ->
       {
         List<TopicPartition> assignedPartitions =
-            List.of(new TopicPartition(TOPIC, 2));
+            List.of(new TopicPartition(DATA_TOPIC, 2));
         consumer.assign(assignedPartitions);
-        chatRoomChannel.onPartitionsAssigned(assignedPartitions);
+        dataChannel.onPartitionsAssigned(assignedPartitions);
       };
     }
 
@@ -76,21 +81,26 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
 
   @BeforeAll
   public static void sendAndLoadStoredData(
-      @Autowired ConsumerTaskExecutor consumerTaskExecutor,
-      @Autowired KafkaTemplate<String, String> messageTemplate)
+      @Autowired ConsumerTaskRunner consumerTaskRunner,
+      @Autowired KafkaTemplate<String, String> messageTemplate) throws InterruptedException
   {
-    send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom");
-    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
-    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
-    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
-    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
+    send(messageTemplate, INFO_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "event_chatroom_created");
+    send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
+    send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
+    send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
+    send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
 
-    consumerTaskExecutor.executeConsumerTask();
+    consumerTaskRunner.run();
   }
 
-  static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
+  static void send(
+      KafkaTemplate<String, String> kafkaTemplate,
+      String topic,
+      String key,
+      String value,
+      String typeId)
   {
-    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
+    ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
     record.headers().add("__TypeId__", typeId.getBytes());
     SendResult<String, String> result = kafkaTemplate.send(record).join();
     log.info(
@@ -101,8 +111,8 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
   }
 
   @AfterAll
-  static void joinConsumerJob(@Autowired ConsumerTaskExecutor consumerTaskExecutor)
+  static void joinConsumerTasks(@Autowired ConsumerTaskRunner consumerTaskRunner)
   {
-    consumerTaskExecutor.joinConsumerTaskJob();
+    consumerTaskRunner.joinConsumerTasks();
   }
 }
index d9d5a08..72ade06 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.chat.backend.implementation.kafka.messages;
+package de.juplo.kafka.chat.backend.implementation.kafka.messages.data;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;