--- /dev/null
+Aktuelle Idee für die Kafka-Anbindung
+=====================================
+
+- *Beobachtung:* Alle schreibenden Anfragen für Nachrichten müssen erst
+ durch `ChatHomeService.getChatRoom(int, UUID)` den zuständigen
+ `ChatRoom` ermitteln, bevor sie die Nachricht schreiben können.
+ - D.h., das Locking, das während einem Rebalance nötig ist, kann
+ *vollständig* in `KafkaChatHomeService` umgesetzt werden.
+ - In `KafkaChatRoomService` muss *keinerlei* Kontrolle mehr erfolgen,
+ ob der `ChatRoom` tatsächlich gerade in die Zuständigkeit der Instanz
+ fällt, da die Anfragen *hier nie ankommen*, wenn die Instanz nicht
+ zuständig ist, da sie dann bereits in `getChatRoom(int, UUID)`
+ abgelehnt werden!
+ - Die in der Domain-Klasse `ChatRoom` definierte Logik, für die
+ Behandlung doppelter Nachrichten *ist vollständig valide*, da Anfragen
+ für einen bestimmten `ChatRoom` dort (bei korrekt implementiertem Locking
+ in `KafkaChatHomeService`) nur ankommen, wenn die Instanz *tatsächlich*
+ für den `ChatRoom` zuständig ist.
+ - D.h. insbesondere auch, dass die Antwort dort (also in dem `ChatRoom`)
+ erst ankommen, wenn dieser *vollständig geladen* ist, so dass die lokale
+ Kontrolle auf doppelte Nachrichten logisch gültig ist.
+- *Anforderung:* Wenn ein Rebalance aktiv ist, wird die Instanz gelockt.
+ - Das Locking erfolg in `KafkaChatRoomService`, durch das alle Anfragen
+ durchgreifen müssen, so dass hier *zentral alle Aktionen* auf einzelnen
+ `ChatRoom`-Instanzen *unterbunden* werden können.
+- *Vereinfachung:* Wenn `KafkaChatRoomService` gelockt ist, wird für alle
+ Zugriffe eine `ShardNotOwnedException` erzeugt.
+ - Dadurch wird das Zustands-Handling *extrem vereinfacht*, da Anfragen,
+ die *während* einem Rebalance auflaufen
+- *Umsetzungs-Details:*
+ - Da die in dem Interface `ConsumerRebalanceListener` definierten Methoden
+ in einem zeitkritischem Setting laufen, muss das eigentliche Laden der
+ `ChatRoom`-Zustände separat erfolgen, so dass die Kontrolle schnell an
+ den `KafkaConsumer` zurückgegeben werden kann.
+ - Dafür muss der `KafkaChatRoomService` in einen speziellen Lade-Modus
+ wechseln, der aktiv ist, bis die `ChatRoom`-Instanzen für alle durch
+ den Rebalance zugeteilten Partitionen aus dem Log wiederhergestellt
+ wurden.
+ - Das Lock der `KafkaChatRoomService`-Instanz muss während dieser
+ gesmaten Phase aufrecht erhalten werden: Es wird erst gelöst, wenn
+ die Instanz in den normalen Modus zurückwechselt.
+ - D.h. insbesondere auch, dass während dieser ganzen Phase _alle_
+ Anfragen mit `ShardNotOwnedException` abgelehnt werden!
+ - Eine besondere Herausforderung sind *erneute* Rebalances, die
+ Auftreten, *während* der `KafkaChatRoomService` sich noch in einem
+ durch einen vorherigen Rebalance ausgelösten Lade-Modus befindet!
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class ChatHomeConsumer extends Runnable
-{
- private final Consumer<String, MessageTo> consumer;
-
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import de.juplo.kafka.chat.backend.domain.Message;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.common.TopicPartition;
-
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-
-@RequiredArgsConstructor
-@Slf4j
-class ChatHomeLoader
-{
- private final Producer<String, MessageTo> producer;
- private final long offsetOfFirstUnseenMessage;
- private final ZoneId zoneId;
- @Getter
- private final Map<UUID, KafkaChatRoomService> kafkaChatRoomServiceMap = new HashMap<>();
-
-
- /**
- * Rebuilds the state of the {@link KafkaChatHomeService} by consuming
- * all messages, that belong to the partition, that defines the shard this
- * service represents.
- * The loader signals, that its work is done, if the given end offset is reached.
- * @param record A record, that represents a message from one of the {@link ChatRoom}s, that belong to the partition.
- * @return {@code true}, if all messages are consumed.
- */
- boolean handleMessage(ConsumerRecord<UUID, MessageTo> record)
- {
- TopicPartition topicPartition =
- new TopicPartition(record.topic(), record.partition());
- Message.MessageKey messageKey = Message.MessageKey.of(
- record.value().getUser(),
- record.value().getId());
-
- if (record.offset() >= offsetOfFirstUnseenMessage)
- {
- // All messages consumed: DONE!
- log.trace(
- "Ignoring unseen message {} on {}, offset={}",
- messageKey,
- topicPartition,
- record.offset());
- return true;
- }
-
- Instant timestamp = Instant.ofEpochMilli(record.timestamp());
- LocalDateTime time = LocalDateTime.ofInstant(timestamp, zoneId);
-
- KafkaChatRoomService service = kafkaChatRoomServiceMap
- .computeIfAbsent(record.key(), key ->
- new KafkaChatRoomService(producer, topicPartition));
-
- service.addMessage(new Message(
- messageKey,
- record.offset(),
- time,
- record.value().getText()));
-
- return false;
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.Message;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.common.TopicPartition;
-import reactor.core.publisher.Mono;
-
-import java.time.LocalDateTime;
-
-
-class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy
-{
- private final Consumer consumer;
- private final TopicPartition tp;
- private final long currentOffset;
- private final long unseenOffset;
-
- /**
- * Wird nicht benötigt!!!
- * Derzeit?
- * @param consumer
- * @param tp
- * @param currentOffset
- * @param unseenOffset
- */
- ChatRoomLoadingMessageHandlingStrategy(
- Consumer consumer,
- TopicPartition tp,
- long currentOffset,
- long unseenOffset)
- {
- this.consumer = consumer;
- this.tp = tp;
- this.currentOffset = currentOffset;
- this.unseenOffset = unseenOffset;
-
- consumer.seek(tp, unseenOffset);
- }
-
- @Override
- public Mono<Message> handleMessage(Message.MessageKey key, LocalDateTime timestamp, String text)
- {
- // TODO
- return null;
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
-import lombok.RequiredArgsConstructor;
-import org.apache.kafka.common.TopicPartition;
-import reactor.core.publisher.Mono;
-
-import java.time.LocalDateTime;
-
-
-@RequiredArgsConstructor
-class ChatroomInactiveMessageHandlingStrategy implements MessageHandlingStrategy
-{
- private final TopicPartition topicPartition;
-
- @Override
- public Mono<Message> handleMessage(
- Message.MessageKey key,
- LocalDateTime timestamp,
- String text)
- {
- throw new ShardNotOwnedException(topicPartition);
- }
-}
+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.kafka;
-
-import de.juplo.kafka.chat.backend.domain.Message;
-import reactor.core.publisher.Mono;
-
-import java.time.LocalDateTime;
-
-
-interface MessageHandlingStrategy
-{
- Mono<Message> handleMessage(
- Message.MessageKey key,
- LocalDateTime timestamp,
- String text);
-}