From: Kai Moritz Date: Sat, 2 Sep 2023 17:34:29 +0000 (+0200) Subject: WIP X-Git-Tag: wip-inmemory-chat-home-service~1 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=1a6e2af4b700d92efe20ce5099affc01413c6eaa;p=demos%2Fkafka%2Fchat WIP --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java deleted file mode 100644 index 29f13127..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java +++ /dev/null @@ -1,76 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.inmemory; - -import de.juplo.kafka.chat.backend.domain.ChatRoom; -import lombok.extern.slf4j.Slf4j; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.util.*; -import java.util.stream.IntStream; - - -@Slf4j -public class InMemoryChatHomeService -{ - private final Map[] chatrooms; - - - public InMemoryChatHomeService( - int numShards, - int[] ownedShards, - Flux chatroomFlux) - { - log.debug("Creating InMemoryChatHomeService"); - this.chatrooms = new Map[numShards]; - Set owned = Arrays - .stream(ownedShards) - .collect( - () -> new HashSet<>(), - (set, i) -> set.add(i), - (a, b) -> a.addAll(b)); - for (int shard = 0; shard < numShards; shard++) - { - chatrooms[shard] = owned.contains(shard) - ? new HashMap<>() - : null; - } - chatroomFlux - .filter(chatRoom -> - { - if (owned.contains(chatRoom.getShard())) - { - return true; - } - else - { - log.info("Ignoring not owned chat-room {}", chatRoom); - return false; - } - }) - .toStream() - .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom)); - } - - public void putChatRoom(ChatRoom chatRoom) - { - chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom); - } - - public Mono getChatRoom(int shard, UUID id) - { - return Mono.justOrEmpty(chatrooms[shard].get(id)); - } - - public int[] getOwnedShards() - { - return IntStream - .range(0, chatrooms.length) - .filter(i -> chatrooms[i] != null) - .toArray(); - } - - public Flux getChatRooms(int shard) - { - return Flux.fromStream(chatrooms[shard].values().stream()); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java index 375ed721..00e0c6fd 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java @@ -26,9 +26,9 @@ public class InMemoryServicesConfiguration name = "sharding-strategy", havingValue = "none", matchIfMissing = true) - ChatHome noneShardingChatHome(InMemoryChatHomeService chatHomeService) + ChatHome noneShardingChatHome() { - return new SimpleChatHome(chatHomeService); + return new SimpleChatHome(); } @Bean diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java index 3048aa50..2987dd43 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java @@ -13,13 +13,17 @@ import java.util.*; @Slf4j public class SimpleChatHome implements ChatHome { - private final int shard; private final Map chatrooms; + public SimpleChatHome(Flux chatroomFlux) + { + this(chatroomFlux, null); + } + public SimpleChatHome( - int shard, + Integer shard, Flux chatroomFlux) { log.info("Created SimpleChatHome for shard {}", shard); @@ -28,7 +32,7 @@ public class SimpleChatHome implements ChatHome chatroomFlux .filter(chatRoom -> { - if (shard > -1 && chatRoom.getShard() == shard) + if (shard == null && chatRoom.getShard() == shard) { return true; } @@ -43,7 +47,6 @@ public class SimpleChatHome implements ChatHome }) .toStream() .forEach(chatroom -> chatrooms.put(chatroom.getId(), chatroom)); - this.shard = shard; }