+++ /dev/null
-package de.juplo.kafka.chat.backend.persistence.inmemory;
-
-import de.juplo.kafka.chat.backend.domain.ChatRoom;
-import lombok.extern.slf4j.Slf4j;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-import java.util.*;
-import java.util.stream.IntStream;
-
-
-@Slf4j
-public class InMemoryChatHomeService
-{
- private final Map<UUID, ChatRoom>[] chatrooms;
-
-
- public InMemoryChatHomeService(
- int numShards,
- int[] ownedShards,
- Flux<ChatRoom> chatroomFlux)
- {
- log.debug("Creating InMemoryChatHomeService");
- this.chatrooms = new Map[numShards];
- Set<Integer> owned = Arrays
- .stream(ownedShards)
- .collect(
- () -> new HashSet<>(),
- (set, i) -> set.add(i),
- (a, b) -> a.addAll(b));
- for (int shard = 0; shard < numShards; shard++)
- {
- chatrooms[shard] = owned.contains(shard)
- ? new HashMap<>()
- : null;
- }
- chatroomFlux
- .filter(chatRoom ->
- {
- if (owned.contains(chatRoom.getShard()))
- {
- return true;
- }
- else
- {
- log.info("Ignoring not owned chat-room {}", chatRoom);
- return false;
- }
- })
- .toStream()
- .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
- }
-
- public void putChatRoom(ChatRoom chatRoom)
- {
- chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
- }
-
- public Mono<ChatRoom> getChatRoom(int shard, UUID id)
- {
- return Mono.justOrEmpty(chatrooms[shard].get(id));
- }
-
- public int[] getOwnedShards()
- {
- return IntStream
- .range(0, chatrooms.length)
- .filter(i -> chatrooms[i] != null)
- .toArray();
- }
-
- public Flux<ChatRoom> getChatRooms(int shard)
- {
- return Flux.fromStream(chatrooms[shard].values().stream());
- }
-}
@Slf4j
public class SimpleChatHome implements ChatHome
{
- private final int shard;
private final Map<UUID, ChatRoom> chatrooms;
+ public SimpleChatHome(Flux<ChatRoom> chatroomFlux)
+ {
+ this(chatroomFlux, null);
+ }
+
public SimpleChatHome(
- int shard,
+ Integer shard,
Flux<ChatRoom> chatroomFlux)
{
log.info("Created SimpleChatHome for shard {}", shard);
chatroomFlux
.filter(chatRoom ->
{
- if (shard > -1 && chatRoom.getShard() == shard)
+ if (shard == null && chatRoom.getShard() == shard)
{
return true;
}
})
.toStream()
.forEach(chatroom -> chatrooms.put(chatroom.getId(), chatroom));
- this.shard = shard;
}