From b1e7c0913515b3c828b46b7a660fd877dfb4ec04 Mon Sep 17 00:00:00 2001
From: Kai Moritz <kai@juplo.de>
Date: Sun, 20 Aug 2023 12:20:55 +0200
Subject: [PATCH] fix: GREEN - Correct handling of unknown chat-rooms in the
 Kafka-version

---
 .../chat/backend/persistence/kafka/ChatRoomChannel.java   | 8 ++++++++
 .../chat/backend/persistence/kafka/KafkaChatHome.java     | 5 ++++-
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java
index 7f93ad8b..7659d1e1 100644
--- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java
+++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java
@@ -360,6 +360,14 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
     }
   }
 
+  int[] getOwnedShards()
+  {
+    return IntStream
+        .range(0, numShards)
+        .filter(shard -> isShardOwned[shard])
+        .toArray();
+  }
+
   Mono<ChatRoom> getChatRoom(int shard, UUID id)
   {
     if (loadInProgress)
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java
index 67990409..ab72269d 100644
--- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java
+++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java
@@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.persistence.kafka;
 
 import de.juplo.kafka.chat.backend.domain.ChatHome;
 import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.UnknownChatroomException;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.utils.Utils;
@@ -23,7 +24,9 @@ public class KafkaChatHome implements ChatHome
   public Mono<ChatRoom> getChatRoom(UUID id)
   {
     int shard = selectShard(id);
-    return chatRoomChannel.getChatRoom(shard, id);
+    return chatRoomChannel
+        .getChatRoom(shard, id)
+        .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
   }
 
   int selectShard(UUID chatRoomId)
-- 
2.20.1