From: Kai Moritz Date: Fri, 18 Aug 2023 14:10:45 +0000 (+0200) Subject: NG X-Git-Tag: rebase--2023-08-18-abends~8 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=cce6ad12b547bcd2c371f2d97db7c617661e921a;p=demos%2Fkafka%2Fchat NG --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java index b473a418..7f885e6f 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java @@ -362,4 +362,12 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener { return Mono.justOrEmpty(chatrooms[shard].get(id)); } + + Flux getChatRooms() + { + return Flux + .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i))) + .filter(shard -> isShardOwned[shard]) + .flatMap(shard -> Flux.fromIterable(chatrooms[shard].values())); + } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java index 2bccc3ad..71893624 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java @@ -4,6 +4,7 @@ import de.juplo.kafka.chat.backend.domain.ChatHome; import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.domain.ShardingStrategy; +import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; @@ -16,11 +17,19 @@ import java.util.*; @Slf4j public class KafkaChatHome implements ChatHome { - private final ShardingStrategy shardingStrategy; - private final ChatRoomChannel chatRoomChannel; + private final KafkaLikeShardingStrategy shardingStrategy; private final ChatMessageChannel chatMessageChanel; + public KafkaChatHome( + int numPartitions, + ChatMessageChannel chatMessageChannel) + { + this.shardingStrategy = new KafkaLikeShardingStrategy(numPartitions); + this.chatMessageChanel = chatMessageChannel; + } + + @Override public Mono getChatRoom(UUID id) { @@ -38,6 +47,6 @@ public class KafkaChatHome implements ChatHome @Override public Flux getChatRooms() { - return chatRoomChannel.getChatRooms(); + return chatMessageChanel.getChatRooms(); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java index 825f16eb..c46529d8 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java @@ -13,12 +13,12 @@ import java.util.UUID; @Slf4j public class KafkaChatRoomFactory implements ChatRoomFactory { - private final ChatRoomChannel chatRoomChannel; + private final ChatMessageChannel chatMessageChannel; @Override public Mono createChatRoom(UUID id, String name) { log.info("Sending create-request for chat rooom: id={}, name={}"); - return chatRoomChannel.sendCreateChatRoomRequest(id, name); + return chatMessageChannel.sendCreateChatRoomRequest(id, name); } }