WIP:ALIGN
authorKai Moritz <kai@juplo.de>
Tue, 12 Sep 2023 21:28:02 +0000 (23:28 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 12 Sep 2023 21:35:20 +0000 (23:35 +0200)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventChatRoomCreated.java

index 8598464..a7899e4 100644 (file)
@@ -4,7 +4,6 @@ import de.juplo.kafka.chat.backend.domain.*;
 import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
 import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
index 97739b5..22c4668 100644 (file)
@@ -1,13 +1,7 @@
 package de.juplo.kafka.chat.backend.implementation.kafka;
 
-import de.juplo.kafka.chat.backend.domain.ChatRoomData;
 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
-import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.domain.exceptions.LoadInProgressException;
-import de.juplo.kafka.chat.backend.domain.exceptions.ShardNotOwnedException;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomTo;
-import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -15,7 +9,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -33,6 +26,9 @@ public class InfoChannel implements Runnable
   private final String topic;
   private final Producer<String, AbstractMessageTo> producer;
   private final Consumer<String, AbstractMessageTo> consumer;
+  private final int numShards;
+  private final long[] startOffset;
+  private final long[] currentOffset;
   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
 
   private boolean running;
@@ -50,28 +46,50 @@ public class InfoChannel implements Runnable
     this.consumer = infoChannelConsumer;
     this.producer = producer;
     this.chatRoomInfo = new HashMap<>();
+
+    this.numShards = consumer
+        .partitionsFor(topic)
+        .size();
+    this.startOffset = new long[numShards];
+    this.currentOffset = new long[numShards];
+    IntStream
+        .range(0, numShards)
+        .forEach(partition -> this.currentOffset[partition] = -1l);
+    consumer
+        .endOffsets(consumer.assignment())
+        .entrySet()
+        .stream()
+        .forEach(entry -> this.startOffset[entry.getKey().partition()] = entry.getValue());
   }
 
 
+  Mono<Boolean> loadInProgress()
+  {
+    return Mono
+        .fromSupplier(() -> IntStream
+            .range(0, numShards)
+            .anyMatch(partition -> currentOffset[partition] < startOffset[partition]));
+  }
 
   Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
       UUID chatRoomId,
-      String name)
+      String name,
+      int shard)
   {
-    EventChatRoomCreated to = EventChatRoomCreated.of(name);
+    EventChatRoomCreated to = EventChatRoomCreated.of(chatRoomId, name, shard);
     return Mono.create(sink ->
     {
       ProducerRecord<String, AbstractMessageTo> record =
           new ProducerRecord<>(
-              topic<,
-              chatRoomId.toString(),
+              topic,
+              Integer.toString(shard),
               to);
 
       producer.send(record, ((metadata, exception) ->
       {
         if (metadata != null)
         {
-          log.info("Successfully send chreate-request for chat room: {}", to);
+          log.info("Successfully sent chreate-request for chat room: {}", to);
           ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, record.partition());
           createChatRoom(chatRoomInfo);
           sink.success(chatRoomInfo);
@@ -100,28 +118,9 @@ public class InfoChannel implements Runnable
     {
       try
       {
-        ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(5));
-        log.info("Fetched {} messages", records.count());
-
-        if (loadInProgress)
-        {
-          loadChatRoom(records);
-
-          if (isLoadingCompleted())
-          {
-            log.info("Loading of messages completed! Pausing all owned partitions...");
-            pauseAllOwnedPartions();
-            log.info("Resuming normal operations...");
-            loadInProgress = false;
-          }
-        }
-        else
-        {
-          if (!records.isEmpty())
-          {
-            throw new IllegalStateException("All owned partitions should be paused, when no load is in progress!");
-          }
-        }
+        ConsumerRecords<String, AbstractMessageTo> records = consumer.poll(Duration.ofMinutes(1));
+        log.debug("Fetched {} messages", records.count());
+        handleMessages(records);
       }
       catch (WakeupException e)
       {
@@ -133,122 +132,36 @@ public class InfoChannel implements Runnable
     log.info("Exiting normally");
   }
 
-  private void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
+  private void handleMessages(ConsumerRecords<String, AbstractMessageTo> records)
   {
     for (ConsumerRecord<String, AbstractMessageTo> record : records)
     {
-      UUID chatRoomId = UUID.fromString(record.key());
-
       switch (record.value().getType())
       {
         case EVENT_CHATROOM_CREATED:
-          createChatRoom(
-              chatRoomId,
-              (CommandCreateChatRoomTo) record.value(),
-              record.partition());
-          break;
-
-        case EVENT_CHATMESSAGE_RECEIVED:
-          Instant instant = Instant.ofEpochSecond(record.timestamp());
-          LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
-          loadChatMessage(
-              chatRoomId,
-              timestamp,
-              record.offset(),
-              (EventChatMessageReceivedTo) record.value(),
-              record.partition());
+          EventChatRoomCreated eventChatRoomCreated =
+              (EventChatRoomCreated) record.value();
+          createChatRoom(eventChatRoomCreated.toChatRoomInfo());
           break;
 
         default:
           log.debug(
-              "Ignoring message for chat-room {} with offset {}: {}",
-              chatRoomId,
+              "Ignoring message for key={} with offset={}: {}",
+              record.key(),
               record.offset(),
               record.value());
       }
 
-      nextOffset[record.partition()] = record.offset() + 1;
+      startOffset[record.partition()] = record.offset() + 1;
     }
   }
 
-  private void createChatRoom(
-      UUID chatRoomId,
-      CommandCreateChatRoomTo createChatRoomRequestTo,
-      Integer partition)
-  {
-    log.info(
-        "Loading ChatRoom {} for shard {} with buffer-size {}",
-        chatRoomId,
-        partition,
-        bufferSize);
-    KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId);
-    ChatRoomData chatRoomData = new ChatRoomData(
-        clock,
-        service,
-        bufferSize);
-    putChatRoom(
-        chatRoomId,
-        createChatRoomRequestTo.getName(),
-        partition,
-        chatRoomData);
-  }
-
-
   private void createChatRoom(ChatRoomInfo chatRoomInfo)
   {
-    UUID id = chatRoomInfo.getId();
-    log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
-    KafkaChatMessageService service = new KafkaChatMessageService(this, id);
-    ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize);
-    putChatRoom(
-        chatRoomInfo.getId(),
-        chatRoomInfo.getName(),
-        chatRoomInfo.getShard(),
-        chatRoomData);
-  }
-
-  private void loadChatMessage(
-      UUID chatRoomId,
-      LocalDateTime timestamp,
-      long offset,
-      EventChatMessageReceivedTo chatMessageTo,
-      int partition)
-  {
-    Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
-    Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
+    UUID chatRoomId = chatRoomInfo.getId();
+    Integer partition = chatRoomInfo.getShard();
 
-    ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId);
-    KafkaChatMessageService kafkaChatRoomService =
-        (KafkaChatMessageService) chatRoomData.getChatRoomService();
-
-    kafkaChatRoomService.persistMessage(message);
-  }
-
-  private boolean isLoadingCompleted()
-  {
-    return IntStream
-        .range(0, numShards)
-        .filter(shard -> isShardOwned[shard])
-        .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]);
-  }
-
-  private void pauseAllOwnedPartions()
-  {
-    consumer.pause(IntStream
-        .range(0, numShards)
-        .filter(shard -> isShardOwned[shard])
-        .mapToObj(shard -> new TopicPartition(topic, shard))
-        .toList());
-  }
-
-
-  private void putChatRoom(
-      UUID chatRoomId,
-      String name,
-      Integer partition,
-      ChatRoomData chatRoomData)
-  {
-    if (this.chatRoomInfo[partition].containsKey(chatRoomId))
+    if (this.chatRoomInfo.containsKey(chatRoomId))
     {
       log.warn(
           "Ignoring existing chat-room for {}: {}",
@@ -258,60 +171,21 @@ public class InfoChannel implements Runnable
     else
     {
       log.info(
-          "Adding new chat-room to partition {}: {}",
+          "Adding new chat-room for partition {}: {}",
           partition,
-          chatRoomData);
-
-      this.chatRoomInfo[partition].put(
-          chatRoomId,
-          new ChatRoomInfo(chatRoomId, name, partition));
-      this.chatRoomData[partition].put(chatRoomId, chatRoomData);
-    }
-  }
-
-  int[] getOwnedShards()
-  {
-    return IntStream
-        .range(0, numShards)
-        .filter(shard -> isShardOwned[shard])
-        .toArray();
-  }
-
-  Mono<ChatRoomData> getChatRoomData(int shard, UUID id)
-  {
-    if (loadInProgress)
-    {
-      return Mono.error(new LoadInProgressException());
-    }
+          chatRoomId);
 
-    if (!isShardOwned[shard])
-    {
-      return Mono.error(new ShardNotOwnedException(shard));
+      this.chatRoomInfo.put(chatRoomId, chatRoomInfo);
     }
-
-    return Mono.justOrEmpty(chatRoomData[shard].get(id));
   }
 
   Flux<ChatRoomInfo> getChatRoomInfo()
   {
-    return Flux
-        .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
-        .filter(shard -> isShardOwned[shard])
-        .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values()));
+    return Flux.fromIterable(chatRoomInfo.values());
   }
 
-  Mono<ChatRoomInfo> getChatRoomInfo(int shard, UUID id)
+  Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
   {
-    if (loadInProgress)
-    {
-      return Mono.error(new LoadInProgressException());
-    }
-
-    if (!isShardOwned[shard])
-    {
-      return Mono.error(new ShardNotOwnedException(shard));
-    }
-
-    return Mono.justOrEmpty(chatRoomInfo[shard].get(id));
+    return Mono.fromSupplier(() -> chatRoomInfo.get(id));
   }
 }
index 707e843..d0b8eff 100644 (file)
@@ -26,16 +26,20 @@ public class KafkaChatHomeService implements ChatHomeService
   @Override
   public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
   {
-    log.info("Sending create-command for chat rooom: id={}, name={}");
-    return infoChannel.sendChatRoomCreatedEvent(id, name);
+    int shard = selectShard(id);
+    log.info(
+        "Sending create-command for chat rooom: id={}, name={}, shard={}",
+        id,
+        name,
+        shard);
+    return infoChannel.sendChatRoomCreatedEvent(id, name, shard);
   }
 
   @Override
   public Mono<ChatRoomInfo> getChatRoomInfo(UUID id)
   {
-    int shard = selectShard(id);
     return infoChannel
-        .getChatRoomInfo(shard, id)
+        .getChatRoomInfo(id)
         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(
             id,
             shard,
index b5cf458..ae5a501 100644 (file)
@@ -1,11 +1,14 @@
 package de.juplo.kafka.chat.backend.implementation.kafka.messages.info;
 
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
 
+import java.util.UUID;
+
 
 @Getter
 @Setter
@@ -24,5 +27,19 @@ public class EventChatRoomCreated extends AbstractMessageTo
   }
 
 
-  public static EventChatRoomCreated of(String )
+  public ChatRoomInfo toChatRoomInfo()
+  {
+    return new ChatRoomInfo(UUID.fromString(id), name, shard);
+  }
+
+  public static EventChatRoomCreated of(UUID id, String name, Integer shard)
+  {
+    EventChatRoomCreated event = new EventChatRoomCreated();
+
+    event.setId(id.toString());
+    event.setName(name);
+    event.setShard(shard);
+
+    return event;
+  }
 }