NEU
authorKai Moritz <kai@juplo.de>
Fri, 14 Apr 2023 09:46:00 +0000 (11:46 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 15 Apr 2023 08:13:30 +0000 (10:13 +0200)
README.txt [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeConsumer.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomLoadingMessageHandlingStrategy.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatroomInactiveMessageHandlingStrategy.java [deleted file]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandlingStrategy.java [deleted file]

diff --git a/README.txt b/README.txt
new file mode 100644 (file)
index 0000000..a7c7f84
--- /dev/null
@@ -0,0 +1,46 @@
+Aktuelle Idee für die Kafka-Anbindung
+=====================================
+
+- *Beobachtung:* Alle schreibenden Anfragen für Nachrichten müssen erst
+  durch `ChatHomeService.getChatRoom(int, UUID)` den zuständigen
+  `ChatRoom` ermitteln, bevor sie die Nachricht schreiben können.
+  - D.h., das Locking, das während einem Rebalance nötig ist, kann
+    *vollständig* in `KafkaChatHomeService` umgesetzt werden.
+  - In `KafkaChatRoomService` muss *keinerlei* Kontrolle mehr erfolgen,
+    ob der `ChatRoom` tatsächlich gerade in die Zuständigkeit der Instanz
+    fällt, da die Anfragen *hier nie ankommen*, wenn die Instanz nicht
+    zuständig ist, da sie dann bereits in `getChatRoom(int, UUID)`
+    abgelehnt werden!
+  - Die in der Domain-Klasse `ChatRoom` definierte Logik, für die
+    Behandlung doppelter Nachrichten *ist vollständig valide*, da Anfragen
+    für einen bestimmten `ChatRoom` dort (bei korrekt implementiertem Locking
+    in `KafkaChatHomeService`) nur ankommen, wenn die Instanz *tatsächlich*
+    für den `ChatRoom` zuständig ist.
+  - D.h. insbesondere auch, dass die Antwort dort (also in dem `ChatRoom`)
+    erst ankommen, wenn dieser *vollständig geladen* ist, so dass die lokale
+    Kontrolle auf doppelte Nachrichten logisch gültig ist.
+- *Anforderung:* Wenn ein Rebalance aktiv ist, wird die Instanz gelockt.
+  - Das Locking erfolg in `KafkaChatRoomService`, durch das alle Anfragen
+    durchgreifen müssen, so dass hier *zentral alle Aktionen* auf einzelnen
+    `ChatRoom`-Instanzen *unterbunden* werden können.
+- *Vereinfachung:* Wenn `KafkaChatRoomService` gelockt ist, wird für alle
+  Zugriffe eine `ShardNotOwnedException` erzeugt.
+  - Dadurch wird das Zustands-Handling *extrem vereinfacht*, da Anfragen,
+    die *während* einem Rebalance auflaufen
+- *Umsetzungs-Details:*
+  - Da die in dem Interface `ConsumerRebalanceListener` definierten Methoden
+    in einem zeitkritischem Setting laufen, muss das eigentliche Laden der
+    `ChatRoom`-Zustände separat erfolgen, so dass die Kontrolle schnell an
+    den `KafkaConsumer` zurückgegeben werden kann.
+  - Dafür muss der `KafkaChatRoomService` in einen speziellen Lade-Modus
+    wechseln, der aktiv ist, bis die `ChatRoom`-Instanzen für alle durch
+    den Rebalance zugeteilten Partitionen aus dem Log wiederhergestellt
+    wurden.
+  - Das Lock der `KafkaChatRoomService`-Instanz muss während dieser
+    gesmaten Phase aufrecht erhalten werden: Es wird erst gelöst, wenn
+    die Instanz in den normalen Modus zurückwechselt.
+  - D.h. insbesondere auch, dass während dieser ganzen Phase _alle_
+    Anfragen mit `ShardNotOwnedException` abgelehnt werden!
+  - Eine besondere Herausforderung sind *erneute* Rebalances, die
+    Auftreten, *während* der `KafkaChatRoomService` sich noch in einem
+    durch einen vorherigen Rebalance ausgelösten Lade-Modus befindet!
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeConsumer.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeConsumer.java
deleted file mode 100644 (file)
index 21a6e1a..0000000
+++ /dev/null
@@ -1,14 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class ChatHomeConsumer extends Runnable
-{
-  private final Consumer<String, MessageTo> consumer;
-
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java
deleted file mode 100644 (file)
index 465775f..0000000
+++ /dev/null
@@ -1,73 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.Message;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.common.TopicPartition;
-
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-
-@RequiredArgsConstructor
-@Slf4j
-class ChatHomeLoader
-{
-  private final Producer<String, MessageTo> producer;
-  private final long offsetOfFirstUnseenMessage;
-  private final ZoneId zoneId;
-  @Getter
-  private final Map<UUID, KafkaChatRoomService> kafkaChatRoomServiceMap = new HashMap<>();
-
-
-  /**
-   * Rebuilds the state of the {@link KafkaChatHomeService} by consuming
-   * all messages, that belong to the partition, that defines the shard this
-   * service represents.
-   * The loader signals, that its work is done, if the given end offset is reached.
-   * @param record A record, that represents a message from one of the {@link ChatRoom}s, that belong to the partition.
-   * @return {@code true}, if all messages are consumed.
-   */
-  boolean handleMessage(ConsumerRecord<UUID, MessageTo> record)
-  {
-    TopicPartition topicPartition =
-        new TopicPartition(record.topic(), record.partition());
-    Message.MessageKey messageKey = Message.MessageKey.of(
-        record.value().getUser(),
-        record.value().getId());
-
-    if (record.offset() >= offsetOfFirstUnseenMessage)
-    {
-      // All messages consumed: DONE!
-      log.trace(
-          "Ignoring unseen message {} on {}, offset={}",
-          messageKey,
-          topicPartition,
-          record.offset());
-      return true;
-    }
-
-    Instant timestamp = Instant.ofEpochMilli(record.timestamp());
-    LocalDateTime time = LocalDateTime.ofInstant(timestamp, zoneId);
-
-    KafkaChatRoomService service = kafkaChatRoomServiceMap
-        .computeIfAbsent(record.key(), key ->
-            new KafkaChatRoomService(producer, topicPartition));
-
-    service.addMessage(new Message(
-        messageKey,
-        record.offset(),
-        time,
-        record.value().getText()));
-
-    return false;
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomLoadingMessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomLoadingMessageHandlingStrategy.java
deleted file mode 100644 (file)
index 7149603..0000000
+++ /dev/null
@@ -1,46 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.Message;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.common.TopicPartition;
-import reactor.core.publisher.Mono;
-
-import java.time.LocalDateTime;
-
-
-class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy
-{
-  private final Consumer consumer;
-  private final TopicPartition tp;
-  private final long currentOffset;
-  private final long unseenOffset;
-
-  /**
-   * Wird nicht benötigt!!!
-   * Derzeit?
-   * @param consumer
-   * @param tp
-   * @param currentOffset
-   * @param unseenOffset
-   */
-  ChatRoomLoadingMessageHandlingStrategy(
-      Consumer consumer,
-      TopicPartition tp,
-      long currentOffset,
-      long unseenOffset)
-  {
-    this.consumer = consumer;
-    this.tp = tp;
-    this.currentOffset = currentOffset;
-    this.unseenOffset = unseenOffset;
-
-    consumer.seek(tp, unseenOffset);
-  }
-
-  @Override
-  public Mono<Message> handleMessage(Message.MessageKey key, LocalDateTime timestamp, String text)
-  {
-    // TODO
-    return null;
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatroomInactiveMessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatroomInactiveMessageHandlingStrategy.java
deleted file mode 100644 (file)
index 0da4111..0000000
+++ /dev/null
@@ -1,25 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
-import lombok.RequiredArgsConstructor;
-import org.apache.kafka.common.TopicPartition;
-import reactor.core.publisher.Mono;
-
-import java.time.LocalDateTime;
-
-
-@RequiredArgsConstructor
-class ChatroomInactiveMessageHandlingStrategy implements MessageHandlingStrategy
-{
-  private final TopicPartition topicPartition;
-
-  @Override
-  public Mono<Message> handleMessage(
-      Message.MessageKey key,
-      LocalDateTime timestamp,
-      String text)
-  {
-    throw new ShardNotOwnedException(topicPartition);
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandlingStrategy.java
deleted file mode 100644 (file)
index 1fb4c47..0000000
+++ /dev/null
@@ -1,15 +0,0 @@
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.Message;
-import reactor.core.publisher.Mono;
-
-import java.time.LocalDateTime;
-
-
-interface MessageHandlingStrategy
-{
-  Mono<Message> handleMessage(
-      Message.MessageKey key,
-      LocalDateTime timestamp,
-      String text);
-}