feat: First incomplete but compilable stab at an implementation with Kafka
authorKai Moritz <kai@juplo.de>
Sun, 22 Jan 2023 17:13:40 +0000 (18:13 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 18 Aug 2023 15:18:41 +0000 (17:18 +0200)
* This approach uses `MessageHandlingStrategy`s to distinguishe between
  three states of the `ChatRoom`: inactive, loading and active.
* But in the course of the implementation, it has shown, that only
  one strategy (`ChatRoomActiveMessageHandlingStrategy`) is needed.
* Hence, this commit represents a turning point in the intended
  implementation.

12 files changed:
src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java
src/main/java/de/juplo/kafka/chat/backend/domain/UnknownChatroomException.java
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomActiveMessageHandlingStrategy.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomLoadingMessageHandlingStrategy.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatroomInactiveMessageHandlingStrategy.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandlingStrategy.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageTo.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageToTest.java [new file with mode: 0644]

index 4db77ee..339451a 100644 (file)
@@ -70,7 +70,7 @@ public class ChatBackendController
             .flatMap(chatroom -> put(chatroom, username, messageId, text));
   }
 
-  public Mono<MessageTo> put(
+  private Mono<MessageTo> put(
       ChatRoom chatroom,
       String username,
       Long messageId,
index 1f70f11..714c220 100644 (file)
@@ -2,17 +2,43 @@ package de.juplo.kafka.chat.backend.domain;
 
 import lombok.Getter;
 
+import java.util.Arrays;
+import java.util.Optional;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 
-public class UnknownChatroomException extends RuntimeException
+public class UnknownChatroomException extends IllegalStateException
 {
   @Getter
   private final UUID chatroomId;
+  @Getter
+  private final Optional<Integer> shard;
+  @Getter
+  private final Optional<int[]> ownedShards;
 
   public UnknownChatroomException(UUID chatroomId)
   {
     super("Chatroom does not exist: " + chatroomId);
     this.chatroomId = chatroomId;
+    this.shard = Optional.empty();
+    this.ownedShards = Optional.empty();
+  }
+
+  public UnknownChatroomException(UUID chatroomId, int shard, int[] ownedShards)
+  {
+    super(
+        "Chatroom does not exist (here): " +
+        chatroomId +
+        " shard=" +
+        shard +
+        ", owned=" +
+        Arrays
+            .stream(ownedShards)
+            .mapToObj(ownedShard -> Integer.toString(ownedShard))
+            .collect(Collectors.joining(",")));
+    this.chatroomId = chatroomId;
+    this.shard = Optional.of(shard);
+    this.ownedShards = Optional.of(ownedShards);
   }
 }
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java
new file mode 100644 (file)
index 0000000..465775f
--- /dev/null
@@ -0,0 +1,73 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+
+@RequiredArgsConstructor
+@Slf4j
+class ChatHomeLoader
+{
+  private final Producer<String, MessageTo> producer;
+  private final long offsetOfFirstUnseenMessage;
+  private final ZoneId zoneId;
+  @Getter
+  private final Map<UUID, KafkaChatRoomService> kafkaChatRoomServiceMap = new HashMap<>();
+
+
+  /**
+   * Rebuilds the state of the {@link KafkaChatHomeService} by consuming
+   * all messages, that belong to the partition, that defines the shard this
+   * service represents.
+   * The loader signals, that its work is done, if the given end offset is reached.
+   * @param record A record, that represents a message from one of the {@link ChatRoom}s, that belong to the partition.
+   * @return {@code true}, if all messages are consumed.
+   */
+  boolean handleMessage(ConsumerRecord<UUID, MessageTo> record)
+  {
+    TopicPartition topicPartition =
+        new TopicPartition(record.topic(), record.partition());
+    Message.MessageKey messageKey = Message.MessageKey.of(
+        record.value().getUser(),
+        record.value().getId());
+
+    if (record.offset() >= offsetOfFirstUnseenMessage)
+    {
+      // All messages consumed: DONE!
+      log.trace(
+          "Ignoring unseen message {} on {}, offset={}",
+          messageKey,
+          topicPartition,
+          record.offset());
+      return true;
+    }
+
+    Instant timestamp = Instant.ofEpochMilli(record.timestamp());
+    LocalDateTime time = LocalDateTime.ofInstant(timestamp, zoneId);
+
+    KafkaChatRoomService service = kafkaChatRoomServiceMap
+        .computeIfAbsent(record.key(), key ->
+            new KafkaChatRoomService(producer, topicPartition));
+
+    service.addMessage(new Message(
+        messageKey,
+        record.offset(),
+        time,
+        record.value().getText()));
+
+    return false;
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomActiveMessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomActiveMessageHandlingStrategy.java
new file mode 100644 (file)
index 0000000..562e2df
--- /dev/null
@@ -0,0 +1,69 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import reactor.core.publisher.Mono;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.UUID;
+
+
+/**
+ * TODO:
+ * Actual the only active strategy!
+ * {@link MessageHandlingStrategy} probably not needed: Refactor!
+ */
+@RequiredArgsConstructor
+@Slf4j
+class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy
+{
+  private final KafkaChatRoomService kafkaChatRoomService;
+  private final Producer<String, MessageTo> producer;
+  private final TopicPartition tp;
+  private final UUID chatRoomId;
+  private final ZoneOffset zoneOffset;
+
+
+  @Override
+  public Mono<Message> handleMessage(
+      Message.MessageKey key,
+      LocalDateTime timestamp,
+      String text)
+  {
+    return Mono.create(sink ->
+    {
+      ProducerRecord<String, MessageTo> record =
+          new ProducerRecord<>(
+              tp.topic(),
+              tp.partition(),
+              timestamp.toEpochSecond(zoneOffset),
+              chatRoomId.toString(),
+              MessageTo.of(key.getUsername(), key.getMessageId(), text));
+
+      producer.send(record, ((metadata, exception) ->
+      {
+        if (metadata != null)
+        {
+          // On successful send
+          {
+            // Emit new message
+            Message message = new Message(key, metadata.offset(), timestamp, text);
+            kafkaChatRoomService.addMessage(message);
+          }
+
+          sink.success();
+        }
+        else
+        {
+          // On send-failure
+          sink.error(exception);
+        }
+      }));
+    });
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomLoadingMessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomLoadingMessageHandlingStrategy.java
new file mode 100644 (file)
index 0000000..c7a3c8b
--- /dev/null
@@ -0,0 +1,44 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+import reactor.core.publisher.Mono;
+
+import java.time.LocalDateTime;
+
+
+/**
+ * TODO:
+ * Not used anywhere
+ * {@link ChatRoomActiveMessageHandlingStrategy} is the only active strategy at the moment.
+ * Refactor?
+ */
+class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy
+{
+  private final Consumer consumer;
+  private final TopicPartition tp;
+  private final long currentOffset;
+  private final long unseenOffset;
+
+  ChatRoomLoadingMessageHandlingStrategy(
+      Consumer consumer,
+      TopicPartition tp,
+      long currentOffset,
+      long unseenOffset)
+  {
+    this.consumer = consumer;
+    this.tp = tp;
+    this.currentOffset = currentOffset;
+    this.unseenOffset = unseenOffset;
+
+    consumer.seek(tp, unseenOffset);
+  }
+
+  @Override
+  public Mono<Message> handleMessage(Message.MessageKey key, LocalDateTime timestamp, String text)
+  {
+    // TODO
+    return null;
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatroomInactiveMessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatroomInactiveMessageHandlingStrategy.java
new file mode 100644 (file)
index 0000000..be0136a
--- /dev/null
@@ -0,0 +1,25 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.common.TopicPartition;
+import reactor.core.publisher.Mono;
+
+import java.time.LocalDateTime;
+
+
+@RequiredArgsConstructor
+class ChatroomInactiveMessageHandlingStrategy implements MessageHandlingStrategy
+{
+  private final int shard;
+
+  @Override
+  public Mono<Message> handleMessage(
+      Message.MessageKey key,
+      LocalDateTime timestamp,
+      String text)
+  {
+    throw new ShardNotOwnedException(shard);
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java
new file mode 100644 (file)
index 0000000..eadd762
--- /dev/null
@@ -0,0 +1,115 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.TopicPartition;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.ZoneId;
+import java.util.*;
+
+
+@Slf4j
+public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
+{
+  private final Consumer<String, MessageTo> consumer;
+  private final Producer<String, MessageTo> producer;
+  private final String topic;
+  private final ZoneId zoneId;
+  // private final long[] offsets; Erst mal immer alles neu einlesen
+  private final ChatHomeLoader[] chatHomeLoaders;
+  private final Map<UUID, ChatRoom>[] chatRoomMaps;
+
+
+  public KafkaChatHomeService(
+    Consumer<String, MessageTo> consumer,
+    Producer<String, MessageTo> producer,
+    String topic,
+    ZoneId zoneId,
+    int numShards)
+  {
+    log.debug("Creating KafkaChatHomeService");
+    this.consumer = consumer;
+    this.producer = producer;
+    this.topic = topic;
+    this.zoneId = zoneId;
+    // this.offsets = new long[numShards];
+    // for (int i=0; i< numShards; i++)
+    // {
+    //   this.offsets[i] = 0l;
+    // }
+    this.chatHomeLoaders = new ChatHomeLoader[numShards];
+    this.chatRoomMaps = new Map[numShards];
+  }
+
+
+  @Override
+  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+  {
+    consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
+    {
+      if (!topicPartition.topic().equals(topic))
+      {
+        log.warn("Ignoring partition from unwanted topic: {}", topicPartition);
+        return;
+      }
+
+      int partition = topicPartition.partition();
+      long unseenOffset = 0; // offsets[partition];
+
+      log.info(
+          "Loading messages from partition {}: start-offset={} -> current-offset={}",
+          partition,
+          unseenOffset,
+          currentOffset);
+
+      // TODO: reuse! Nicht immer alles neu laden, sondern erst ab offsets[partition]!
+      consumer.seek(topicPartition, unseenOffset);
+      chatHomeLoaders[partition] = new ChatHomeLoader(
+          producer,
+          currentOffset,
+          zoneId);
+    });
+  }
+
+  @Override
+  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+  {
+    partitions.forEach(topicPartition ->
+    {
+      if (!topicPartition.topic().equals(topic))
+      {
+        log.warn("Ignoring partition from unwanted topic: {}", topicPartition);
+        return;
+      }
+
+      int partition = topicPartition.partition();
+      // long unseenOffset = offsets[partition]; TODO: Offset merken...?
+    });
+    log.info("Revoked partitions: {}", partitions);
+  }
+
+  @Override
+  public void onPartitionsLost(Collection<TopicPartition> partitions)
+  {
+    // TODO: Muss auf den Verlust anders reagiert werden?
+    onPartitionsRevoked(partitions);
+  }
+
+  @Override
+  public Mono<ChatRoom> getChatRoom(int shard, UUID id)
+  {
+    return Mono.justOrEmpty(chatRoomMaps[shard].get(id));
+  }
+
+  @Override
+  public Flux<ChatRoom> getChatRooms(int shard)
+  {
+    return Flux.fromStream(chatRoomMaps[shard].values().stream());
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java
new file mode 100644 (file)
index 0000000..20d85e8
--- /dev/null
@@ -0,0 +1,17 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomFactory;
+import reactor.core.publisher.Mono;
+
+import java.util.UUID;
+
+
+public class KafkaChatRoomFactory implements ChatRoomFactory
+{
+  @Override
+  public Mono<ChatRoom> createChatRoom(UUID id, String name)
+  {
+    return null;
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java
new file mode 100644 (file)
index 0000000..794c5c5
--- /dev/null
@@ -0,0 +1,101 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoomService;
+import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.domain.MessageMutationException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.TopicPartition;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.LocalDateTime;
+import java.util.LinkedHashMap;
+
+
+@Slf4j
+public class KafkaChatRoomService implements ChatRoomService
+{
+  private final Producer<String, MessageTo> producer;
+  private final TopicPartition tp;
+
+  private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
+
+  private volatile MessageHandlingStrategy strategy;
+
+
+  public KafkaChatRoomService(
+      Producer<String, MessageTo> producer,
+      TopicPartition tp)
+  {
+    this.producer = producer;
+    this.tp = tp;
+    this.strategy = new ChatroomInactiveMessageHandlingStrategy(tp.partition());
+  }
+
+
+  @Override
+  synchronized public Mono<Message> persistMessage(
+    Message.MessageKey key,
+    LocalDateTime timestamp,
+    String text)
+  {
+    return strategy.handleMessage(key, timestamp, text);
+  }
+
+  /**
+   * {@code synchronized} ist nicht nötig, da Aufruf immer indirekt über
+   * {@link #persistMessage(Message.MessageKey, LocalDateTime, String)}
+   */
+  protected void addMessage(Message message) throws MessageMutationException
+  {
+    Message existingMessage = messages.get(message.getKey());
+
+    // TODO: Ist der Test nötig, oder wird das durch den Kontrollierten Wechsel
+    // der Strategie ggf. schon abgefangen? Weil: Wenn es nur um die Sorge geht,
+    // das eine Nachricht schon "durch den Nutzer" anders geschrieben wurde,
+    // fängt dies bereits der ChatRoom ab.
+    // Die Überprüfung hier war vor dem Hintergrund der Sorge hinzugefügt worden,
+    // dass die Nachricht wegen Verschluckern in Kafka / beim Strategiewechsel / ??
+    // doppelt aufschlägt...
+    if (existingMessage == null)
+    {
+      messages.put(message.getKey(), message);
+    }
+    else
+    {
+      if (!existingMessage.getMessageText().equals(message.getMessageText()))
+      {
+        throw new MessageMutationException(existingMessage, message.getMessageText());
+      }
+
+      // Warn and emit existing message
+      log.warn(
+          "Keeping existing message with {}@{} for {}",
+          existingMessage.getSerialNumber(),
+          existingMessage.getTimestamp(),
+          existingMessage.getKey());
+    }
+  }
+
+  @Override
+  synchronized public Mono<Message> getMessage(Message.MessageKey key)
+  {
+    // TODO: Aufrufe, auf eine Nachricht (einge gewisse Zeit) warten lassen
+    // und dann bedienen, wenn der der Callback vom Producer aufgerufen wird?
+    return Mono.fromSupplier(() -> messages.get(key));
+  }
+
+  @Override
+  synchronized public Flux<Message> getMessages(long first, long last)
+  {
+    return Flux.fromStream(messages
+      .values()
+      .stream()
+      .filter(message ->
+      {
+        long serial = message.getSerialNumber();
+        return serial >= first && serial <= last;
+      }));
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandlingStrategy.java
new file mode 100644 (file)
index 0000000..1fb4c47
--- /dev/null
@@ -0,0 +1,15 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import reactor.core.publisher.Mono;
+
+import java.time.LocalDateTime;
+
+
+interface MessageHandlingStrategy
+{
+  Mono<Message> handleMessage(
+      Message.MessageKey key,
+      LocalDateTime timestamp,
+      String text);
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageTo.java
new file mode 100644 (file)
index 0000000..0a867f1
--- /dev/null
@@ -0,0 +1,33 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.LocalDateTime;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class MessageTo
+{
+  private String user;
+  private Long id;
+  private String text;
+
+  public Message toMessage(long offset, LocalDateTime timestamp)
+  {
+    return new Message(Message.MessageKey.of(user, id), offset, timestamp, text);
+  }
+
+  public static MessageTo from(Message message)
+  {
+    return
+        new MessageTo(
+            message.getUsername(),
+            message.getId(),
+            message.getMessageText());
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageToTest.java
new file mode 100644 (file)
index 0000000..0c4884b
--- /dev/null
@@ -0,0 +1,39 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+
+public class MessageToTest
+{
+  final String json = """
+  {
+    "id": 1,
+    "text": "Hallo, ich heiße Peter!",
+    "user": "Peter"
+  }""";
+
+  ObjectMapper mapper;
+
+  @BeforeEach
+  public void setUp()
+  {
+    mapper = new ObjectMapper();
+    mapper.registerModule(new JavaTimeModule());
+    mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+  }
+
+  @Test
+  public void testDeserialization() throws Exception
+  {
+    MessageTo message = mapper.readValue(json, MessageTo.class);
+    assertThat(message.getId()).isEqualTo(1l);
+    assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!");
+    assertThat(message.getUser()).isEqualTo("Peter");
+  }
+}