Aktuelle Idee für die Kafka-Anbindung ===================================== - *Beobachtung:* Alle schreibenden Anfragen für Nachrichten müssen erst durch `ChatHomeService.getChatRoom(int, UUID)` den zuständigen `ChatRoom` ermitteln, bevor sie die Nachricht schreiben können. - D.h., das Locking, das während einem Rebalance nötig ist, kann *vollständig* in `KafkaChatHomeService` umgesetzt werden. - In `KafkaChatRoomService` muss *keinerlei* Kontrolle mehr erfolgen, ob der `ChatRoom` tatsächlich gerade in die Zuständigkeit der Instanz fällt, da die Anfragen *hier nie ankommen*, wenn die Instanz nicht zuständig ist, da sie dann bereits in `getChatRoom(int, UUID)` abgelehnt werden! - Die in der Domain-Klasse `ChatRoom` definierte Logik, für die Behandlung doppelter Nachrichten *ist vollständig valide*, da Anfragen für einen bestimmten `ChatRoom` dort (bei korrekt implementiertem Locking in `KafkaChatHomeService`) nur ankommen, wenn die Instanz *tatsächlich* für den `ChatRoom` zuständig ist. - D.h. insbesondere auch, dass die Antwort dort (also in dem `ChatRoom`) erst ankommen, wenn dieser *vollständig geladen* ist, so dass die lokale Kontrolle auf doppelte Nachrichten logisch gültig ist. - *Anforderung:* Wenn ein Rebalance aktiv ist, wird die Instanz gelockt. - Das Locking erfolg in `KafkaChatRoomService`, durch das alle Anfragen durchgreifen müssen, so dass hier *zentral alle Aktionen* auf einzelnen `ChatRoom`-Instanzen *unterbunden* werden können. - *Vereinfachung:* Wenn `KafkaChatRoomService` gelockt ist, wird für alle Zugriffe eine `ShardNotOwnedException` erzeugt. - Dadurch wird das Zustands-Handling *extrem vereinfacht*, da Anfragen, die *während* einem Rebalance auflaufen - *Lade-Modus - Initialisierung und Abschluss-Bedingung:* - Wenn durch einen Rebalance in den Lade-Modus gewechselt wird, muss die *tatsächliche* Offset-Position der zuletzt geschriebenen Nachrichten für die zugeordneten Partitionen ermittelt werden. - Anschließend wird ein Seek auf die Offset-Position 0 (später: auf die in den lokalen Daten gespeicherte Offset-Position) durchgeführt. - Der Lade-Modus ist abgeschlossen, wenn für alle zugeordneten Partitionen der zum Rebalance-Zeitpunkt ermittelte Offset der aktuellsten Nachrichten erreicht ist. - Wenn ein weiterer Rebalance erfolgt, während der Lade-Modus bereits aktiv ist, sollte es genügen, die Informationen über die zugeordneten Partitionen zu aktualisieren und die Aktuellen Offsets für diese neu zu ermitteln. - *Lade-Modus vs. Default-Modus:* - Nur während des Lade-Modus *liest* die `KafkaChatRoomServcie`-Instanz tatsächlich die Nachrichten aus den zugeordneten Partitionen. - Im Default-Modus *schreibt* sie die Nachrichten nur in die Partitionen und speichert sie lokal ab, sobald die *Bestätigung durch den `Producer`* erfolgt. - D.h. insbesondere, dass der `KafkaConsumer` im Default-Modus für alle zugeordneten Partitionen *pausiert* wird! - Damit die Offset-Positon nicht unnötig zurückfällt, sollte ggf. regelmäßig für alle zugeordneten Partitionen ein Seek auf die zuletzt vom Producer bestätigt geschriebenen Offsets durchgeführt werden. - *Beachte:_ Dies ist nicht nötig, wenn die Offsets eh in den lokal gespeicherten Daten gehalten und aus diesen wiederhergestellt werden! - *Umsetzungs-Details:* - Da die in dem Interface `ConsumerRebalanceListener` definierten Methoden in einem zeitkritischem Setting laufen, muss das eigentliche Laden der `ChatRoom`-Zustände separat erfolgen, so dass die Kontrolle schnell an den `KafkaConsumer` zurückgegeben werden kann. - Dafür muss der `KafkaChatRoomService` in einen speziellen Lade-Modus wechseln, der aktiv ist, bis die `ChatRoom`-Instanzen für alle durch den Rebalance zugeteilten Partitionen aus dem Log wiederhergestellt wurden. - Das Lock der `KafkaChatRoomService`-Instanz muss während dieser gesmaten Phase aufrecht erhalten werden: Es wird erst gelöst, wenn die Instanz in den normalen Modus zurückwechselt. - D.h. insbesondere auch, dass während dieser ganzen Phase _alle_ Anfragen mit `ShardNotOwnedException` abgelehnt werden! - Eine besondere Herausforderung sind *erneute* Rebalances, die Auftreten, *während* der `KafkaChatRoomService` sich noch in einem durch einen vorherigen Rebalance ausgelösten Lade-Modus befindet!