From 6fdeed8a072823b9e81501cfe91a51b8bf75f5a0 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 3 Sep 2023 11:15:19 +0200 Subject: [PATCH] WIP --- .../backend/api/ChatBackendController.java | 4 +- .../kafka/chat/backend/domain/ChatHome.java | 2 + .../kafka/chat/backend/domain/ChatRoom.java | 2 +- .../chat/backend/domain/ChatRoomFactory.java | 11 ----- .../chat/backend/domain/ChatRoomInfo.java | 2 +- .../inmemory/InMemoryChatRoomFactory.java | 32 -------------- .../persistence/inmemory/ShardedChatHome.java | 14 ++++-- .../persistence/inmemory/SimpleChatHome.java | 44 ++++++++++++++----- .../persistence/kafka/KafkaChatHome.java | 9 ++++ .../kafka/KafkaChatRoomFactory.java | 24 ---------- .../kafka/KafkaServicesConfiguration.java | 6 --- 11 files changed, 56 insertions(+), 94 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java delete mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java delete mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java index 339451a8..f41f45f6 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java @@ -2,7 +2,6 @@ package de.juplo.kafka.chat.backend.api; import de.juplo.kafka.chat.backend.domain.ChatHome; import de.juplo.kafka.chat.backend.domain.ChatRoom; -import de.juplo.kafka.chat.backend.domain.ChatRoomFactory; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import lombok.RequiredArgsConstructor; import org.springframework.http.codec.ServerSentEvent; @@ -18,7 +17,6 @@ import java.util.UUID; public class ChatBackendController { private final ChatHome chatHome; - private final ChatRoomFactory factory; private final StorageStrategy storageStrategy; @@ -26,7 +24,7 @@ public class ChatBackendController public Mono create(@RequestBody String name) { UUID chatRoomId = UUID.randomUUID(); - return factory + return chatHome .createChatRoom(chatRoomId, name) .map(ChatRoomInfoTo::from); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java index 6091c0c5..e4d92dbb 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java @@ -8,6 +8,8 @@ import java.util.UUID; public interface ChatHome { + Mono createChatRoom(UUID id, String name); + Mono getChatRoom(UUID id); Flux getChatRooms(); diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java index b9463095..c66b887d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java @@ -26,7 +26,7 @@ public class ChatRoom extends ChatRoomInfo public ChatRoom( UUID id, String name, - int shard, + Integer shard, Clock clock, ChatRoomService service, int bufferSize) diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java deleted file mode 100644 index 603795d9..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.juplo.kafka.chat.backend.domain; - -import reactor.core.publisher.Mono; - -import java.util.UUID; - - -public interface ChatRoomFactory -{ - Mono createChatRoom(UUID id, String name); -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java index 6d88be95..33c522d1 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java @@ -18,5 +18,5 @@ public class ChatRoomInfo @Getter private final String name; @Getter - private final int shard; + private final Integer shard; } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java deleted file mode 100644 index 855c401e..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java +++ /dev/null @@ -1,32 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.inmemory; - -import de.juplo.kafka.chat.backend.domain.*; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.time.Clock; -import java.util.UUID; - - -@RequiredArgsConstructor -@Slf4j -public class InMemoryChatRoomFactory implements ChatRoomFactory -{ - private final ShardingStrategy shardingStrategy; - private final Clock clock; - private final int bufferSize; - - - @Override - public Mono createChatRoom(UUID id, String name) - { - log.info("Creating ChatRoom with buffer-size {}", bufferSize); - int shard = shardingStrategy.selectShard(id); - ChatRoomService service = new InMemoryChatRoomService(Flux.empty()); - ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize); - chatHomeService.putChatRoom(chatRoom); - return Mono.just(chatRoom); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHome.java index b8bc4b8e..c6aff1e3 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHome.java @@ -1,9 +1,6 @@ package de.juplo.kafka.chat.backend.persistence.inmemory; -import de.juplo.kafka.chat.backend.domain.ChatHome; -import de.juplo.kafka.chat.backend.domain.ChatRoom; -import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException; -import de.juplo.kafka.chat.backend.domain.UnknownChatroomException; +import de.juplo.kafka.chat.backend.domain.*; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -41,6 +38,15 @@ public class ShardedChatHome implements ChatHome } + @Override + public Mono createChatRoom(UUID id, String name) + { + int shard = shardingStrategy.selectShard(id); + return chatHomes[shard] == null + ? Mono.error(new ShardNotOwnedException(shard)) + : chatHomes[shard].createChatRoom(id, name); + } + @Override public Mono getChatRoom(UUID id) { diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java index 2987dd43..c772d7eb 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java @@ -1,34 +1,42 @@ package de.juplo.kafka.chat.backend.persistence.inmemory; -import de.juplo.kafka.chat.backend.domain.ChatHome; -import de.juplo.kafka.chat.backend.domain.ChatRoom; -import de.juplo.kafka.chat.backend.domain.UnknownChatroomException; +import de.juplo.kafka.chat.backend.domain.*; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.time.Clock; import java.util.*; @Slf4j public class SimpleChatHome implements ChatHome { - private final Map chatrooms; + private final Integer shard; + private final Map chatRooms; + private final Clock clock; + private final int bufferSize; - public SimpleChatHome(Flux chatroomFlux) + public SimpleChatHome( + Flux chatroomFlux, + Clock clock, + int bufferSize) { - this(chatroomFlux, null); + this(null, chatroomFlux, clock, bufferSize); } public SimpleChatHome( Integer shard, - Flux chatroomFlux) + Flux chatroomFlux, + Clock clock, + int bufferSize) { log.info("Created SimpleChatHome for shard {}", shard); - - this.chatrooms = new HashMap<>(); +; + this.shard = shard; + this.chatRooms = new HashMap<>(); chatroomFlux .filter(chatRoom -> { @@ -46,21 +54,33 @@ public class SimpleChatHome implements ChatHome } }) .toStream() - .forEach(chatroom -> chatrooms.put(chatroom.getId(), chatroom)); + .forEach(chatroom -> chatRooms.put(chatroom.getId(), chatroom)); + this.clock = clock; + this.bufferSize = bufferSize; } + @Override + public Mono createChatRoom(UUID id, String name) + { + log.info("Creating ChatRoom with buffer-size {}", bufferSize); + ChatRoomService service = new InMemoryChatRoomService(Flux.empty()); + ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize); + chatRooms.put(id, chatRoom); + return Mono.just(chatRoom); + } + @Override public Mono getChatRoom(UUID id) { return Mono - .justOrEmpty(chatrooms.get(id)) + .justOrEmpty(chatRooms.get(id)) .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); } @Override public Flux getChatRooms() { - return Flux.fromIterable(chatrooms.values()); + return Flux.fromIterable(chatRooms.values()); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java index 07fb8858..06228396 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java @@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatHome; import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.domain.UnknownChatroomException; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -20,6 +21,14 @@ public class KafkaChatHome implements ChatHome private final ChatRoomChannel chatRoomChannel; + + @Override + public Mono createChatRoom(UUID id, String name) + { + log.info("Sending create-command for chat rooom: id={}, name={}"); + return chatRoomChannel.sendCreateChatRoomRequest(id, name); + } + @Override public Mono getChatRoom(UUID id) { diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java deleted file mode 100644 index 6a1dc78a..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java +++ /dev/null @@ -1,24 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.kafka; - -import de.juplo.kafka.chat.backend.domain.ChatRoomFactory; -import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import reactor.core.publisher.Mono; - -import java.util.UUID; - - -@RequiredArgsConstructor -@Slf4j -public class KafkaChatRoomFactory implements ChatRoomFactory -{ - private final ChatRoomChannel chatRoomChannel; - - @Override - public Mono createChatRoom(UUID id, String name) - { - log.info("Sending create-command for chat rooom: id={}, name={}"); - return chatRoomChannel.sendCreateChatRoomRequest(id, name); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java index 1cd41b53..df4faed8 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -43,12 +43,6 @@ public class KafkaServicesConfiguration chatRoomChannel); } - @Bean - KafkaChatRoomFactory chatRoomFactory(ChatRoomChannel chatRoomChannel) - { - return new KafkaChatRoomFactory(chatRoomChannel); - } - @Bean ChatRoomChannel chatRoomChannel( ChatBackendProperties properties, -- 2.20.1