From: Kai Moritz Date: Fri, 24 Feb 2023 10:43:39 +0000 (+0100) Subject: WIP X-Git-Tag: wip-sharding~6 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=0e89e548efc0c6d44e6634252a68d560eba62db2;p=demos%2Fkafka%2Fchat WIP --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java index 4bcc542c..c61f8488 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java @@ -16,7 +16,7 @@ public class ChatBackendApplication implements WebFluxConfigurer @Autowired ChatBackendProperties properties; @Autowired - ChatHome[] chatHomes; + ChatHome chatHome; @Autowired StorageStrategy storageStrategy; @@ -32,8 +32,7 @@ public class ChatBackendApplication implements WebFluxConfigurer @PreDestroy public void onExit() { - for (int shard = 0; shard < chatHomes.length; shard++) - storageStrategy.write(chatHomes[shard].getChatRooms(shard)); + storageStrategy.write(chatHome.getChatRooms()); } public static void main(String[] args) diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java index c6445a03..557cf75f 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java @@ -1,5 +1,6 @@ package de.juplo.kafka.chat.backend.domain; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -7,18 +8,13 @@ import reactor.core.publisher.Mono; import java.util.*; +@RequiredArgsConstructor @Slf4j public class ChatHome { private final ChatHomeService service; - public ChatHome(ChatHomeService service) - { - log.info("Created ChatHome with ChatHomeService {}", service); - } - - public Mono getChatRoom(UUID id) { return service @@ -26,8 +22,8 @@ public class ChatHome .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); } - public Flux getChatRooms(int shard) + public Flux getChatRooms() { - return service.getChatRooms(shard); + return service.getChatRooms(); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java index a72b0c92..25a9bcf6 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java @@ -95,6 +95,6 @@ public class InMemoryChatHomeService implements ChatHomeService { return Flux .fromIterable(ownedShards) - .flatMap(shard -> chatrooms[shard].values().stream()); + .flatMap(shard -> Flux.fromIterable(chatrooms[shard].values())); } }