{
return Mono.justOrEmpty(chatrooms[shard].get(id));
}
+
+ Flux<ChatRoom> getChatRooms()
+ {
+ return Flux
+ .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i)))
+ .filter(shard -> isShardOwned[shard])
+ .flatMap(shard -> Flux.fromIterable(chatrooms[shard].values()));
+ }
}
import de.juplo.kafka.chat.backend.domain.ChatRoom;
import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import de.juplo.kafka.chat.backend.domain.ShardingStrategy;
+import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
@Slf4j
public class KafkaChatHome implements ChatHome
{
- private final ShardingStrategy shardingStrategy;
- private final ChatRoomChannel chatRoomChannel;
+ private final KafkaLikeShardingStrategy shardingStrategy;
private final ChatMessageChannel chatMessageChanel;
+ public KafkaChatHome(
+ int numPartitions,
+ ChatMessageChannel chatMessageChannel)
+ {
+ this.shardingStrategy = new KafkaLikeShardingStrategy(numPartitions);
+ this.chatMessageChanel = chatMessageChannel;
+ }
+
+
@Override
public Mono<ChatRoom> getChatRoom(UUID id)
{
@Override
public Flux<ChatRoom> getChatRooms()
{
- return chatRoomChannel.getChatRooms();
+ return chatMessageChanel.getChatRooms();
}
}
@Slf4j
public class KafkaChatRoomFactory implements ChatRoomFactory
{
- private final ChatRoomChannel chatRoomChannel;
+ private final ChatMessageChannel chatMessageChannel;
@Override
public Mono<ChatRoomInfo> createChatRoom(UUID id, String name)
{
log.info("Sending create-request for chat rooom: id={}, name={}");
- return chatRoomChannel.sendCreateChatRoomRequest(id, name);
+ return chatMessageChannel.sendCreateChatRoomRequest(id, name);
}
}