From: Kai Moritz Date: Fri, 17 Feb 2023 23:21:50 +0000 (+0100) Subject: refactor: Refined the creation of new `ChatRoom`s X-Git-Tag: wip-sharding~14 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=1d4b90c15b1571bce48389e2c34e7b15c1697b89;p=demos%2Fkafka%2Fchat refactor: Refined the creation of new `ChatRoom`s - Dropped `ChatHomeService.putChatRoom(ChatRoom)`. - `ChatRoomFactory.create(UUID, String)` returns `ChatRoomInfo`. --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java index 36bec48a..4db77ee2 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java @@ -23,21 +23,20 @@ public class ChatBackendController @PostMapping("create") - public Mono create(@RequestBody String name) + public Mono create(@RequestBody String name) { UUID chatRoomId = UUID.randomUUID(); return factory .createChatRoom(chatRoomId, name) - .flatMap(chatRoom -> chatHome.putChatRoom(chatRoom)) - .map(ChatRoomTo::from); + .map(ChatRoomInfoTo::from); } @GetMapping("list") - public Flux list() + public Flux list() { return chatHome .getChatRooms() - .map(chatroom -> ChatRoomTo.from(chatroom)); + .map(chatroom -> ChatRoomInfoTo.from(chatroom)); } @GetMapping("{chatroomId}/list") @@ -51,11 +50,11 @@ public class ChatBackendController } @GetMapping("{chatroomId}") - public Mono get(@PathVariable UUID chatroomId) + public Mono get(@PathVariable UUID chatroomId) { return chatHome .getChatRoom(chatroomId) - .map(chatroom -> ChatRoomTo.from(chatroom)); + .map(chatroom -> ChatRoomInfoTo.from(chatroom)); } @PutMapping("{chatroomId}/{username}/{messageId}") diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomInfoTo.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomInfoTo.java new file mode 100644 index 00000000..212fb8d5 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomInfoTo.java @@ -0,0 +1,24 @@ +package de.juplo.kafka.chat.backend.api; + +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import lombok.Data; + +import java.util.UUID; + +@Data +public class ChatRoomInfoTo +{ + private UUID id; + private String name; + private int shard; + + + public static ChatRoomInfoTo from(ChatRoomInfo info) + { + ChatRoomInfoTo to = new ChatRoomInfoTo(); + to.id = info.getId(); + to.name = info.getName(); + to.shard = info.getShard(); + return to; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java deleted file mode 100644 index e997e4bb..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java +++ /dev/null @@ -1,24 +0,0 @@ -package de.juplo.kafka.chat.backend.api; - -import de.juplo.kafka.chat.backend.domain.ChatRoom; -import lombok.Data; - -import java.util.UUID; - -@Data -public class ChatRoomTo -{ - private UUID id; - private String name; - private int shard; - - - public static ChatRoomTo from(ChatRoom chatroom) - { - ChatRoomTo to = new ChatRoomTo(); - to.id = chatroom.getId(); - to.name = chatroom.getName(); - to.shard = chatroom.getShard(); - return to; - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java index 99439d36..6091c0c5 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java @@ -8,8 +8,6 @@ import java.util.UUID; public interface ChatHome { - Mono putChatRoom(ChatRoom chatRoom); - Mono getChatRoom(UUID id); Flux getChatRooms(); diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java index 7f13283a..19ff4aa4 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java @@ -8,7 +8,6 @@ import java.util.UUID; public interface ChatHomeService { - Mono putChatRoom(ChatRoom chatRoom); Mono getChatRoom(int shard, UUID id); Flux getChatRooms(int shard); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java index da5eba2a..cffc0ad0 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java @@ -1,8 +1,5 @@ package de.juplo.kafka.chat.backend.domain; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.ToString; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -17,17 +14,9 @@ import java.util.regex.Pattern; @Slf4j -@EqualsAndHashCode(of = { "id" }) -@ToString(of = { "id", "name" }) -public class ChatRoom +public class ChatRoom extends ChatRoomInfo { public final static Pattern VALID_USER = Pattern.compile("^[a-z0-9-]{2,}$"); - @Getter - private final UUID id; - @Getter - private final String name; - @Getter - private final int shard; private final Clock clock; private final ChatRoomService service; private final int bufferSize; @@ -42,10 +31,8 @@ public class ChatRoom ChatRoomService service, int bufferSize) { + super(id, name, shard); log.info("Created ChatRoom {} with buffer-size {}", id, bufferSize); - this.id = id; - this.name = name; - this.shard = shard; this.clock = clock; this.service = service; this.bufferSize = bufferSize; diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java index 324e4b02..603795d9 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java @@ -7,5 +7,5 @@ import java.util.UUID; public interface ChatRoomFactory { - Mono createChatRoom(UUID id, String name); + Mono createChatRoom(UUID id, String name); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java new file mode 100644 index 00000000..6d88be95 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java @@ -0,0 +1,22 @@ +package de.juplo.kafka.chat.backend.domain; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.ToString; + +import java.util.UUID; + + +@RequiredArgsConstructor +@EqualsAndHashCode(of = { "id" }) +@ToString(of = { "id", "name", "shard" }) +public class ChatRoomInfo +{ + @Getter + private final UUID id; + @Getter + private final String name; + @Getter + private final int shard; +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java index 3023f782..4b8c7f16 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java @@ -37,12 +37,6 @@ public class ShardedChatHome implements ChatHome } - @Override - public Mono putChatRoom(ChatRoom chatRoom) - { - return chatHomes[selectShard(chatRoom.getId())].putChatRoom(chatRoom); - } - @Override public Mono getChatRoom(UUID id) { diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java index 46802c69..11542edd 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java @@ -27,12 +27,6 @@ public class SimpleChatHome implements ChatHome } - @Override - public Mono putChatRoom(ChatRoom chatRoom) - { - return service.putChatRoom(chatRoom); - } - @Override public Mono getChatRoom(UUID id) { diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java index fd54d34c..8f262a0b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java @@ -51,11 +51,9 @@ public class InMemoryChatHomeService implements ChatHomeService .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom)); } - @Override - public Mono putChatRoom(ChatRoom chatRoom) + public void putChatRoom(ChatRoom chatRoom) { chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom); - return Mono.just(chatRoom); } @Override diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java index 7fff359c..2bde2361 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java @@ -1,9 +1,6 @@ package de.juplo.kafka.chat.backend.persistence.inmemory; -import de.juplo.kafka.chat.backend.domain.ShardingStrategy; -import de.juplo.kafka.chat.backend.domain.ChatRoom; -import de.juplo.kafka.chat.backend.domain.ChatRoomFactory; -import de.juplo.kafka.chat.backend.domain.ChatRoomService; +import de.juplo.kafka.chat.backend.domain.*; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; @@ -17,17 +14,20 @@ import java.util.UUID; @Slf4j public class InMemoryChatRoomFactory implements ChatRoomFactory { + private final InMemoryChatHomeService chatHomeService; private final ShardingStrategy shardingStrategy; private final Clock clock; private final int bufferSize; @Override - public Mono createChatRoom(UUID id, String name) + public Mono createChatRoom(UUID id, String name) { log.info("Creating ChatRoom with buffer-size {}", bufferSize); int shard = shardingStrategy.selectShard(id); ChatRoomService service = new InMemoryChatRoomService(Flux.empty()); - return Mono.just(new ChatRoom(id, name, shard, clock, service, bufferSize)); + ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize); + chatHomeService.putChatRoom(chatRoom); + return Mono.just(chatRoom); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java index 96ef05c2..de504485 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java @@ -79,11 +79,13 @@ public class InMemoryServicesConfiguration @Bean InMemoryChatRoomFactory chatRoomFactory( + InMemoryChatHomeService service, ShardingStrategy strategy, Clock clock, ChatBackendProperties properties) { return new InMemoryChatRoomFactory( + service, strategy, clock, properties.getChatroomBufferSize()); diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java index 1e3e5eef..f0ee1dfb 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java @@ -3,7 +3,7 @@ package de.juplo.kafka.chat.backend.persistence.storage.files; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; -import de.juplo.kafka.chat.backend.api.ChatRoomTo; +import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo; import de.juplo.kafka.chat.backend.api.MessageTo; import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.domain.ChatRoom; @@ -82,9 +82,9 @@ public class FilesStorageStrategy implements StorageStrategy { try { - ChatRoomTo chatroomTo = ChatRoomTo.from(chatroom); - generator.writeObject(chatroomTo); - writeMessages(chatroomTo, chatroom.getMessages()); + ChatRoomInfoTo infoTo = ChatRoomInfoTo.from(chatroom); + generator.writeObject(infoTo); + writeMessages(infoTo, chatroom.getMessages()); } catch (IOException e) { @@ -101,28 +101,28 @@ public class FilesStorageStrategy implements StorageStrategy @Override public Flux read() { - JavaType type = mapper.getTypeFactory().constructType(ChatRoomTo.class); + JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class); return Flux - .from(new JsonFilePublisher(chatroomsPath(), mapper, type)) + .from(new JsonFilePublisher(chatroomsPath(), mapper, type)) .log() - .map(chatRoomTo -> + .map(infoTo -> { - UUID chatRoomId = chatRoomTo.getId(); + UUID chatRoomId = infoTo.getId(); int shard = shardingStrategy.selectShard(chatRoomId); return new ChatRoom( - chatRoomTo.getId(), - chatRoomTo.getName(), + infoTo.getId(), + infoTo.getName(), shard, clock, - factory.create(readMessages(chatRoomTo)), + factory.create(readMessages(infoTo)), bufferSize); }); } - public void writeMessages(ChatRoomTo chatroomTo, Flux messageFlux) + public void writeMessages(ChatRoomInfoTo infoTo, Flux messageFlux) { - Path path = chatroomPath(chatroomTo); - log.info("Writing messages for {} to {}", chatroomTo, path); + Path path = chatroomPath(infoTo); + log.info("Writing messages for {} to {}", infoTo, path); try { Files.createDirectories(storagePath); @@ -177,11 +177,11 @@ public class FilesStorageStrategy implements StorageStrategy } } - public Flux readMessages(ChatRoomTo chatroomTo) + public Flux readMessages(ChatRoomInfoTo infoTo) { JavaType type = mapper.getTypeFactory().constructType(MessageTo.class); return Flux - .from(new JsonFilePublisher(chatroomPath(chatroomTo), mapper, type)) + .from(new JsonFilePublisher(chatroomPath(infoTo), mapper, type)) .log() .map(MessageTo::toMessage); } @@ -191,8 +191,8 @@ public class FilesStorageStrategy implements StorageStrategy return storagePath.resolve(Path.of(CHATROOMS_FILENAME)); } - Path chatroomPath(ChatRoomTo chatroomTo) + Path chatroomPath(ChatRoomInfoTo infoTo) { - return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json")); + return storagePath.resolve(Path.of(infoTo.getId().toString() + ".json")); } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java index 832ebd99..dd76324c 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java @@ -27,6 +27,7 @@ public abstract class AbstractInMemoryStorageIT extends AbstractStorageStrategyI getStorageStrategy().read()); InMemoryChatRoomFactory chatRoomFactory = new InMemoryChatRoomFactory( + chatHomeService, chatRoomId -> 0, clock, 8); diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java index d5e02b83..3ce527eb 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java @@ -40,8 +40,9 @@ public abstract class AbstractStorageStrategyIT assertThat(chathome.getChatRooms().toStream()).hasSize(0); UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7"); - ChatRoom chatroom = chatRoomFactory.createChatRoom(chatRoomId, "FOO").block(); - chathome.putChatRoom(chatroom); + ChatRoomInfo info = chatRoomFactory.createChatRoom(chatRoomId, "FOO").block(); + log.debug("Created chat-room {}", info); + ChatRoom chatroom = chathome.getChatRoom(chatRoomId).block(); Message m1 = chatroom.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block(); Message m2 = chatroom.addMessage(1l, "ute", "Ich bin Ute...").block(); Message m3 = chatroom.addMessage(2l, "peter", "Willst du mit mir gehen?").block(); @@ -71,16 +72,18 @@ public abstract class AbstractStorageStrategyIT assertThat(chathome.getChatRooms().toStream()).hasSize(0); UUID chatRoomAId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7"); - ChatRoom chatroomA = chatRoomFactory.createChatRoom(chatRoomAId, "FOO").block(); - chathome.putChatRoom(chatroomA); + ChatRoomInfo infoA = chatRoomFactory.createChatRoom(chatRoomAId, "FOO").block(); + log.debug("Created chat-room {}", infoA); + ChatRoom chatroomA = chathome.getChatRoom(chatRoomAId).block(); Message ma1 = chatroomA.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block(); Message ma2 = chatroomA.addMessage(1l, "ute", "Ich bin Ute...").block(); Message ma3 = chatroomA.addMessage(2l, "peter", "Willst du mit mir gehen?").block(); Message ma4 = chatroomA.addMessage(1l, "klaus", "Ja? Nein? Vielleicht??").block(); UUID chatRoomBId = UUID.fromString("8763dfdc-4dda-4a74-bea4-4b389177abea"); - ChatRoom chatroomB = chatRoomFactory.createChatRoom(chatRoomBId, "BAR").block(); - chathome.putChatRoom(chatroomB); + ChatRoomInfo infoB = chatRoomFactory.createChatRoom(chatRoomBId, "BAR").block(); + log.debug("Created chat-room {}", infoB); + ChatRoom chatroomB = chathome.getChatRoom(chatRoomBId).block(); Message mb1 = chatroomB.addMessage(1l,"peter", "Hallo, ich heiße Uwe!").block(); Message mb2 = chatroomB.addMessage(1l, "ute", "Ich bin Ute...").block(); Message mb3 = chatroomB.addMessage(1l, "klaus", "Willst du mit mir gehen?").block();