WIP:ALIGN
authorKai Moritz <kai@juplo.de>
Tue, 12 Sep 2023 21:33:59 +0000 (23:33 +0200)
committerKai Moritz <kai@juplo.de>
Thu, 14 Sep 2023 22:28:40 +0000 (00:28 +0200)
16 files changed:
src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java
src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatMessageService.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/AbstractMessageTo.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedTo.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java
src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/ChatRoomTo.java
src/main/java/de/juplo/kafka/chat/backend/storage/mongodb/MongoDbStorageStrategy.java
src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/data/EventChatMessageReceivedToTest.java

index 9c80f5d..381c6c6 100644 (file)
@@ -36,7 +36,8 @@ public class ChatBackendProperties
   {
     private String clientIdPrefix;
     private String bootstrapServers = ":9092";
-    private String chatRoomChannelTopic = "message_channel";
+    private String infoChannelTopic = "info_channel";
+    private String dataChannelTopic = "data_channel";
     private int numPartitions = 2;
   }
 
index 33c522d..e91b28c 100644 (file)
@@ -19,4 +19,10 @@ public class ChatRoomInfo
   private final String name;
   @Getter
   private final Integer shard;
+
+
+  public ChatRoomInfo(UUID id, String name)
+  {
+    this(id, name, null);
+  }
 }
index d94bc65..f20f7c2 100644 (file)
@@ -4,8 +4,7 @@ import de.juplo.kafka.chat.backend.domain.*;
 import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
 import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -16,7 +15,6 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
-import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.time.*;
@@ -25,7 +23,7 @@ import java.util.stream.IntStream;
 
 
 @Slf4j
-public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
+public class DataChannel implements Runnable, ConsumerRebalanceListener
 {
   private final String topic;
   private final Producer<String, AbstractMessageTo> producer;
@@ -37,7 +35,6 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
   private final boolean[] isShardOwned;
   private final long[] currentOffset;
   private final long[] nextOffset;
-  private final Map<UUID, ChatRoomInfo>[] chatRoomInfo;
   private final Map<UUID, ChatRoomData>[] chatRoomData;
 
   private boolean running;
@@ -45,21 +42,21 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
   private volatile boolean loadInProgress;
 
 
-  public ChatRoomChannel(
+  public DataChannel(
     String topic,
     Producer<String, AbstractMessageTo> producer,
-    Consumer<String, AbstractMessageTo> consumer,
+    Consumer<String, AbstractMessageTo> dataChannelConsumer,
     ZoneId zoneId,
     int numShards,
     int bufferSize,
     Clock clock)
   {
     log.debug(
-        "Creating ChatRoomChannel for topic {} with {} partitions",
+        "Creating DataChannel for topic {} with {} partitions",
         topic,
         numShards);
     this.topic = topic;
-    this.consumer = consumer;
+    this.consumer = dataChannelConsumer;
     this.producer = producer;
     this.zoneId = zoneId;
     this.numShards = numShards;
@@ -68,55 +65,17 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     this.isShardOwned = new boolean[numShards];
     this.currentOffset = new long[numShards];
     this.nextOffset = new long[numShards];
-    this.chatRoomInfo = new Map[numShards];
     this.chatRoomData = new Map[numShards];
     IntStream
         .range(0, numShards)
         .forEach(shard ->
         {
-          this.chatRoomInfo[shard] = new HashMap<>();
           this.chatRoomData[shard] = new HashMap<>();
         });
   }
 
 
 
-  Mono<ChatRoomInfo> sendCreateChatRoomRequest(
-      UUID chatRoomId,
-      String name)
-  {
-    CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name);
-    return Mono.create(sink ->
-    {
-      ProducerRecord<String, AbstractMessageTo> record =
-          new ProducerRecord<>(
-              topic,
-              chatRoomId.toString(),
-              createChatRoomRequestTo);
-
-      producer.send(record, ((metadata, exception) ->
-      {
-        if (metadata != null)
-        {
-          log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
-          ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition());
-          createChatRoom(chatRoomInfo);
-          sink.success(chatRoomInfo);
-        }
-        else
-        {
-          // On send-failure
-          log.error(
-              "Could not send create-request for chat room (id={}, name={}): {}",
-              chatRoomId,
-              name,
-              exception);
-          sink.error(exception);
-        }
-      }));
-    });
-  }
-
   Mono<Message> sendChatMessage(
       UUID chatRoomId,
       Message.MessageKey key,
@@ -211,12 +170,12 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     {
       try
       {
-        ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(5));
+        ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
         log.info("Fetched {} messages", records.count());
 
         if (loadInProgress)
         {
-          loadChatRoom(records);
+          loadChatRoomData(records);
 
           if (isLoadingCompleted())
           {
@@ -244,7 +203,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     log.info("Exiting normally");
   }
 
-  private void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
+  private void loadChatRoomData(ConsumerRecords<String, AbstractMessageTo> records)
   {
     for (ConsumerRecord<String, AbstractMessageTo> record : records)
     {
@@ -252,13 +211,6 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
 
       switch (record.value().getType())
       {
-        case COMMAND_CREATE_CHATROOM:
-          createChatRoom(
-              chatRoomId,
-              (CommandCreateChatRoomTo) record.value(),
-              record.partition());
-          break;
-
         case EVENT_CHATMESSAGE_RECEIVED:
           Instant instant = Instant.ofEpochSecond(record.timestamp());
           LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
@@ -282,31 +234,14 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     }
   }
 
-  private void createChatRoom(
-      UUID chatRoomId,
-      CommandCreateChatRoomTo createChatRoomRequestTo,
-      Integer partition)
+  void createChatRoom(ChatRoomInfo chatRoomInfo)
   {
-    log.info(
-        "Loading ChatRoom {} for shard {} with buffer-size {}",
-        chatRoomId,
-        partition,
-        bufferSize);
-    KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId);
-    ChatRoomData chatRoomData = new ChatRoomData(
-        clock,
-        service,
-        bufferSize);
-    putChatRoom(
-        chatRoomId,
-        createChatRoomRequestTo.getName(),
-        partition,
-        chatRoomData);
-  }
-
+    if (!isShardOwned[chatRoomInfo.getShard()])
+    {
+      log.debug("Ignoring not owned chat-room {}", chatRoomInfo);
+      return;
+    }
 
-  private void createChatRoom(ChatRoomInfo chatRoomInfo)
-  {
     UUID id = chatRoomInfo.getId();
     log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
     KafkaChatMessageService service = new KafkaChatMessageService(this, id);
@@ -359,7 +294,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
       Integer partition,
       ChatRoomData chatRoomData)
   {
-    if (this.chatRoomInfo[partition].containsKey(chatRoomId))
+    if (this.chatRoomData[partition].containsKey(chatRoomId))
     {
       log.warn(
           "Ignoring existing chat-room for {}: {}",
@@ -373,9 +308,6 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
           partition,
           chatRoomData);
 
-      this.chatRoomInfo[partition].put(
-          chatRoomId,
-          new ChatRoomInfo(chatRoomId, name, partition));
       this.chatRoomData[partition].put(chatRoomId, chatRoomData);
     }
   }
@@ -402,27 +334,4 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
 
     return Mono.justOrEmpty(chatRoomData[shard].get(id));
   }
-
-  Flux<ChatRoomInfo> getChatRoomInfo()
-  {
-    return Flux
-        .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
-        .filter(shard -> isShardOwned[shard])
-        .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values()));
-  }
-
-  Mono<ChatRoomInfo> getChatRoomInfo(int shard, UUID id)
-  {
-    if (loadInProgress)
-    {
-      return Mono.error(new LoadInProgressException());
-    }
-
-    if (!isShardOwned[shard])
-    {
-      return Mono.error(new ShardNotOwnedException(shard));
-    }
-
-    return Mono.justOrEmpty(chatRoomInfo[shard].get(id));
-  }
 }
index d94bc65..c7cc6ad 100644 (file)
 package de.juplo.kafka.chat.backend.implementation.kafka;
 
-import de.juplo.kafka.chat.backend.domain.*;
-import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
-import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo;
-import lombok.Getter;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.time.*;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
 import java.util.stream.IntStream;
 
 
 @Slf4j
-public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
+public class InfoChannel implements Runnable
 {
   private final String topic;
   private final Producer<String, AbstractMessageTo> producer;
   private final Consumer<String, AbstractMessageTo> consumer;
-  private final ZoneId zoneId;
   private final int numShards;
-  private final int bufferSize;
-  private final Clock clock;
-  private final boolean[] isShardOwned;
-  private final long[] currentOffset;
+  private final long[] startOffset;
   private final long[] nextOffset;
-  private final Map<UUID, ChatRoomInfo>[] chatRoomInfo;
-  private final Map<UUID, ChatRoomData>[] chatRoomData;
+  private final Map<UUID, ChatRoomInfo> chatRoomInfo;
+  private final DataChannel dataChannel;
 
   private boolean running;
-  @Getter
-  private volatile boolean loadInProgress;
 
 
-  public ChatRoomChannel(
+  public InfoChannel(
     String topic,
     Producer<String, AbstractMessageTo> producer,
-    Consumer<String, AbstractMessageTo> consumer,
-    ZoneId zoneId,
-    int numShards,
-    int bufferSize,
-    Clock clock)
+    Consumer<String, AbstractMessageTo> infoChannelConsumer,
+    DataChannel dataChannel)
   {
     log.debug(
-        "Creating ChatRoomChannel for topic {} with {} partitions",
-        topic,
-        numShards);
+        "Creating InfoChannel for topic {}",
+        topic);
     this.topic = topic;
-    this.consumer = consumer;
+    this.consumer = infoChannelConsumer;
     this.producer = producer;
-    this.zoneId = zoneId;
-    this.numShards = numShards;
-    this.bufferSize = bufferSize;
-    this.clock = clock;
-    this.isShardOwned = new boolean[numShards];
-    this.currentOffset = new long[numShards];
+    this.chatRoomInfo = new HashMap<>();
+
+    this.numShards = consumer
+        .partitionsFor(topic)
+        .size();
+    this.startOffset = new long[numShards];
     this.nextOffset = new long[numShards];
-    this.chatRoomInfo = new Map[numShards];
-    this.chatRoomData = new Map[numShards];
     IntStream
         .range(0, numShards)
-        .forEach(shard ->
-        {
-          this.chatRoomInfo[shard] = new HashMap<>();
-          this.chatRoomData[shard] = new HashMap<>();
-        });
+        .forEach(partition -> this.nextOffset[partition] = -1l);
+
+    this.dataChannel = dataChannel;
   }
 
 
+  boolean loadInProgress()
+  {
+    return IntStream
+        .range(0, numShards)
+        .anyMatch(partition -> nextOffset[partition] < startOffset[partition]);
+  }
 
-  Mono<ChatRoomInfo> sendCreateChatRoomRequest(
+  Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
       UUID chatRoomId,
-      String name)
+      String name,
+      int shard)
   {
-    CommandCreateChatRoomTo createChatRoomRequestTo = CommandCreateChatRoomTo.of(name);
+    EventChatRoomCreated to = EventChatRoomCreated.of(chatRoomId, name, shard);
     return Mono.create(sink ->
     {
       ProducerRecord<String, AbstractMessageTo> record =
           new ProducerRecord<>(
               topic,
-              chatRoomId.toString(),
-              createChatRoomRequestTo);
+              Integer.toString(shard),
+              to);
 
       producer.send(record, ((metadata, exception) ->
       {
         if (metadata != null)
         {
-          log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
+          log.info("Successfully sent chreate-request for chat room: {}", to);
           ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition());
-          createChatRoom(chatRoomInfo);
           sink.success(chatRoomInfo);
         }
         else
@@ -117,122 +105,28 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     });
   }
 
-  Mono<Message> sendChatMessage(
-      UUID chatRoomId,
-      Message.MessageKey key,
-      LocalDateTime timestamp,
-      String text)
-  {
-    ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
-    return Mono.create(sink ->
-    {
-      ProducerRecord<String, AbstractMessageTo> record =
-          new ProducerRecord<>(
-              topic,
-              null,
-              zdt.toEpochSecond(),
-              chatRoomId.toString(),
-              EventChatMessageReceivedTo.of(key.getUsername(), key.getMessageId(), text));
-
-      producer.send(record, ((metadata, exception) ->
-      {
-        if (metadata != null)
-        {
-          // On successful send
-          Message message = new Message(key, metadata.offset(), timestamp, text);
-          log.info("Successfully send message {}", message);
-          sink.success(message);
-        }
-        else
-        {
-          // On send-failure
-          log.error(
-              "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}",
-              chatRoomId,
-              key,
-              timestamp,
-              text,
-              exception);
-          sink.error(exception);
-        }
-      }));
-    });
-  }
-
-  @Override
-  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
-  {
-    log.info("Newly assigned partitions! Pausing normal operations...");
-    loadInProgress = true;
-
-    consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
-    {
-      int partition = topicPartition.partition();
-      isShardOwned[partition] =  true;
-      this.currentOffset[partition] = currentOffset;
-
-      log.info(
-          "Partition assigned: {} - loading messages: next={} -> current={}",
-          partition,
-          nextOffset[partition],
-          currentOffset);
-
-      consumer.seek(topicPartition, nextOffset[partition]);
-    });
-
-    consumer.resume(partitions);
-  }
-
-  @Override
-  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
-  {
-    partitions.forEach(topicPartition ->
-    {
-      int partition = topicPartition.partition();
-      isShardOwned[partition] = false;
-      log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
-    });
-  }
-
-  @Override
-  public void onPartitionsLost(Collection<TopicPartition> partitions)
-  {
-    log.warn("Lost partitions: {}, partitions");
-    // TODO: Muss auf den Verlust anders reagiert werden?
-    onPartitionsRevoked(partitions);
-  }
 
   @Override
   public void run()
   {
     running = true;
 
+    consumer
+        .endOffsets(consumer.assignment())
+        .entrySet()
+        .stream()
+        .forEach(entry -> this.startOffset[entry.getKey().partition()] = entry.getValue());
+    IntStream
+        .range(0, numShards)
+        .forEach(partition -> this.nextOffset[partition] = 0l);
+
     while (running)
     {
       try
       {
-        ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(5));
-        log.info("Fetched {} messages", records.count());
-
-        if (loadInProgress)
-        {
-          loadChatRoom(records);
-
-          if (isLoadingCompleted())
-          {
-            log.info("Loading of messages completed! Pausing all owned partitions...");
-            pauseAllOwnedPartions();
-            log.info("Resuming normal operations...");
-            loadInProgress = false;
-          }
-        }
-        else
-        {
-          if (!records.isEmpty())
-          {
-            throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!");
-          }
-        }
+        ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
+        log.debug("Fetched {} messages", records.count());
+        handleMessages(records);
       }
       catch (WakeupException e)
       {
@@ -244,36 +138,22 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     log.info("Exiting normally");
   }
 
-  private void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
+  private void handleMessages(ConsumerRecords<String, AbstractMessageTo> records)
   {
     for (ConsumerRecord<String, AbstractMessageTo> record : records)
     {
-      UUID chatRoomId = UUID.fromString(record.key());
-
       switch (record.value().getType())
       {
-        case COMMAND_CREATE_CHATROOM:
-          createChatRoom(
-              chatRoomId,
-              (CommandCreateChatRoomTo) record.value(),
-              record.partition());
-          break;
-
-        case EVENT_CHATMESSAGE_RECEIVED:
-          Instant instant = Instant.ofEpochSecond(record.timestamp());
-          LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
-          loadChatMessage(
-              chatRoomId,
-              timestamp,
-              record.offset(),
-              (EventChatMessageReceivedTo) record.value(),
-              record.partition());
+        case EVENT_CHATROOM_CREATED:
+          EventChatRoomCreated eventChatRoomCreated =
+              (EventChatRoomCreated) record.value();
+          createChatRoom(eventChatRoomCreated.toChatRoomInfo());
           break;
 
         default:
           log.debug(
-              "Ignoring message for chat-room {} with offset {}: {}",
-              chatRoomId,
+              "Ignoring message for key={} with offset={}: {}",
+              record.key(),
               record.offset(),
               record.value());
       }
@@ -282,84 +162,12 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     }
   }
 
-  private void createChatRoom(
-      UUID chatRoomId,
-      CommandCreateChatRoomTo createChatRoomRequestTo,
-      Integer partition)
-  {
-    log.info(
-        "Loading ChatRoom {} for shard {} with buffer-size {}",
-        chatRoomId,
-        partition,
-        bufferSize);
-    KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId);
-    ChatRoomData chatRoomData = new ChatRoomData(
-        clock,
-        service,
-        bufferSize);
-    putChatRoom(
-        chatRoomId,
-        createChatRoomRequestTo.getName(),
-        partition,
-        chatRoomData);
-  }
-
-
   private void createChatRoom(ChatRoomInfo chatRoomInfo)
   {
-    UUID id = chatRoomInfo.getId();
-    log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
-    KafkaChatMessageService service = new KafkaChatMessageService(this, id);
-    ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
-    putChatRoom(
-        chatRoomInfo.getId(),
-        chatRoomInfo.getName(),
-        chatRoomInfo.getShard(),
-        chatRoomData);
-  }
-
-  private void loadChatMessage(
-      UUID chatRoomId,
-      LocalDateTime timestamp,
-      long offset,
-      EventChatMessageReceivedTo chatMessageTo,
-      int partition)
-  {
-    Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
-    Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
-
-    ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId);
-    KafkaChatMessageService kafkaChatRoomService =
-        (KafkaChatMessageService) chatRoomData.getChatRoomService();
-
-    kafkaChatRoomService.persistMessage(message);
-  }
-
-  private boolean isLoadingCompleted()
-  {
-    return IntStream
-        .range(0, numShards)
-        .filter(shard -> isShardOwned[shard])
-        .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]);
-  }
-
-  private void pauseAllOwnedPartions()
-  {
-    consumer.pause(IntStream
-        .range(0, numShards)
-        .filter(shard -> isShardOwned[shard])
-        .mapToObj(shard -> new TopicPartition(topic, shard))
-        .toList());
-  }
+    UUID chatRoomId = chatRoomInfo.getId();
+    Integer partition = chatRoomInfo.getShard();
 
-
-  private void putChatRoom(
-      UUID chatRoomId,
-      String name,
-      Integer partition,
-      ChatRoomData chatRoomData)
-  {
-    if (this.chatRoomInfo[partition].containsKey(chatRoomId))
+    if (this.chatRoomInfo.containsKey(chatRoomId))
     {
       log.warn(
           "Ignoring existing chat-room for {}: {}",
@@ -369,60 +177,22 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     else
     {
       log.info(
-          "Adding new chat-room to partition {}: {}",
+          "Adding new chat-room for partition {}: {}",
           partition,
-          chatRoomData);
-
-      this.chatRoomInfo[partition].put(
-          chatRoomId,
-          new ChatRoomInfo(chatRoomId, name, partition));
-      this.chatRoomData[partition].put(chatRoomId, chatRoomData);
-    }
-  }
-
-  int[] getOwnedShards()
-  {
-    return IntStream
-        .range(0, numShards)
-        .filter(shard -> isShardOwned[shard])
-        .toArray();
-  }
-
-  Mono<ChatRoomData> getChatRoomData(int shard, UUID id)
-  {
-    if (loadInProgress)
-    {
-      return Mono.error(new LoadInProgressException());
-    }
+          chatRoomId);
 
-    if (!isShardOwned[shard])
-    {
-      return Mono.error(new ShardNotOwnedException(shard));
+      this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
+      this.dataChannel.createChatRoom(chatRoomInfo);
     }
-
-    return Mono.justOrEmpty(chatRoomData[shard].get(id));
   }
 
   Flux<ChatRoomInfo> getChatRoomInfo()
   {
-    return Flux
-        .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
-        .filter(shard -> isShardOwned[shard])
-        .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values()));
+    return Flux.fromIterable(chatRoomInfo.values());
   }
 
-  Mono<ChatRoomInfo> getChatRoomInfo(int shard, UUID id)
+  Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
   {
-    if (loadInProgress)
-    {
-      return Mono.error(new LoadInProgressException());
-    }
-
-    if (!isShardOwned[shard])
-    {
-      return Mono.error(new ShardNotOwnedException(shard));
-    }
-
-    return Mono.justOrEmpty(chatRoomInfo[shard].get(id));
+    return Mono.fromSupplier(() -> chatRoomInfo.get(id));
   }
 }
index 5019ed2..9832519 100644 (file)
@@ -18,45 +18,47 @@ import java.util.*;
 public class KafkaChatHomeService implements ChatHomeService
 {
   private final int numPartitions;
-  private final ChatRoomChannel chatRoomChannel;
+  private final InfoChannel infoChannel;
+  private final DataChannel dataChannel;
 
 
 
   @Override
   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
   {
-    log.info("Sending create-command for chat rooom: id={}, name={}");
-    return chatRoomChannel.sendCreateChatRoomRequest(id, name);
+    int shard = selectShard(id);
+    log.info(
+        "Sending create-command for chat rooom: id={}, name={}, shard={}",
+        id,
+        name,
+        shard);
+    return infoChannel.sendChatRoomCreatedEvent(id, name, shard);
   }
 
   @Override
   public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
   {
-    int shard = selectShard(id);
-    return chatRoomChannel
-        .getChatRoomInfo(shard, id)
-        .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
-            id,
-            shard,
-            chatRoomChannel.getOwnedShards())));
+    return infoChannel
+        .getChatRoomInfo(id)
+        .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
   }
 
   @Override
   public Flux<ChatRoomInfo> getChatRoomInfo()
   {
-    return chatRoomChannel.getChatRoomInfo();
+    return infoChannel.getChatRoomInfo();
   }
 
   @Override
   public Mono<ChatRoomData> getChatRoomData(UUID id)
   {
     int shard = selectShard(id);
-    return chatRoomChannel
+    return dataChannel
         .getChatRoomData(shard, id)
         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
             id,
             shard,
-            chatRoomChannel.getOwnedShards())));
+            dataChannel.getOwnedShards())));
   }
 
   int selectShard(UUID chatRoomId)
index df9ee73..8ab50f1 100644 (file)
@@ -15,7 +15,7 @@ import java.util.UUID;
 @Slf4j
 public class KafkaChatMessageService implements ChatMessageService
 {
-  private final ChatRoomChannel chatRoomChannel;
+  private final DataChannel dataChannel;
   private final UUID chatRoomId;
 
   private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
@@ -27,7 +27,7 @@ public class KafkaChatMessageService implements ChatMessageService
     LocalDateTime timestamp,
     String text)
   {
-    return chatRoomChannel
+    return dataChannel
         .sendChatMessage(chatRoomId, key, timestamp, text)
         .doOnSuccess(message -> persistMessage(message));
   }
index da7ee75..6810d06 100644 (file)
@@ -2,6 +2,8 @@ package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -17,12 +19,72 @@ import org.springframework.stereotype.Component;
 @Slf4j
 public class KafkaServicesApplicationRunner implements ApplicationRunner
 {
-  private final ConsumerTaskExecutor chatRoomChannelTaskExecutor;
+  private final String infoTopic;
+  private final ThreadPoolTaskExecutor taskExecutor;
+  private final InfoChannel infoChannel;
+  private final DataChannel dataChannel;
+  private final Consumer<String, AbstractMessageTo> infoChannelConsumer;
+  private final Consumer<String, AbstractMessageTo> dataChannelConsumer;
+  private final WorkAssignor workAssignor;
+
+  CompletableFuture<Void> infoChannelConsumerJob;
+  CompletableFuture<Void> dataChannelConsumerJob;
+>>>>>>> 7fb62d3 (WIP:ALIGN)
 
 
   @Override
   public void run(ApplicationArguments args) throws Exception
   {
+<<<<<<< HEAD
     chatRoomChannelTaskExecutor.executeConsumerTask();
+=======
+    List<TopicPartition> partitions = infoChannelConsumer
+        .partitionsFor(infoTopic)
+        .stream()
+        .map(partitionInfo -> new TopicPartition(
+            infoTopic,
+            partitionInfo.partition()))
+        .toList();
+    infoChannelConsumer.assign(partitions);
+    log.info("Starting the consumer for the InfoChannel");
+    infoChannelConsumerJob = taskExecutor
+        .submitCompletable(infoChannel)
+        .exceptionally(e ->
+        {
+          log.error("The consumer for the InfoChannel exited abnormally!", e);
+          return null;
+        });
+
+    while (infoChannel.loadInProgress())
+    {
+      log.info("InfoChannel is still loading...");
+      Thread.sleep(1000);
+    }
+
+    workAssignor.assignWork(dataChannelConsumer);
+    log.info("Starting the consumer for the DataChannel");
+    dataChannelConsumerJob = taskExecutor
+        .submitCompletable(dataChannel)
+        .exceptionally(e ->
+        {
+          log.error("The consumer for the DataChannel exited abnormally!", e);
+          return null;
+        });
+  }
+
+  @PreDestroy
+  public void joinChatRoomChannelConsumerJob()
+  {
+    log.info("Signaling the consumer of the CahtRoomChannel to quit its work");
+    infoChannelConsumer.wakeup();
+    log.info("Waiting for the consumer of the ChatRoomChannel to finish its work");
+    dataChannelConsumerJob.join();
+    log.info("Joined the consumer of the ChatRoomChannel");
+  }
+
+
+  interface WorkAssignor
+  {
+    void assignWork(Consumer<?, ?> consumer);
   }
 }
index cda0b94..58a470d 100644 (file)
@@ -3,8 +3,8 @@ package de.juplo.kafka.chat.backend.implementation.kafka;
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.CommandCreateChatRoomTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.EventChatMessageReceivedTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -36,54 +36,84 @@ import java.util.Properties;
 public class KafkaServicesConfiguration
 {
   @Bean
-  ConsumerTaskExecutor chatRoomChannelTaskExecutor(
+  ConsumerTaskExecutor infoChannelTaskExecutor(
       ThreadPoolTaskExecutor taskExecutor,
-      ChatRoomChannel chatRoomChannel,
-      Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
+      InfoChannel infoChannel,
+      Consumer<String, AbstractMessageTo> dataChannelConsumer,
       ConsumerTaskExecutor.WorkAssignor workAssignor)
   {
     return new ConsumerTaskExecutor(
         taskExecutor,
-        chatRoomChannel,
-        chatRoomChannelConsumer,
+        infoChannel,
+        dataChannelConsumer,
+        workAssignor);
+  }
+
+  @Bean
+  ConsumerTaskExecutor dataChannelTaskExecutor(
+      ThreadPoolTaskExecutor taskExecutor,
+      DataChannel dataChannel,
+      Consumer<String, AbstractMessageTo> dataChannelConsumer,
+      ConsumerTaskExecutor.WorkAssignor workAssignor)
+  {
+    return new ConsumerTaskExecutor(
+        taskExecutor,
+        dataChannel,
+        dataChannelConsumer,
         workAssignor);
   }
 
   @Bean
   ConsumerTaskExecutor.WorkAssignor workAssignor(
       ChatBackendProperties properties,
-      ChatRoomChannel chatRoomChannel)
+      DataChannel dataChannel)
   {
     return consumer ->
     {
       List<String> topics =
-          List.of(properties.getKafka().getChatRoomChannelTopic());
-      consumer.subscribe(topics, chatRoomChannel);
+          List.of(properties.getKafka().getDataChannelTopic());
+      consumer.subscribe(topics, dataChannel);
     };
   }
 
   @Bean
     ChatHomeService kafkaChatHome(
       ChatBackendProperties properties,
-      ChatRoomChannel chatRoomChannel)
+      InfoChannel infoChannel,
+      DataChannel dataChannel)
   {
     return new KafkaChatHomeService(
         properties.getKafka().getNumPartitions(),
-        chatRoomChannel);
+        infoChannel,
+        dataChannel);
+  }
+
+  @Bean
+  InfoChannel infoChannel(
+      ChatBackendProperties properties,
+      Producer<String, AbstractMessageTo> producer,
+      Consumer<String, AbstractMessageTo> infoChannelConsumer,
+      DataChannel dataChannel)
+  {
+    return new InfoChannel(
+        properties.getKafka().getInfoChannelTopic(),
+        producer,
+        infoChannelConsumer,
+        dataChannel);
   }
 
   @Bean
-  ChatRoomChannel chatRoomChannel(
+  DataChannel dataChannel(
       ChatBackendProperties properties,
-      Producer<String, AbstractMessageTo> chatRoomChannelProducer,
-      Consumer<String, AbstractMessageTo> chatRoomChannelConsumer,
+      Producer<String, AbstractMessageTo> producer,
+      Consumer<String, AbstractMessageTo> dataChannelConsumer,
       ZoneId zoneId,
       Clock clock)
   {
-    return new ChatRoomChannel(
-        properties.getKafka().getChatRoomChannelTopic(),
-        chatRoomChannelProducer,
-        chatRoomChannelConsumer,
+    return new DataChannel(
+        properties.getKafka().getDataChannelTopic(),
+        producer,
+        dataChannelConsumer,
         zoneId,
         properties.getKafka().getNumPartitions(),
         properties.getChatroomBufferSize(),
@@ -91,7 +121,7 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  Producer<String, AbstractMessageTo>  chatRoomChannelProducer(
+  Producer<String, AbstractMessageTo>  producer(
       Properties defaultProducerProperties,
       ChatBackendProperties chatBackendProperties,
       StringSerializer stringSerializer,
@@ -101,7 +131,7 @@ public class KafkaServicesConfiguration
     defaultProducerProperties.forEach((key, value) -> properties.put(key.toString(), value));
     properties.put(
         ProducerConfig.CLIENT_ID_CONFIG,
-        chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_PRODUCER");
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_PRODUCER");
     return new KafkaProducer<>(
         properties,
         stringSerializer,
@@ -126,7 +156,28 @@ public class KafkaServicesConfiguration
   }
 
   @Bean
-  Consumer<String, AbstractMessageTo>  chatRoomChannelConsumer(
+  Consumer<String, AbstractMessageTo>  infoChannelConsumer(
+      Properties defaultConsumerProperties,
+      ChatBackendProperties chatBackendProperties,
+      StringDeserializer stringDeserializer,
+      JsonDeserializer<AbstractMessageTo> messageDeserializer)
+  {
+    Map<String, Object> properties = new HashMap<>();
+    defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
+    properties.put(
+        ConsumerConfig.CLIENT_ID_CONFIG,
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_INFO_CHANNEL_CONSUMER");
+    properties.put(
+        ConsumerConfig.GROUP_ID_CONFIG,
+        "info_channel");
+    return new KafkaConsumer<>(
+        properties,
+        stringDeserializer,
+        messageDeserializer);
+  }
+
+  @Bean
+  Consumer<String, AbstractMessageTo>  dataChannelConsumer(
       Properties defaultConsumerProperties,
       ChatBackendProperties chatBackendProperties,
       StringDeserializer stringDeserializer,
@@ -136,10 +187,10 @@ public class KafkaServicesConfiguration
     defaultConsumerProperties.forEach((key, value) -> properties.put(key.toString(), value));
     properties.put(
         ConsumerConfig.CLIENT_ID_CONFIG,
-        chatBackendProperties.getKafka().getClientIdPrefix() + "_CHATROOM_CHANNEL_CONSUMER");
+        chatBackendProperties.getKafka().getClientIdPrefix() + "_DATA_CHANNEL_CONSUMER");
     properties.put(
         ConsumerConfig.GROUP_ID_CONFIG,
-        "chatroom_channel");
+        "data_channel");
     return new KafkaConsumer<>(
         properties,
         stringDeserializer,
@@ -168,7 +219,7 @@ public class KafkaServicesConfiguration
   String typeMappings ()
   {
     return
-        "command_create_chatroom:" +  CommandCreateChatRoomTo.class.getCanonicalName() + "," +
+        "event_chatroom_created:" +  EventChatRoomCreated.class.getCanonicalName() + "," +
         "event_chatmessage_received:" + EventChatMessageReceivedTo.class.getCanonicalName();
   }
 
index 17d3a39..d4f6508 100644 (file)
@@ -1,5 +1,6 @@
-package de.juplo.kafka.chat.backend.implementation.kafka.messages;
+package de.juplo.kafka.chat.backend.implementation.kafka.messages.data;
 
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import lombok.*;
 
 
index 0cf4232..ae5a501 100644 (file)
@@ -1,2 +1,45 @@
-package de.juplo.kafka.chat.backend.implementation.kafka.messages.info;public class EventChatRoomCreated {
+package de.juplo.kafka.chat.backend.implementation.kafka.messages.info;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+import java.util.UUID;
+
+
+@Getter
+@Setter
+@EqualsAndHashCode
+@ToString
+public class EventChatRoomCreated extends AbstractMessageTo
+{
+  private String id;
+  private String name;
+  private Integer shard;
+
+
+  public EventChatRoomCreated()
+  {
+    super(ToType.EVENT_CHATROOM_CREATED);
+  }
+
+
+  public ChatRoomInfo toChatRoomInfo()
+  {
+    return new ChatRoomInfo(UUID.fromString(id), name, shard);
+  }
+
+  public static EventChatRoomCreated of(UUID id, String name, Integer shard)
+  {
+    EventChatRoomCreated event = new EventChatRoomCreated();
+
+    event.setId(id.toString());
+    event.setName(name);
+    event.setShard(shard);
+
+    return event;
+  }
 }
index 8ea9cc2..853ee1c 100644 (file)
@@ -11,20 +11,18 @@ import org.springframework.data.mongodb.core.mapping.Document;
 @Getter(AccessLevel.PACKAGE)
 @Setter(AccessLevel.PACKAGE)
 @EqualsAndHashCode(of = { "id" })
-@ToString(of = { "id", "shard", "name" })
+@ToString(of = { "id", "name" })
 @Document
 public class ChatRoomTo
 {
   @Id
   private String id;
-  private Integer shard;
   private String name;
 
   public static ChatRoomTo from(ChatRoomInfo chatRoomInfo)
   {
     return new ChatRoomTo(
         chatRoomInfo.getId().toString(),
-        chatRoomInfo.getShard(),
         chatRoomInfo.getName());
   }
 }
index c87036c..3eb9096 100644 (file)
@@ -36,18 +36,7 @@ public class MongoDbStorageStrategy implements StorageStrategy
         .map(chatRoomTo ->
         {
           UUID chatRoomId = UUID.fromString(chatRoomTo.getId());
-          int shard = shardingStrategy.selectShard(chatRoomId);
-
-          log.info(
-              "{} - old shard: {}, new shard:  {}",
-              chatRoomId,
-              chatRoomTo.getShard(),
-              shard);
-
-          return new ChatRoomInfo(
-              chatRoomId,
-              chatRoomTo.getName(),
-              shard);
+          return new ChatRoomInfo(chatRoomId, chatRoomTo.getName());
         });
   }
 
index f257d5e..630dc63 100644 (file)
@@ -1,6 +1,6 @@
 package de.juplo.kafka.chat.backend;
 
-import de.juplo.kafka.chat.backend.implementation.kafka.ChatRoomChannel;
+import de.juplo.kafka.chat.backend.implementation.kafka.DataChannel;
 import de.juplo.kafka.chat.backend.implementation.kafka.KafkaServicesApplicationRunner;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -19,7 +19,8 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
-import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.TOPIC;
+import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.DATA_TOPIC;
+import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.INFO_TOPIC;
 
 
 @SpringBootTest(
@@ -29,14 +30,18 @@ import static de.juplo.kafka.chat.backend.KafkaConfigurationIT.TOPIC;
         "chat.backend.kafka.client-id-PREFIX=TEST",
         "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
         "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
-        "chat.backend.kafka.chatroom-channel-topic=" + TOPIC,
+        "chat.backend.kafka.info-topic=" + INFO_TOPIC,
+        "chat.backend.kafka.data-topic=" + DATA_TOPIC,
         "chat.backend.kafka.num-partitions=10",
         })
-@EmbeddedKafka(topics = { TOPIC }, partitions = 10)
+@EmbeddedKafka(
+    topics = { INFO_TOPIC, DATA_TOPIC },
+    partitions = 10)
 @Slf4j
 class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
 {
-  final static String TOPIC = "KAFKA_CONFIGURATION_IT";
+  final static String INFO_TOPIC = "KAFKA_CONFIGURATION_IT_INFO_CHANNEL";
+  final static String DATA_TOPIC = "KAFKA_CONFIGURATION_IT_DATA_CHANNEL";
 
   static CompletableFuture<Void> CONSUMER_JOB;
 
@@ -48,19 +53,19 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
       @Autowired KafkaTemplate<String, String> messageTemplate,
       @Autowired Consumer chatRoomChannelConsumer,
       @Autowired ThreadPoolTaskExecutor taskExecutor,
-      @Autowired ChatRoomChannel chatRoomChannel)
+      @Autowired DataChannel dataChannel)
   {
-    send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom");
+    send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"name\": \"FOO\" }", "command_create_chatroom");
     send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
     send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
     send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
     send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
 
-    List<TopicPartition> assignedPartitions = List.of(new TopicPartition(TOPIC, 2));
+    List<TopicPartition> assignedPartitions = List.of(new TopicPartition(INFO_TOPIC, 2));
     chatRoomChannelConsumer.assign(assignedPartitions);
-    chatRoomChannel.onPartitionsAssigned(assignedPartitions);
+    dataChannel.onPartitionsAssigned(assignedPartitions);
     CONSUMER_JOB = taskExecutor
-        .submitCompletable(chatRoomChannel)
+        .submitCompletable(dataChannel)
         .exceptionally(e ->
         {
           log.error("The consumer for the ChatRoomChannel exited abnormally!", e);
@@ -70,7 +75,7 @@ class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
 
   static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
   {
-    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
+    ProducerRecord<String, String> record = new ProducerRecord<>(INFO_TOPIC, key, value);
     record.headers().add("__TypeId__", typeId.getBytes());
     SendResult<String, String> result = kafkaTemplate.send(record).join();
     log.info(
index 4aa362d..7e54ee3 100644 (file)
@@ -18,12 +18,16 @@ import org.springframework.context.annotation.Import;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.support.SendResult;
 import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
 
 import java.time.Clock;
+import java.time.Duration;
 import java.util.List;
+import java.util.UUID;
 
 import static de.juplo.kafka.chat.backend.domain.ChatHomeServiceWithShardsTest.NUM_SHARDS;
-import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServiceTest.TOPIC;
+import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServiceTest.DATA_TOPIC;
+import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServiceTest.INFO_TOPIC;
 
 
 @SpringBootTest(
@@ -38,14 +42,18 @@ import static de.juplo.kafka.chat.backend.implementation.kafka.KafkaChatHomeServ
     "chat.backend.kafka.client-id-PREFIX=TEST",
     "chat.backend.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
     "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
-    "chat.backend.kafka.chatroom-channel-topic=" + TOPIC,
+    "chat.backend.kafka.info-channel-topic=" + INFO_TOPIC,
+    "chat.backend.kafka.data-channel-topic=" + DATA_TOPIC,
     "chat.backend.kafka.num-partitions=" + NUM_SHARDS,
 })
-@EmbeddedKafka(topics = { TOPIC }, partitions = 10)
+@EmbeddedKafka(
+    topics = { INFO_TOPIC, DATA_TOPIC },
+    partitions = 10)
 @Slf4j
 public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
 {
-  final static String TOPIC = "KAFKA_CHAT_HOME_TEST";
+  final static String INFO_TOPIC = "KAFKA_CHAT_HOME_TEST_INFO";
+  final static String DATA_TOPIC = "KAFKA_CHAT_HOME_TEST_DATA";
 
 
   @TestConfiguration
@@ -55,14 +63,22 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
   {
     @Bean
     ConsumerTaskExecutor.WorkAssignor workAssignor(
-        ChatRoomChannel chatRoomChannel)
+        InfoChannel infoChannel,
+        DataChannel dataChannel)
     {
       return consumer ->
       {
+        // TODO: Darauf warten, dass der Chat-Room erzeugt wurde!
+        Awaitility
+            .await()
+            .atMost(Duration.ofSeconds(10))
+            .until(() -> infoChannel
+                .getChatRoomInfo(UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7"))
+                .block() != null);
         List<TopicPartition> assignedPartitions =
-            List.of(new TopicPartition(TOPIC, 2));
+            List.of(new TopicPartition(DATA_TOPIC, 2));
         consumer.assign(assignedPartitions);
-        chatRoomChannel.onPartitionsAssigned(assignedPartitions);
+        dataChannel.onPartitionsAssigned(assignedPartitions);
       };
     }
 
@@ -79,18 +95,23 @@ public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
       @Autowired ConsumerTaskExecutor consumerTaskExecutor,
       @Autowired KafkaTemplate<String, String> messageTemplate)
   {
-    send(messageTemplate, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "command_create_chatroom");
-    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
-    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
-    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
-    send(messageTemplate,"5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
+    send(messageTemplate, INFO_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\": \"5c73531c-6fc4-426c-adcb-afc5c140a0f7\", \"shard\": 2, \"name\": \"FOO\" }", "event_chatroom_created");
+    send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"peter\", \"text\" : \"Hallo, ich heiße Peter!\" }", "event_chatmessage_received");
+    send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"ute\", \"text\" : \"Ich bin Ute...\" }", "event_chatmessage_received");
+    send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 2, \"user\" : \"peter\", \"text\" : \"Willst du mit mir gehen?\" }", "event_chatmessage_received");
+    send(messageTemplate, DATA_TOPIC, "5c73531c-6fc4-426c-adcb-afc5c140a0f7","{ \"id\" : 1, \"user\" : \"klaus\", \"text\" : \"Ja? Nein? Vielleicht??\" }", "event_chatmessage_received");
 
     consumerTaskExecutor.executeConsumerTask();
   }
 
-  static void send(KafkaTemplate<String, String> kafkaTemplate, String key, String value, String typeId)
+  static void send(
+      KafkaTemplate<String, String> kafkaTemplate,
+      String topic,
+      String key,
+      String value,
+      String typeId)
   {
-    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, key, value);
+    ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
     record.headers().add("__TypeId__", typeId.getBytes());
     SendResult<String, String> result = kafkaTemplate.send(record).join();
     log.info(
index d9d5a08..72ade06 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.chat.backend.implementation.kafka.messages;
+package de.juplo.kafka.chat.backend.implementation.kafka.messages.data;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;