From d0d32b29dbc7a4e0afa737f97126b394ce1a24ee Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 3 Sep 2023 21:31:15 +0200 Subject: [PATCH] WIP:refactor: Renamed `ChatRoom` into `ChatRoomData` - Aligned Code --- .../chat/backend/ChatBackendApplication.java | 2 +- .../backend/api/ChatBackendController.java | 59 ++++++++--------- .../kafka/chat/backend/domain/ChatHome.java | 8 ++- .../chat/backend/domain/ChatRoomData.java | 6 +- .../backend/persistence/StorageStrategy.java | 7 +- .../InMemoryServicesConfiguration.java | 4 +- .../persistence/inmemory/ShardedChatHome.java | 33 ++++++++-- .../persistence/inmemory/SimpleChatHome.java | 64 ++++++++++++++----- .../persistence/kafka/ChatRoomChannel.java | 5 +- .../persistence/kafka/KafkaChatHome.java | 28 ++++++-- .../storage/files/FilesStorageStrategy.java | 9 +-- .../storage/mongodb/ChatRoomTo.java | 9 +-- .../mongodb/MongoDbStorageStrategy.java | 9 +-- .../NoStorageStorageConfiguration.java | 14 +++- .../api/ChatBackendControllerTest.java | 36 +++++------ .../chat/backend/domain/ChatHomeTest.java | 4 +- .../domain/ChatHomeWithShardsTest.java | 2 +- .../chat/backend/domain/ChatRoomDataTest.java | 16 ----- .../AbstractInMemoryStorageIT.java | 2 +- .../AbstractStorageStrategyIT.java | 44 ++++++------- .../inmemory/ShardedChatHomeTest.java | 2 +- .../inmemory/SimpleChatHomeTest.java | 2 +- 22 files changed, 214 insertions(+), 151 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java index 8e1ff9e5..cc924d79 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java @@ -33,7 +33,7 @@ public class ChatBackendApplication implements WebFluxConfigurer public void onExit() { for (int shard = 0; shard < chatHomes.length; shard++) - storageStrategy.write(chatHomes[shard].getChatRooms()); + storageStrategy.writeChatRoomData(chatHomes[shard].getChatRoomData()); } public static void main(String[] args) diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java index a400119f..ac77debf 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java @@ -33,49 +33,49 @@ public class ChatBackendController public Flux list() { return chatHome - .getChatRooms() - .map(chatroom -> ChatRoomInfoTo.from(chatroom)); + .getChatRoomInfo() + .map(chatroomInfo -> ChatRoomInfoTo.from(chatroomInfo)); } - @GetMapping("{chatroomId}/list") - public Flux list(@PathVariable UUID chatroomId) + @GetMapping("{chatRoomId}/list") + public Flux list(@PathVariable UUID chatRoomId) { return chatHome - .getChatRoom(chatroomId) - .flatMapMany(chatroom -> chatroom + .getChatRoomData(chatRoomId) + .flatMapMany(chatRoomData -> chatRoomData .getMessages() .map(MessageTo::from)); } - @GetMapping("{chatroomId}") - public Mono get(@PathVariable UUID chatroomId) + @GetMapping("{chatRoomId}") + public Mono get(@PathVariable UUID chatRoomId) { return chatHome - .getChatRoom(chatroomId) - .map(chatroom -> ChatRoomInfoTo.from(chatroom)); + .getChatRoomInfo(chatRoomId) + .map(chatRoomInfo -> ChatRoomInfoTo.from(chatRoomInfo)); } - @PutMapping("{chatroomId}/{username}/{messageId}") + @PutMapping("{chatRoomId}/{username}/{messageId}") public Mono put( - @PathVariable UUID chatroomId, + @PathVariable UUID chatRoomId, @PathVariable String username, @PathVariable Long messageId, @RequestBody String text) { return chatHome - .getChatRoom(chatroomId) - .flatMap(chatroom -> put(chatroom, username, messageId, text)); + .getChatRoomData(chatRoomId) + .flatMap(chatRoomData -> put(chatRoomData, username, messageId, text)); } private Mono put( - ChatRoomData chatroom, + ChatRoomData chatRoomData, String username, Long messageId, String text) { return - chatroom + chatRoomData .addMessage( messageId, username, @@ -83,40 +83,40 @@ public class ChatBackendController .map(message -> MessageTo.from(message)); } - @GetMapping("{chatroomId}/{username}/{messageId}") + @GetMapping("{chatRoomId}/{username}/{messageId}") public Mono get( - @PathVariable UUID chatroomId, + @PathVariable UUID chatRoomId, @PathVariable String username, @PathVariable Long messageId) { return chatHome - .getChatRoom(chatroomId) - .flatMap(chatroom -> get(chatroom, username, messageId)); + .getChatRoomData(chatRoomId) + .flatMap(chatRoomData -> get(chatRoomData, username, messageId)); } private Mono get( - ChatRoomData chatroom, + ChatRoomData chatRoomData, String username, Long messageId) { return - chatroom + chatRoomData .getMessage(username, messageId) .map(message -> MessageTo.from(message)); } - @GetMapping(path = "{chatroomId}/listen") - public Flux> listen(@PathVariable UUID chatroomId) + @GetMapping(path = "{chatRoomId}/listen") + public Flux> listen(@PathVariable UUID chatRoomId) { return chatHome - .getChatRoom(chatroomId) - .flatMapMany(chatroom -> listen(chatroom)); + .getChatRoomData(chatRoomId) + .flatMapMany(chatRoomData -> listen(chatRoomData)); } - private Flux> listen(ChatRoomData chatroom) + private Flux> listen(ChatRoomData chatRoomData) { - return chatroom + return chatRoomData .listen() .log() .map(message -> MessageTo.from(message)) @@ -131,6 +131,7 @@ public class ChatBackendController @PostMapping("/store") public void store() { - storageStrategy.write(chatHome.getChatRooms()); + storageStrategy.writeChatRoomInfo(chatHome.getChatRoomInfo()); + storageStrategy.writeChatRoomData(chatHome.getChatRoomData()); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java index 7b97d553..a3442ae7 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java @@ -10,7 +10,11 @@ public interface ChatHome { Mono createChatRoom(UUID id, String name); - Mono getChatRoom(UUID id); + Mono getChatRoomInfo(UUID id); - Flux getChatRooms(); + Flux getChatRoomInfo(); + + Mono getChatRoomData(UUID id); + + Flux getChatRoomData(); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java index a5667c18..cf44802f 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java @@ -17,20 +17,20 @@ public class ChatRoomData { public final static Pattern VALID_USER = Pattern.compile("^[a-z0-9-]{2,}$"); - private final Clock clock; private final ChatRoomService service; + private final Clock clock; private final int bufferSize; private Sinks.Many sink; public ChatRoomData( - Clock clock, ChatRoomService service, + Clock clock, int bufferSize) { log.info("Created ChatRoom with buffer-size {}", bufferSize); - this.clock = clock; this.service = service; + this.clock = clock; this.bufferSize = bufferSize; // @RequiredArgsConstructor unfortunately not possible, because // the `bufferSize` is not set, if `createSink()` is called diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java index 96d1eb7b..9997b94e 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java @@ -1,11 +1,14 @@ package de.juplo.kafka.chat.backend.persistence; import de.juplo.kafka.chat.backend.domain.ChatRoomData; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import reactor.core.publisher.Flux; public interface StorageStrategy { - void write(Flux chatroomFlux); - Flux read(); + void writeChatRoomInfo(Flux chatRoomInfoFlux); + Flux readChatRoomInfo(); + void writeChatRoomData(Flux chatRoomDataFlux); + Flux readChatRoomData(); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java index 106c7369..dd8f7d22 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java @@ -31,7 +31,7 @@ public class InMemoryServicesConfiguration Clock clock) { return new SimpleChatHome( - storageStrategy.read(), + storageStrategy.readChatRoomData(), clock, properties.getChatroomBufferSize()); } @@ -52,7 +52,7 @@ public class InMemoryServicesConfiguration .of(properties.getInmemory().getOwnedShards()) .forEach(shard -> chatHomes[shard] = new SimpleChatHome( shard, - storageStrategy.read(), + storageStrategy.readChatRoomData(), clock, properties.getChatroomBufferSize())); ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards); diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHome.java index 653c64aa..2c9e166e 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHome.java @@ -48,13 +48,13 @@ public class ShardedChatHome implements ChatHome } @Override - public Mono getChatRoom(UUID id) + public Mono getChatRoomInfo(UUID id) { int shard = selectShard(id); return chatHomes[shard] == null ? Mono.error(new ShardNotOwnedException(shard)) : chatHomes[shard] - .getChatRoom(id) + .getChatRoomInfo(id) .onErrorMap(throwable -> throwable instanceof UnknownChatroomException ? new UnknownChatroomException( id, @@ -64,13 +64,38 @@ public class ShardedChatHome implements ChatHome } @Override - public Flux getChatRooms() + public Flux getChatRoomInfo() { return Flux .fromIterable(ownedShards) - .flatMap(shard -> chatHomes[shard].getChatRooms()); + .flatMap(shard -> chatHomes[shard].getChatRoomInfo()); } + @Override + public Mono getChatRoomData(UUID id) + { + int shard = selectShard(id); + return chatHomes[shard] == null + ? Mono.error(new ShardNotOwnedException(shard)) + : chatHomes[shard] + .getChatRoomData(id) + .onErrorMap(throwable -> throwable instanceof UnknownChatroomException + ? new UnknownChatroomException( + id, + shard, + ownedShards.stream().mapToInt(i -> i.intValue()).toArray()) + : throwable); + } + + @Override + public Flux getChatRoomData() + { + return Flux + .fromIterable(ownedShards) + .flatMap(shard -> chatHomes[shard].getChatRoomData()); + } + + private int selectShard(UUID chatroomId) { diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java index 9112fb92..f0067fa6 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java @@ -13,34 +13,43 @@ import java.util.*; public class SimpleChatHome implements ChatHome { private final Integer shard; - private final Map chatRooms; + private final Map chatRoomInfo; + private final Map chatRoomData; private final Clock clock; private final int bufferSize; public SimpleChatHome( - Flux chatroomFlux, + Flux chatRoomInfoFlux, + ChatRoomService chatRoomService, Clock clock, int bufferSize) { - this(null, chatroomFlux, clock, bufferSize); + this( + null, + chatRoomInfoFlux, + chatRoomService, + clock, + bufferSize); } public SimpleChatHome( Integer shard, - Flux chatroomFlux, + Flux chatRoomInfoFlux, + ChatRoomService chatRoomService, Clock clock, int bufferSize) { log.info("Created SimpleChatHome for shard {}", shard); ; this.shard = shard; - this.chatRooms = new HashMap<>(); - chatroomFlux - .filter(chatRoom -> + this.chatRoomInfo = new HashMap<>(); + this.chatRoomData = new HashMap<>(); + chatRoomInfoFlux + .filter(info -> { - if (shard == null || chatRoom.getShard() == shard) + if (shard == null || info.getShard() == shard) { return true; } @@ -49,12 +58,18 @@ public class SimpleChatHome implements ChatHome log.info( "SimpleChatHome for shard {} ignores not owned chat-room {}", shard, - chatRoom); + info); return false; } }) .toStream() - .forEach(chatroom -> chatRooms.put(chatroom.getId(), chatroom)); + .forEach(info -> + { + chatRoomInfo.put(info.getId(), info); + chatRoomData.put( + info.getId(), + new ChatRoomData(chatRoomService, clock, bufferSize)); + }); this.clock = clock; this.bufferSize = bufferSize; } @@ -65,22 +80,37 @@ public class SimpleChatHome implements ChatHome { log.info("Creating ChatRoom with buffer-size {}", bufferSize); ChatRoomService service = new InMemoryChatRoomService(Flux.empty()); - ChatRoomData chatRoomData = new ChatRoomData(id, name, shard, clock, service, bufferSize); - chatRooms.put(id, chatRoomData); - return Mono.just(chatRoomData); + ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard); + ChatRoomData chatRoomData = new ChatRoomData(service, clock, bufferSize); + this.chatRoomData.put(id, chatRoomData); + return Mono.just(chatRoomInfo); + } + + @Override + public Mono getChatRoomInfo(UUID id) + { + return Mono + .justOrEmpty(chatRoomInfo.get(id)) + .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); + } + + @Override + public Flux getChatRoomInfo() + { + return Flux.fromIterable(chatRoomInfo.values()); } @Override - public Mono getChatRoom(UUID id) + public Mono getChatRoomData(UUID id) { return Mono - .justOrEmpty(chatRooms.get(id)) + .justOrEmpty(chatRoomData.get(id)) .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); } @Override - public Flux getChatRooms() + public Flux getChatRoomData() { - return Flux.fromIterable(chatRooms.values()); + return Flux.fromIterable(chatRoomData.values()); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java index 6ce85d1b..b631ed51 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java @@ -282,9 +282,6 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener log.info("Loading ChatRoom {} with buffer-size {}", chatRoomId, bufferSize); KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId); ChatRoomData chatRoomData = new ChatRoomData( - chatRoomId, - createChatRoomRequestTo.getName(), - partition, clock, service, bufferSize); @@ -299,7 +296,7 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener int shard = chatRoomInfo.getShard(); log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); KafkaChatRoomService service = new KafkaChatRoomService(this, id); - ChatRoomData chatRoomData = new ChatRoomData(id, name, shard, clock, service, bufferSize); + ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize); putChatRoom(chatRoomData); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java index 68cad73e..cea178b2 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java @@ -30,7 +30,7 @@ public class KafkaChatHome implements ChatHome } @Override - public Mono getChatRoom(UUID id) + public Mono getChatRoomInfo(UUID id) { int shard = selectShard(id); return chatRoomChannel @@ -41,15 +41,33 @@ public class KafkaChatHome implements ChatHome chatRoomChannel.getOwnedShards()))); } - int selectShard(UUID chatRoomId) + @Override + public Flux getChatRoomInfo() { - byte[] serializedKey = chatRoomId.toString().getBytes(); - return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions; + return chatRoomChannel.getChatRooms(); } @Override - public Flux getChatRooms() + public Mono getChatRoomData(UUID id) + { + int shard = selectShard(id); + return chatRoomChannel + .getChatRoom(shard, id) + .switchIfEmpty(Mono.error(() -> new UnknownChatroomException( + id, + shard, + chatRoomChannel.getOwnedShards()))); + } + + @Override + public Flux getChatRoomData() { return chatRoomChannel.getChatRooms(); } + + int selectShard(UUID chatRoomId) + { + byte[] serializedKey = chatRoomId.toString().getBytes(); + return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions; + } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java index fb6e3446..860d9ffb 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java @@ -39,7 +39,7 @@ public class FilesStorageStrategy implements StorageStrategy @Override - public void write(Flux chatroomFlux) + public void writeChatRoomData(Flux chatRoomDataFlux) { Path path = chatroomsPath(); log.info("Writing chatrooms to {}", path); @@ -52,7 +52,7 @@ public class FilesStorageStrategy implements StorageStrategy .getFactory() .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING)); - chatroomFlux + chatRoomDataFlux .log() .doFirst(() -> { @@ -99,7 +99,7 @@ public class FilesStorageStrategy implements StorageStrategy } @Override - public Flux read() + public Flux readChatRoomData() { JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class); return Flux @@ -110,9 +110,6 @@ public class FilesStorageStrategy implements StorageStrategy UUID chatRoomId = infoTo.getId(); int shard = shardingStrategy.selectShard(chatRoomId); return new ChatRoomData( - infoTo.getId(), - infoTo.getName(), - shard, clock, factory.create(readMessages(infoTo)), bufferSize); diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java index 6b5ea878..f3fae327 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java @@ -1,6 +1,7 @@ package de.juplo.kafka.chat.backend.persistence.storage.mongodb; import de.juplo.kafka.chat.backend.domain.ChatRoomData; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import lombok.*; import org.springframework.data.annotation.Id; import org.springframework.data.mongodb.core.mapping.Document; @@ -24,12 +25,12 @@ public class ChatRoomTo public static ChatRoomTo from( ChatRoomInfo chatRoomInfo, - ChatRoomData chatroom) + ChatRoomData chatRoomData) { return new ChatRoomTo( - chatroom.getId().toString(), - chatroom.getName(), - chatroom + chatRoomInfo.getId().toString(), + chatRoomInfo.getName(), + chatRoomData .getMessages() .map(MessageTo::from) .collectList() diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java index b9c7fb23..7952a2b2 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java @@ -24,15 +24,15 @@ public class MongoDbStorageStrategy implements StorageStrategy @Override - public void write(Flux chatroomFlux) + public void writeChatRoomData(Flux chatRoomDataFlux) { - chatroomFlux + chatRoomDataFlux .map(ChatRoomTo::from) .subscribe(chatroomTo -> repository.save(chatroomTo)); } @Override - public Flux read() + public Flux readChatRoomData() { return Flux .fromIterable(repository.findAll()) @@ -41,9 +41,6 @@ public class MongoDbStorageStrategy implements StorageStrategy UUID chatRoomId = UUID.fromString(chatRoomTo.getId()); int shard = shardingStrategy.selectShard(chatRoomId); return new ChatRoomData( - chatRoomId, - chatRoomTo.getName(), - shard, clock, factory.create( Flux diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/nostorage/NoStorageStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/nostorage/NoStorageStorageConfiguration.java index dd2223ca..84ac8deb 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/nostorage/NoStorageStorageConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/nostorage/NoStorageStorageConfiguration.java @@ -1,6 +1,7 @@ package de.juplo.kafka.chat.backend.persistence.storage.nostorage; import de.juplo.kafka.chat.backend.domain.ChatRoomData; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -29,10 +30,19 @@ public class NoStorageStorageConfiguration return new StorageStrategy() { @Override - public void write(Flux chatroomFlux) {} + public void writeChatRoomInfo(Flux chatRoomInfoFlux) {} @Override - public Flux read() + public Flux readChatRoomInfo() + { + return Flux.empty(); + } + + @Override + public void writeChatRoomData(Flux chatRoomDataFlux) {} + + @Override + public Flux readChatRoomData() { return Flux.empty(); } diff --git a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java index 63ab7d6f..8736b829 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java @@ -42,7 +42,7 @@ public class ChatBackendControllerTest { // Given UUID chatroomId = UUID.randomUUID(); - when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId)); + when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId)); // When WebTestClient.ResponseSpec responseSpec = client @@ -62,7 +62,7 @@ public class ChatBackendControllerTest { // Given UUID chatroomId = UUID.randomUUID(); - when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId)); + when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId)); // When WebTestClient.ResponseSpec responseSpec = client @@ -83,7 +83,7 @@ public class ChatBackendControllerTest UUID chatroomId = UUID.randomUUID(); String username = "foo"; Long messageId = 66l; - when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId)); + when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId)); // When WebTestClient.ResponseSpec responseSpec = client @@ -109,7 +109,7 @@ public class ChatBackendControllerTest UUID chatroomId = UUID.randomUUID(); String username = "foo"; Long messageId = 66l; - when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId)); + when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId)); // When WebTestClient.ResponseSpec responseSpec = client @@ -132,7 +132,7 @@ public class ChatBackendControllerTest { // Given UUID chatroomId = UUID.randomUUID(); - when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId)); + when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId)); // When WebTestClient.ResponseSpec responseSpec = client @@ -171,12 +171,10 @@ public class ChatBackendControllerTest String textExistingMessage = "Existing"; String textMutatedMessage = "Mutated!"; ChatRoomData chatRoomData = new ChatRoomData( - chatroomId, - "Test-ChatRoom", - 0, Clock.systemDefaultZone(), - chatRoomService, 8); - when(chatHome.getChatRoom(eq(chatroomId))).thenReturn(Mono.just(chatRoomData)); + chatRoomService, + 8); + when(chatHome.getChatRoomData(eq(chatroomId))).thenReturn(Mono.just(chatRoomData)); Message existingMessage = new Message( key, serialNumberExistingMessage, @@ -223,12 +221,10 @@ public class ChatBackendControllerTest Message.MessageKey key = Message.MessageKey.of(user, messageId); String textMessage = "Hallo Welt"; ChatRoomData chatRoomData = new ChatRoomData( - chatroomId, - "Test-ChatRoom", - 0, Clock.systemDefaultZone(), - chatRoomService, 8); - when(chatHome.getChatRoom(any(UUID.class))) + chatRoomService, + 8); + when(chatHome.getChatRoomData(any(UUID.class))) .thenReturn(Mono.just(chatRoomData)); when(chatRoomService.getMessage(any(Message.MessageKey.class))) .thenReturn(Mono.empty()); @@ -262,7 +258,7 @@ public class ChatBackendControllerTest // Given UUID chatroomId = UUID.randomUUID(); int shard = 666; - when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); + when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); // When WebTestClient.ResponseSpec responseSpec = client @@ -282,7 +278,7 @@ public class ChatBackendControllerTest // Given UUID chatroomId = UUID.randomUUID(); int shard = 666; - when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); + when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); // When WebTestClient.ResponseSpec responseSpec = client @@ -304,7 +300,7 @@ public class ChatBackendControllerTest String username = "foo"; Long messageId = 66l; int shard = 666; - when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); + when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); // When WebTestClient.ResponseSpec responseSpec = client @@ -331,7 +327,7 @@ public class ChatBackendControllerTest String username = "foo"; Long messageId = 66l; int shard = 666; - when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); + when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); // When WebTestClient.ResponseSpec responseSpec = client @@ -355,7 +351,7 @@ public class ChatBackendControllerTest // Given UUID chatroomId = UUID.randomUUID(); int shard = 666; - when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); + when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); // When WebTestClient.ResponseSpec responseSpec = client diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java index ad7fe95d..5b50314f 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java @@ -30,7 +30,7 @@ public abstract class ChatHomeTest // When Mono mono = Mono - .defer(() -> chatHome.getChatRoom(chatRoomId)) + .defer(() -> chatHome.getChatRoomData(chatRoomId)) .log("testGetExistingChatroom") .retryWhen(Retry .backoff(5, Duration.ofSeconds(1)) @@ -49,7 +49,7 @@ public abstract class ChatHomeTest // When Mono mono = Mono - .defer(() -> chatHome.getChatRoom(chatRoomId)) + .defer(() -> chatHome.getChatRoomData(chatRoomId)) .log("testGetNonExistentChatroom") .retryWhen(Retry .backoff(5, Duration.ofSeconds(1)) diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTest.java index 67754566..c65908c4 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTest.java @@ -27,7 +27,7 @@ public abstract class ChatHomeWithShardsTest extends ChatHomeTest // When Mono mono = Mono - .defer(() -> chatHome.getChatRoom(chatRoomId)) + .defer(() -> chatHome.getChatRoomData(chatRoomId)) .log("testGetChatroomForNotOwnedShard") .retryWhen(Retry .backoff(5, Duration.ofSeconds(1)) diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomDataTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomDataTest.java index 171a9da5..790482fe 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomDataTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomDataTest.java @@ -8,7 +8,6 @@ import java.time.Clock; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; -import java.util.UUID; import static org.mockito.Mockito.*; import static pl.rzrz.assertj.reactor.Assertions.assertThat; @@ -25,9 +24,6 @@ public class ChatRoomDataTest Long messageId = 1l; ChatRoomService chatRoomService = mock(ChatRoomService.class); ChatRoomData chatRoomData = new ChatRoomData( - UUID.randomUUID(), - "Foo", - 0, Clock.systemDefaultZone(), chatRoomService, 8); @@ -52,9 +48,6 @@ public class ChatRoomDataTest Long messageId = 1l; ChatRoomService chatRoomService = mock(ChatRoomService.class); ChatRoomData chatRoomData = new ChatRoomData( - UUID.randomUUID(), - "Foo", - 0, Clock.systemDefaultZone(), chatRoomService, 8); @@ -76,9 +69,6 @@ public class ChatRoomDataTest Long messageId = 1l; ChatRoomService chatRoomService = mock(ChatRoomService.class); ChatRoomData chatRoomData = new ChatRoomData( - UUID.randomUUID(), - "Foo", - 0, Clock.systemDefaultZone(), chatRoomService, 8); @@ -106,9 +96,6 @@ public class ChatRoomDataTest Long messageId = 1l; ChatRoomService chatRoomService = mock(ChatRoomService.class); ChatRoomData chatRoomData = new ChatRoomData( - UUID.randomUUID(), - "Foo", - 0, Clock.systemDefaultZone(), chatRoomService, 8); @@ -136,9 +123,6 @@ public class ChatRoomDataTest Long messageId = 1l; ChatRoomService chatRoomService = mock(ChatRoomService.class); ChatRoomData chatRoomData = new ChatRoomData( - UUID.randomUUID(), - "Foo", - 0, Clock.systemDefaultZone(), chatRoomService, 8); diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java index 62dc08ad..b5d0cd42 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java @@ -22,7 +22,7 @@ public abstract class AbstractInMemoryStorageIT extends AbstractStorageStrategyI int bufferSize = 8; SimpleChatHome simpleChatHome = new SimpleChatHome( - getStorageStrategy().read(), + getStorageStrategy().readChatRoomData(), clock, bufferSize); diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java index 7de58620..d9e6c040 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java @@ -27,7 +27,7 @@ public abstract class AbstractStorageStrategyIT protected void stop() { - getStorageStrategy().write(chathome.getChatRooms()); + getStorageStrategy().writeChatRoomData(chathome.getChatRoomData()); } @Test @@ -35,30 +35,30 @@ public abstract class AbstractStorageStrategyIT { start(); - assertThat(chathome.getChatRooms().toStream()).hasSize(0); + assertThat(chathome.getChatRoomData().toStream()).hasSize(0); UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7"); ChatRoomInfo info = chathome.createChatRoom(chatRoomId, "FOO").block(); log.debug("Created chat-room {}", info); - ChatRoomData chatroom = chathome.getChatRoom(chatRoomId).block(); + ChatRoomData chatroom = chathome.getChatRoomData(chatRoomId).block(); Message m1 = chatroom.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block(); Message m2 = chatroom.addMessage(1l, "ute", "Ich bin Ute...").block(); Message m3 = chatroom.addMessage(2l, "peter", "Willst du mit mir gehen?").block(); Message m4 = chatroom.addMessage(1l, "klaus", "Ja? Nein? Vielleicht??").block(); - assertThat(chathome.getChatRooms().toStream()).containsExactlyElementsOf(List.of(chatroom)); - assertThat(chathome.getChatRoom(chatroom.getId())).emitsExactly(chatroom); + assertThat(chathome.getChatRoomData().toStream()).containsExactlyElementsOf(List.of(chatroom)); + assertThat(chathome.getChatRoomData(chatroom.getId())).emitsExactly(chatroom); assertThat(chathome - .getChatRoom(chatroom.getId()) + .getChatRoomData(chatroom.getId()) .flatMapMany(cr -> cr.getMessages())).emitsExactly(m1, m2, m3, m4); stop(); start(); - assertThat(chathome.getChatRooms().toStream()).containsExactlyElementsOf(List.of(chatroom)); - assertThat(chathome.getChatRoom(chatroom.getId())).emitsExactly(chatroom); + assertThat(chathome.getChatRoomData().toStream()).containsExactlyElementsOf(List.of(chatroom)); + assertThat(chathome.getChatRoomData(chatroom.getId())).emitsExactly(chatroom); assertThat(chathome - .getChatRoom(chatroom.getId()) + .getChatRoomData(chatroom.getId()) .flatMapMany(cr -> cr.getMessages())).emitsExactly(m1, m2, m3, m4); } @@ -67,12 +67,12 @@ public abstract class AbstractStorageStrategyIT { start(); - assertThat(chathome.getChatRooms().toStream()).hasSize(0); + assertThat(chathome.getChatRoomData().toStream()).hasSize(0); UUID chatRoomAId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7"); ChatRoomInfo infoA = chathome.createChatRoom(chatRoomAId, "FOO").block(); log.debug("Created chat-room {}", infoA); - ChatRoomData chatroomA = chathome.getChatRoom(chatRoomAId).block(); + ChatRoomData chatroomA = chathome.getChatRoomData(chatRoomAId).block(); Message ma1 = chatroomA.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block(); Message ma2 = chatroomA.addMessage(1l, "ute", "Ich bin Ute...").block(); Message ma3 = chatroomA.addMessage(2l, "peter", "Willst du mit mir gehen?").block(); @@ -81,33 +81,33 @@ public abstract class AbstractStorageStrategyIT UUID chatRoomBId = UUID.fromString("8763dfdc-4dda-4a74-bea4-4b389177abea"); ChatRoomInfo infoB = chathome.createChatRoom(chatRoomBId, "BAR").block(); log.debug("Created chat-room {}", infoB); - ChatRoomData chatroomB = chathome.getChatRoom(chatRoomBId).block(); + ChatRoomData chatroomB = chathome.getChatRoomData(chatRoomBId).block(); Message mb1 = chatroomB.addMessage(1l,"peter", "Hallo, ich heiße Uwe!").block(); Message mb2 = chatroomB.addMessage(1l, "ute", "Ich bin Ute...").block(); Message mb3 = chatroomB.addMessage(1l, "klaus", "Willst du mit mir gehen?").block(); Message mb4 = chatroomB.addMessage(2l, "peter", "Hä? Was jetzt?!? Isch glohb isch höb ühn däjah vüh...").block(); - assertThat(chathome.getChatRooms().toStream()).containsExactlyInAnyOrderElementsOf(List.of(chatroomA, chatroomB)); - assertThat(chathome.getChatRoom(chatroomA.getId())).emitsExactly(chatroomA); + assertThat(chathome.getChatRoomData().toStream()).containsExactlyInAnyOrderElementsOf(List.of(chatroomA, chatroomB)); + assertThat(chathome.getChatRoomData(chatroomA.getId())).emitsExactly(chatroomA); assertThat(chathome - .getChatRoom(chatroomA.getId()) + .getChatRoomData(chatroomA.getId()) .flatMapMany(cr -> cr.getMessages())).emitsExactly(ma1, ma2, ma3, ma4); - assertThat(chathome.getChatRoom(chatroomB.getId())).emitsExactly(chatroomB); + assertThat(chathome.getChatRoomData(chatroomB.getId())).emitsExactly(chatroomB); assertThat(chathome - .getChatRoom(chatroomB.getId()) + .getChatRoomData(chatroomB.getId()) .flatMapMany(cr -> cr.getMessages())).emitsExactly(mb1, mb2, mb3, mb4); stop(); start(); - assertThat(chathome.getChatRooms().toStream()).containsExactlyInAnyOrderElementsOf(List.of(chatroomA, chatroomB)); - assertThat(chathome.getChatRoom(chatroomA.getId())).emitsExactly(chatroomA); + assertThat(chathome.getChatRoomData().toStream()).containsExactlyInAnyOrderElementsOf(List.of(chatroomA, chatroomB)); + assertThat(chathome.getChatRoomData(chatroomA.getId())).emitsExactly(chatroomA); assertThat(chathome - .getChatRoom(chatroomA.getId()) + .getChatRoomData(chatroomA.getId()) .flatMapMany(cr -> cr.getMessages())).emitsExactly(ma1, ma2, ma3, ma4); - assertThat(chathome.getChatRoom(chatroomB.getId())).emitsExactly(chatroomB); + assertThat(chathome.getChatRoomData(chatroomB.getId())).emitsExactly(chatroomB); assertThat(chathome - .getChatRoom(chatroomB.getId()) + .getChatRoomData(chatroomB.getId()) .flatMapMany(cr -> cr.getMessages())).emitsExactly(mb1, mb2, mb3, mb4); } diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java index e2ffd3a5..22c37d18 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java @@ -27,7 +27,7 @@ public class ShardedChatHomeTest extends ChatHomeWithShardsTest .of(ownedShards()) .forEach(shard -> chatHomes[shard] = new SimpleChatHome( shard, - storageStrategy.read(), + storageStrategy.readChatRoomData(), clock, bufferSize())); diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeTest.java index 190d0f24..1140f69b 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeTest.java @@ -22,7 +22,7 @@ public class SimpleChatHomeTest extends ChatHomeTest Clock clock) { return new SimpleChatHome( - storageStrategy.read(), + storageStrategy.readChatRoomData(), clock, bufferSize()); } -- 2.20.1