From 8d2d6ec211a5254aed17b9393948108a7a12b4b6 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 14 Apr 2023 11:46:00 +0200 Subject: [PATCH] NEU --- README.txt | 46 ++++++++++++ .../persistence/kafka/ChatHomeConsumer.java | 14 ---- .../persistence/kafka/ChatHomeLoader.java | 73 ------------------- ...hatRoomLoadingMessageHandlingStrategy.java | 46 ------------ ...atroomInactiveMessageHandlingStrategy.java | 25 ------- .../kafka/MessageHandlingStrategy.java | 15 ---- 6 files changed, 46 insertions(+), 173 deletions(-) create mode 100644 README.txt delete mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeConsumer.java delete mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java delete mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomLoadingMessageHandlingStrategy.java delete mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatroomInactiveMessageHandlingStrategy.java delete mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandlingStrategy.java diff --git a/README.txt b/README.txt new file mode 100644 index 00000000..a7c7f849 --- /dev/null +++ b/README.txt @@ -0,0 +1,46 @@ +Aktuelle Idee für die Kafka-Anbindung +===================================== + +- *Beobachtung:* Alle schreibenden Anfragen für Nachrichten müssen erst + durch `ChatHomeService.getChatRoom(int, UUID)` den zuständigen + `ChatRoom` ermitteln, bevor sie die Nachricht schreiben können. + - D.h., das Locking, das während einem Rebalance nötig ist, kann + *vollständig* in `KafkaChatHomeService` umgesetzt werden. + - In `KafkaChatRoomService` muss *keinerlei* Kontrolle mehr erfolgen, + ob der `ChatRoom` tatsächlich gerade in die Zuständigkeit der Instanz + fällt, da die Anfragen *hier nie ankommen*, wenn die Instanz nicht + zuständig ist, da sie dann bereits in `getChatRoom(int, UUID)` + abgelehnt werden! + - Die in der Domain-Klasse `ChatRoom` definierte Logik, für die + Behandlung doppelter Nachrichten *ist vollständig valide*, da Anfragen + für einen bestimmten `ChatRoom` dort (bei korrekt implementiertem Locking + in `KafkaChatHomeService`) nur ankommen, wenn die Instanz *tatsächlich* + für den `ChatRoom` zuständig ist. + - D.h. insbesondere auch, dass die Antwort dort (also in dem `ChatRoom`) + erst ankommen, wenn dieser *vollständig geladen* ist, so dass die lokale + Kontrolle auf doppelte Nachrichten logisch gültig ist. +- *Anforderung:* Wenn ein Rebalance aktiv ist, wird die Instanz gelockt. + - Das Locking erfolg in `KafkaChatRoomService`, durch das alle Anfragen + durchgreifen müssen, so dass hier *zentral alle Aktionen* auf einzelnen + `ChatRoom`-Instanzen *unterbunden* werden können. +- *Vereinfachung:* Wenn `KafkaChatRoomService` gelockt ist, wird für alle + Zugriffe eine `ShardNotOwnedException` erzeugt. + - Dadurch wird das Zustands-Handling *extrem vereinfacht*, da Anfragen, + die *während* einem Rebalance auflaufen +- *Umsetzungs-Details:* + - Da die in dem Interface `ConsumerRebalanceListener` definierten Methoden + in einem zeitkritischem Setting laufen, muss das eigentliche Laden der + `ChatRoom`-Zustände separat erfolgen, so dass die Kontrolle schnell an + den `KafkaConsumer` zurückgegeben werden kann. + - Dafür muss der `KafkaChatRoomService` in einen speziellen Lade-Modus + wechseln, der aktiv ist, bis die `ChatRoom`-Instanzen für alle durch + den Rebalance zugeteilten Partitionen aus dem Log wiederhergestellt + wurden. + - Das Lock der `KafkaChatRoomService`-Instanz muss während dieser + gesmaten Phase aufrecht erhalten werden: Es wird erst gelöst, wenn + die Instanz in den normalen Modus zurückwechselt. + - D.h. insbesondere auch, dass während dieser ganzen Phase _alle_ + Anfragen mit `ShardNotOwnedException` abgelehnt werden! + - Eine besondere Herausforderung sind *erneute* Rebalances, die + Auftreten, *während* der `KafkaChatRoomService` sich noch in einem + durch einen vorherigen Rebalance ausgelösten Lade-Modus befindet! diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeConsumer.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeConsumer.java deleted file mode 100644 index 21a6e1a7..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeConsumer.java +++ /dev/null @@ -1,14 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; - - -@RequiredArgsConstructor -@Slf4j -public class ChatHomeConsumer extends Runnable -{ - private final Consumer consumer; - -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java deleted file mode 100644 index 465775f2..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java +++ /dev/null @@ -1,73 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import de.juplo.kafka.chat.backend.domain.ChatRoom; -import de.juplo.kafka.chat.backend.domain.Message; -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.common.TopicPartition; - -import java.time.Instant; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; - - -@RequiredArgsConstructor -@Slf4j -class ChatHomeLoader -{ - private final Producer producer; - private final long offsetOfFirstUnseenMessage; - private final ZoneId zoneId; - @Getter - private final Map kafkaChatRoomServiceMap = new HashMap<>(); - - - /** - * Rebuilds the state of the {@link KafkaChatHomeService} by consuming - * all messages, that belong to the partition, that defines the shard this - * service represents. - * The loader signals, that its work is done, if the given end offset is reached. - * @param record A record, that represents a message from one of the {@link ChatRoom}s, that belong to the partition. - * @return {@code true}, if all messages are consumed. - */ - boolean handleMessage(ConsumerRecord record) - { - TopicPartition topicPartition = - new TopicPartition(record.topic(), record.partition()); - Message.MessageKey messageKey = Message.MessageKey.of( - record.value().getUser(), - record.value().getId()); - - if (record.offset() >= offsetOfFirstUnseenMessage) - { - // All messages consumed: DONE! - log.trace( - "Ignoring unseen message {} on {}, offset={}", - messageKey, - topicPartition, - record.offset()); - return true; - } - - Instant timestamp = Instant.ofEpochMilli(record.timestamp()); - LocalDateTime time = LocalDateTime.ofInstant(timestamp, zoneId); - - KafkaChatRoomService service = kafkaChatRoomServiceMap - .computeIfAbsent(record.key(), key -> - new KafkaChatRoomService(producer, topicPartition)); - - service.addMessage(new Message( - messageKey, - record.offset(), - time, - record.value().getText())); - - return false; - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomLoadingMessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomLoadingMessageHandlingStrategy.java deleted file mode 100644 index 7149603a..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomLoadingMessageHandlingStrategy.java +++ /dev/null @@ -1,46 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import de.juplo.kafka.chat.backend.domain.Message; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.common.TopicPartition; -import reactor.core.publisher.Mono; - -import java.time.LocalDateTime; - - -class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy -{ - private final Consumer consumer; - private final TopicPartition tp; - private final long currentOffset; - private final long unseenOffset; - - /** - * Wird nicht benötigt!!! - * Derzeit? - * @param consumer - * @param tp - * @param currentOffset - * @param unseenOffset - */ - ChatRoomLoadingMessageHandlingStrategy( - Consumer consumer, - TopicPartition tp, - long currentOffset, - long unseenOffset) - { - this.consumer = consumer; - this.tp = tp; - this.currentOffset = currentOffset; - this.unseenOffset = unseenOffset; - - consumer.seek(tp, unseenOffset); - } - - @Override - public Mono handleMessage(Message.MessageKey key, LocalDateTime timestamp, String text) - { - // TODO - return null; - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatroomInactiveMessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatroomInactiveMessageHandlingStrategy.java deleted file mode 100644 index 0da41117..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatroomInactiveMessageHandlingStrategy.java +++ /dev/null @@ -1,25 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import de.juplo.kafka.chat.backend.domain.Message; -import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException; -import lombok.RequiredArgsConstructor; -import org.apache.kafka.common.TopicPartition; -import reactor.core.publisher.Mono; - -import java.time.LocalDateTime; - - -@RequiredArgsConstructor -class ChatroomInactiveMessageHandlingStrategy implements MessageHandlingStrategy -{ - private final TopicPartition topicPartition; - - @Override - public Mono handleMessage( - Message.MessageKey key, - LocalDateTime timestamp, - String text) - { - throw new ShardNotOwnedException(topicPartition); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandlingStrategy.java deleted file mode 100644 index 1fb4c47d..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandlingStrategy.java +++ /dev/null @@ -1,15 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import de.juplo.kafka.chat.backend.domain.Message; -import reactor.core.publisher.Mono; - -import java.time.LocalDateTime; - - -interface MessageHandlingStrategy -{ - Mono handleMessage( - Message.MessageKey key, - LocalDateTime timestamp, - String text); -} -- 2.20.1