projects
/
demos
/
kafka
/
chat
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (from parent 1:
bd22fca
)
fix: GREEN - Correct handling of unknown chat-rooms in the Kafka-version
author
Kai Moritz
<kai@juplo.de>
Sun, 20 Aug 2023 10:20:55 +0000
(12:20 +0200)
committer
Kai Moritz
<kai@juplo.de>
Sat, 2 Sep 2023 16:10:33 +0000
(18:10 +0200)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java
patch
|
blob
|
history
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java
patch
|
blob
|
history
diff --git
a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java
b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java
index
7f93ad8
..
7659d1e
100644
(file)
--- a/
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java
+++ b/
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java
@@
-360,6
+360,14
@@
public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener
}
}
}
}
+ int[] getOwnedShards()
+ {
+ return IntStream
+ .range(0, numShards)
+ .filter(shard -> isShardOwned[shard])
+ .toArray();
+ }
+
Mono<ChatRoom> getChatRoom(int shard, UUID id)
{
if (loadInProgress)
Mono<ChatRoom> getChatRoom(int shard, UUID id)
{
if (loadInProgress)
diff --git
a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java
b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java
index
6799040
..
ab72269
100644
(file)
--- a/
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java
+++ b/
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java
@@
-2,6
+2,7
@@
package de.juplo.kafka.chat.backend.persistence.kafka;
import de.juplo.kafka.chat.backend.domain.ChatHome;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.ChatHome;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.UnknownChatroomException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.utils.Utils;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.utils.Utils;
@@
-23,7
+24,9
@@
public class KafkaChatHome implements ChatHome
public Mono<ChatRoom> getChatRoom(UUID id)
{
int shard = selectShard(id);
public Mono<ChatRoom> getChatRoom(UUID id)
{
int shard = selectShard(id);
- return chatRoomChannel.getChatRoom(shard, id);
+ return chatRoomChannel
+ .getChatRoom(shard, id)
+ .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
}
int selectShard(UUID chatRoomId)
}
int selectShard(UUID chatRoomId)