From ad1bacbf8d8853fb109802902fcefb709566cc85 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 3 Sep 2023 19:54:46 +0200 Subject: [PATCH] refactor: Splitted `ChatRoomInfo` and `ChatRoomData` - Aligned Code --- .../chat/backend/ChatBackendApplication.java | 5 +- .../backend/api/ChatBackendController.java | 60 ++++++------ .../kafka/chat/backend/domain/ChatHome.java | 6 +- .../chat/backend/domain/ChatRoomData.java | 6 +- .../backend/persistence/StorageStrategy.java | 25 ++++- .../InMemoryServicesConfiguration.java | 4 +- .../persistence/inmemory/ShardedChatHome.java | 36 +++++-- .../persistence/inmemory/SimpleChatHome.java | 69 +++++++++---- .../persistence/kafka/ChatRoomChannel.java | 96 ++++++++++++++----- .../persistence/kafka/KafkaChatHome.java | 33 +++++-- .../storage/files/ChatRoomServiceFactory.java | 11 --- .../files/FilesStorageConfiguration.java | 6 -- .../storage/files/FilesStorageStrategy.java | 61 ++++++------ .../storage/mongodb/ChatRoomTo.java | 20 ++-- .../storage/mongodb/MessageRepository.java | 11 +++ .../storage/mongodb/MessageTo.java | 34 ++++--- .../mongodb/MongoDbStorageConfiguration.java | 13 +-- .../mongodb/MongoDbStorageStrategy.java | 54 +++++++---- .../NoStorageStorageConfiguration.java | 18 +++- .../api/ChatBackendControllerTest.java | 42 ++++---- .../chat/backend/domain/ChatHomeTest.java | 8 +- .../domain/ChatHomeWithShardsTest.java | 4 +- .../chat/backend/domain/ChatRoomDataTest.java | 38 +++----- .../AbstractInMemoryStorageIT.java | 2 +- .../AbstractStorageStrategyIT.java | 45 +++++---- .../InMemoryWithFilesStorageIT.java | 4 - .../InMemoryWithMongoDbStorageIT.java | 16 ++-- .../inmemory/ShardedChatHomeTest.java | 5 +- .../inmemory/SimpleChatHomeTest.java | 5 +- .../storage/mongodb/MessageToTest.java | 37 ------- src/test/resources/data/mongodb/0001.sh | 1 + .../resources/data/mongodb/chatRoomTo.json | 27 +----- .../resources/data/mongodb/messageTo.json | 40 ++++++++ 33 files changed, 473 insertions(+), 369 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/ChatRoomServiceFactory.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageRepository.java delete mode 100644 src/test/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageToTest.java create mode 100644 src/test/resources/data/mongodb/messageTo.json diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java index 8e1ff9e5..15f8ae15 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java @@ -16,7 +16,7 @@ public class ChatBackendApplication implements WebFluxConfigurer @Autowired ChatBackendProperties properties; @Autowired - ChatHome[] chatHomes; + ChatHome chatHome; @Autowired StorageStrategy storageStrategy; @@ -32,8 +32,7 @@ public class ChatBackendApplication implements WebFluxConfigurer @PreDestroy public void onExit() { - for (int shard = 0; shard < chatHomes.length; shard++) - storageStrategy.write(chatHomes[shard].getChatRooms()); + storageStrategy.write(chatHome); } public static void main(String[] args) diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java index f41f45f6..e2fe714b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java @@ -1,7 +1,7 @@ package de.juplo.kafka.chat.backend.api; import de.juplo.kafka.chat.backend.domain.ChatHome; -import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ChatRoomData; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import lombok.RequiredArgsConstructor; import org.springframework.http.codec.ServerSentEvent; @@ -33,49 +33,49 @@ public class ChatBackendController public Flux list() { return chatHome - .getChatRooms() - .map(chatroom -> ChatRoomInfoTo.from(chatroom)); + .getChatRoomInfo() + .map(chatroomInfo -> ChatRoomInfoTo.from(chatroomInfo)); } - @GetMapping("{chatroomId}/list") - public Flux list(@PathVariable UUID chatroomId) + @GetMapping("{chatRoomId}/list") + public Flux list(@PathVariable UUID chatRoomId) { return chatHome - .getChatRoom(chatroomId) - .flatMapMany(chatroom -> chatroom + .getChatRoomData(chatRoomId) + .flatMapMany(chatRoomData -> chatRoomData .getMessages() .map(MessageTo::from)); } - @GetMapping("{chatroomId}") - public Mono get(@PathVariable UUID chatroomId) + @GetMapping("{chatRoomId}") + public Mono get(@PathVariable UUID chatRoomId) { return chatHome - .getChatRoom(chatroomId) - .map(chatroom -> ChatRoomInfoTo.from(chatroom)); + .getChatRoomInfo(chatRoomId) + .map(chatRoomInfo -> ChatRoomInfoTo.from(chatRoomInfo)); } - @PutMapping("{chatroomId}/{username}/{messageId}") + @PutMapping("{chatRoomId}/{username}/{messageId}") public Mono put( - @PathVariable UUID chatroomId, + @PathVariable UUID chatRoomId, @PathVariable String username, @PathVariable Long messageId, @RequestBody String text) { return chatHome - .getChatRoom(chatroomId) - .flatMap(chatroom -> put(chatroom, username, messageId, text)); + .getChatRoomData(chatRoomId) + .flatMap(chatRoomData -> put(chatRoomData, username, messageId, text)); } private Mono put( - ChatRoom chatroom, + ChatRoomData chatRoomData, String username, Long messageId, String text) { return - chatroom + chatRoomData .addMessage( messageId, username, @@ -83,40 +83,40 @@ public class ChatBackendController .map(message -> MessageTo.from(message)); } - @GetMapping("{chatroomId}/{username}/{messageId}") + @GetMapping("{chatRoomId}/{username}/{messageId}") public Mono get( - @PathVariable UUID chatroomId, + @PathVariable UUID chatRoomId, @PathVariable String username, @PathVariable Long messageId) { return chatHome - .getChatRoom(chatroomId) - .flatMap(chatroom -> get(chatroom, username, messageId)); + .getChatRoomData(chatRoomId) + .flatMap(chatRoomData -> get(chatRoomData, username, messageId)); } private Mono get( - ChatRoom chatroom, + ChatRoomData chatRoomData, String username, Long messageId) { return - chatroom + chatRoomData .getMessage(username, messageId) .map(message -> MessageTo.from(message)); } - @GetMapping(path = "{chatroomId}/listen") - public Flux> listen(@PathVariable UUID chatroomId) + @GetMapping(path = "{chatRoomId}/listen") + public Flux> listen(@PathVariable UUID chatRoomId) { return chatHome - .getChatRoom(chatroomId) - .flatMapMany(chatroom -> listen(chatroom)); + .getChatRoomData(chatRoomId) + .flatMapMany(chatRoomData -> listen(chatRoomData)); } - private Flux> listen(ChatRoom chatroom) + private Flux> listen(ChatRoomData chatRoomData) { - return chatroom + return chatRoomData .listen() .log() .map(message -> MessageTo.from(message)) @@ -131,6 +131,6 @@ public class ChatBackendController @PostMapping("/store") public void store() { - storageStrategy.write(chatHome.getChatRooms()); + storageStrategy.write(chatHome); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java index e4d92dbb..2ff59cb3 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java @@ -10,7 +10,9 @@ public interface ChatHome { Mono createChatRoom(UUID id, String name); - Mono getChatRoom(UUID id); + Mono getChatRoomInfo(UUID id); - Flux getChatRooms(); + Flux getChatRoomInfo(); + + Mono getChatRoomData(UUID id); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java index 873e58e7..511b9ade 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomData.java @@ -13,17 +13,17 @@ import java.util.regex.Pattern; @Slf4j -public class ChatRoom +public class ChatRoomData { public final static Pattern VALID_USER = Pattern.compile("^[a-z0-9-]{2,}$"); - private final Clock clock; private final ChatRoomService service; + private final Clock clock; private final int bufferSize; private Sinks.Many sink; - public ChatRoom( + public ChatRoomData( Clock clock, ChatRoomService service, int bufferSize) diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java index bedd0aac..7d0f66d4 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/StorageStrategy.java @@ -1,11 +1,30 @@ package de.juplo.kafka.chat.backend.persistence; -import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ChatHome; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import de.juplo.kafka.chat.backend.domain.Message; import reactor.core.publisher.Flux; +import java.util.UUID; + public interface StorageStrategy { - void write(Flux chatroomFlux); - Flux read(); + default void write(ChatHome chatHome) + { + writeChatRoomInfo( + chatHome + .getChatRoomInfo() + .doOnNext(chatRoomInfo -> + writeChatRoomData( + chatRoomInfo.getId(), + chatHome + .getChatRoomData(chatRoomInfo.getId()) + .flatMapMany(chatRoomData -> chatRoomData.getMessages())))); + } + + void writeChatRoomInfo(Flux chatRoomInfoFlux); + Flux readChatRoomInfo(); + void writeChatRoomData(UUID chatRoomId, Flux messageFlux); + Flux readChatRoomData(UUID chatRoomId); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java index 106c7369..4c90d6af 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java @@ -31,7 +31,7 @@ public class InMemoryServicesConfiguration Clock clock) { return new SimpleChatHome( - storageStrategy.read(), + storageStrategy, clock, properties.getChatroomBufferSize()); } @@ -52,7 +52,7 @@ public class InMemoryServicesConfiguration .of(properties.getInmemory().getOwnedShards()) .forEach(shard -> chatHomes[shard] = new SimpleChatHome( shard, - storageStrategy.read(), + storageStrategy, clock, properties.getChatroomBufferSize())); ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards); diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHome.java index c6aff1e3..eb3a6ce2 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHome.java @@ -14,13 +14,13 @@ import java.util.stream.Collectors; @Slf4j public class ShardedChatHome implements ChatHome { - private final ChatHome[] chatHomes; + private final SimpleChatHome[] chatHomes; private final Set ownedShards; private final ShardingStrategy shardingStrategy; public ShardedChatHome( - ChatHome[] chatHomes, + SimpleChatHome[] chatHomes, ShardingStrategy shardingStrategy) { this.chatHomes = chatHomes; @@ -48,13 +48,13 @@ public class ShardedChatHome implements ChatHome } @Override - public Mono getChatRoom(UUID id) + public Mono getChatRoomInfo(UUID id) { int shard = selectShard(id); return chatHomes[shard] == null ? Mono.error(new ShardNotOwnedException(shard)) : chatHomes[shard] - .getChatRoom(id) + .getChatRoomInfo(id) .onErrorMap(throwable -> throwable instanceof UnknownChatroomException ? new UnknownChatroomException( id, @@ -64,13 +64,37 @@ public class ShardedChatHome implements ChatHome } @Override - public Flux getChatRooms() + public Flux getChatRoomInfo() { return Flux .fromIterable(ownedShards) - .flatMap(shard -> chatHomes[shard].getChatRooms()); + .flatMap(shard -> chatHomes[shard].getChatRoomInfo()); } + @Override + public Mono getChatRoomData(UUID id) + { + int shard = selectShard(id); + return chatHomes[shard] == null + ? Mono.error(new ShardNotOwnedException(shard)) + : chatHomes[shard] + .getChatRoomData(id) + .onErrorMap(throwable -> throwable instanceof UnknownChatroomException + ? new UnknownChatroomException( + id, + shard, + ownedShards.stream().mapToInt(i -> i.intValue()).toArray()) + : throwable); + } + + public Flux getChatRoomData() + { + return Flux + .fromIterable(ownedShards) + .flatMap(shard -> chatHomes[shard].getChatRoomData()); + } + + private int selectShard(UUID chatroomId) { diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java index c2d25b20..c8ddbf9d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java @@ -1,6 +1,7 @@ package de.juplo.kafka.chat.backend.persistence.inmemory; import de.juplo.kafka.chat.backend.domain.*; +import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -13,34 +14,41 @@ import java.util.*; public class SimpleChatHome implements ChatHome { private final Integer shard; - private final Map chatRooms; + private final Map chatRoomInfo; + private final Map chatRoomData; private final Clock clock; private final int bufferSize; public SimpleChatHome( - Flux chatroomFlux, + StorageStrategy storageStrategy, Clock clock, int bufferSize) { - this(null, chatroomFlux, clock, bufferSize); + this( + null, + storageStrategy, + clock, + bufferSize); } public SimpleChatHome( Integer shard, - Flux chatroomFlux, + StorageStrategy storageStrategy, Clock clock, int bufferSize) { log.info("Created SimpleChatHome for shard {}", shard); ; this.shard = shard; - this.chatRooms = new HashMap<>(); - chatroomFlux - .filter(chatRoom -> + this.chatRoomInfo = new HashMap<>(); + this.chatRoomData = new HashMap<>(); + storageStrategy + .readChatRoomInfo() + .filter(info -> { - if (shard == null || chatRoom.getShard() == shard) + if (shard == null || info.getShard() == shard) { return true; } @@ -49,12 +57,24 @@ public class SimpleChatHome implements ChatHome log.info( "SimpleChatHome for shard {} ignores not owned chat-room {}", shard, - chatRoom); + info); return false; } }) .toStream() - .forEach(chatroom -> chatRooms.put(chatroom.getId(), chatroom)); + .forEach(info -> + { + UUID chatRoomId = info.getId(); + chatRoomInfo.put(chatRoomId, info); + Flux messageFlux = + storageStrategy.readChatRoomData(chatRoomId); + chatRoomData.put( + info.getId(), + new ChatRoomData( + clock, + new InMemoryChatRoomService(messageFlux), + bufferSize)); + }); this.clock = clock; this.bufferSize = bufferSize; } @@ -65,22 +85,37 @@ public class SimpleChatHome implements ChatHome { log.info("Creating ChatRoom with buffer-size {}", bufferSize); ChatRoomService service = new InMemoryChatRoomService(Flux.empty()); - ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize); - chatRooms.put(id, chatRoom); - return Mono.just(chatRoom); + ChatRoomInfo chatRoomInfo = new ChatRoomInfo(id, name, shard); + this.chatRoomInfo.put(id, chatRoomInfo); + ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize); + this.chatRoomData.put(id, chatRoomData); + return Mono.just(chatRoomInfo); } @Override - public Mono getChatRoom(UUID id) + public Mono getChatRoomInfo(UUID id) { return Mono - .justOrEmpty(chatRooms.get(id)) + .justOrEmpty(chatRoomInfo.get(id)) .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); } @Override - public Flux getChatRooms() + public Flux getChatRoomInfo() + { + return Flux.fromIterable(chatRoomInfo.values()); + } + + @Override + public Mono getChatRoomData(UUID id) + { + return Mono + .justOrEmpty(chatRoomData.get(id)) + .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); + } + + public Flux getChatRoomData() { - return Flux.fromIterable(chatRooms.values()); + return Flux.fromIterable(chatRoomData.values()); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java index 234554eb..45e93ccc 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomChannel.java @@ -35,7 +35,8 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener private final boolean[] isShardOwned; private final long[] currentOffset; private final long[] nextOffset; - private final Map[] chatrooms; + private final Map[] chatRoomInfo; + private final Map[] chatRoomData; private boolean running; @Getter @@ -65,10 +66,15 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener this.isShardOwned = new boolean[numShards]; this.currentOffset = new long[numShards]; this.nextOffset = new long[numShards]; - this.chatrooms = new Map[numShards]; + this.chatRoomInfo = new Map[numShards]; + this.chatRoomData = new Map[numShards]; IntStream .range(0, numShards) - .forEach(shard -> this.chatrooms[shard] = new HashMap<>()); + .forEach(shard -> + { + this.chatRoomInfo[shard] = new HashMap<>(); + this.chatRoomData[shard] = new HashMap<>(); + }); } @@ -277,30 +283,37 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener private void createChatRoom( UUID chatRoomId, CommandCreateChatRoomTo createChatRoomRequestTo, - int partition) + Integer partition) { - log.info("Loading ChatRoom {} with buffer-size {}", chatRoomId, bufferSize); - KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId); - ChatRoom chatRoom = new ChatRoom( + log.info( + "Loading ChatRoom {} for shard {} with buffer-size {}", chatRoomId, - createChatRoomRequestTo.getName(), partition, + bufferSize); + KafkaChatRoomService service = new KafkaChatRoomService(this, chatRoomId); + ChatRoomData chatRoomData = new ChatRoomData( clock, service, bufferSize); - putChatRoom(chatRoom); + putChatRoom( + chatRoomId, + createChatRoomRequestTo.getName(), + partition, + chatRoomData); } private void createChatRoom(ChatRoomInfo chatRoomInfo) { UUID id = chatRoomInfo.getId(); - String name = chatRoomInfo.getName(); - int shard = chatRoomInfo.getShard(); log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); KafkaChatRoomService service = new KafkaChatRoomService(this, id); - ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize); - putChatRoom(chatRoom); + ChatRoomData chatRoomData = new ChatRoomData(clock, service, bufferSize); + putChatRoom( + chatRoomInfo.getId(), + chatRoomInfo.getName(), + chatRoomInfo.getShard(), + chatRoomData); } private void loadChatMessage( @@ -313,9 +326,9 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); Message message = new Message(key, offset, timestamp, chatMessageTo.getText()); - ChatRoom chatRoom = chatrooms[partition].get(chatRoomId); + ChatRoomData chatRoomData = this.chatRoomData[partition].get(chatRoomId); KafkaChatRoomService kafkaChatRoomService = - (KafkaChatRoomService) chatRoom.getChatRoomService(); + (KafkaChatRoomService) chatRoomData.getChatRoomService(); kafkaChatRoomService.persistMessage(message); } @@ -338,22 +351,30 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener } - private void putChatRoom(ChatRoom chatRoom) + private void putChatRoom( + UUID chatRoomId, + String name, + Integer partition, + ChatRoomData chatRoomData) { - Integer partition = chatRoom.getShard(); - UUID chatRoomId = chatRoom.getId(); - if (chatrooms[partition].containsKey(chatRoomId)) + if (this.chatRoomInfo[partition].containsKey(chatRoomId)) { - log.warn("Ignoring existing chat-room: " + chatRoom); + log.warn( + "Ignoring existing chat-room for {}: {}", + partition, + chatRoomId); } else { log.info( "Adding new chat-room to partition {}: {}", partition, - chatRoom); + chatRoomData); - chatrooms[partition].put(chatRoomId, chatRoom); + this.chatRoomInfo[partition].put( + chatRoomId, + new ChatRoomInfo(chatRoomId, name, partition)); + this.chatRoomData[partition].put(chatRoomId, chatRoomData); } } @@ -365,7 +386,30 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener .toArray(); } - Mono getChatRoom(int shard, UUID id) + Mono getChatRoomData(int shard, UUID id) + { + if (loadInProgress) + { + return Mono.error(new LoadInProgressException()); + } + + if (!isShardOwned[shard]) + { + return Mono.error(new ShardNotOwnedException(shard)); + } + + return Mono.justOrEmpty(chatRoomData[shard].get(id)); + } + + Flux getChatRoomInfo() + { + return Flux + .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i))) + .filter(shard -> isShardOwned[shard]) + .flatMap(shard -> Flux.fromIterable(chatRoomInfo[shard].values())); + } + + Mono getChatRoomInfo(int shard, UUID id) { if (loadInProgress) { @@ -377,14 +421,14 @@ public class ChatRoomChannel implements Runnable, ConsumerRebalanceListener return Mono.error(new ShardNotOwnedException(shard)); } - return Mono.justOrEmpty(chatrooms[shard].get(id)); + return Mono.justOrEmpty(chatRoomInfo[shard].get(id)); } - Flux getChatRooms() + Flux getChatRoomData() { return Flux .fromStream(IntStream.range(0, numShards).mapToObj(i -> Integer.valueOf(i))) .filter(shard -> isShardOwned[shard]) - .flatMap(shard -> Flux.fromIterable(chatrooms[shard].values())); + .flatMap(shard -> Flux.fromIterable(chatRoomData[shard].values())); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java index 06228396..8c749d65 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHome.java @@ -1,7 +1,7 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatHome; -import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ChatRoomData; import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.domain.UnknownChatroomException; import lombok.RequiredArgsConstructor; @@ -30,26 +30,43 @@ public class KafkaChatHome implements ChatHome } @Override - public Mono getChatRoom(UUID id) + public Mono getChatRoomInfo(UUID id) { int shard = selectShard(id); return chatRoomChannel - .getChatRoom(shard, id) + .getChatRoomInfo(shard, id) .switchIfEmpty(Mono.error(() -> new UnknownChatroomException( id, shard, chatRoomChannel.getOwnedShards()))); } - int selectShard(UUID chatRoomId) + @Override + public Flux getChatRoomInfo() { - byte[] serializedKey = chatRoomId.toString().getBytes(); - return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions; + return chatRoomChannel.getChatRoomInfo(); } @Override - public Flux getChatRooms() + public Mono getChatRoomData(UUID id) { - return chatRoomChannel.getChatRooms(); + int shard = selectShard(id); + return chatRoomChannel + .getChatRoomData(shard, id) + .switchIfEmpty(Mono.error(() -> new UnknownChatroomException( + id, + shard, + chatRoomChannel.getOwnedShards()))); + } + + public Flux getChatRoomData() + { + return chatRoomChannel.getChatRoomData(); + } + + int selectShard(UUID chatRoomId) + { + byte[] serializedKey = chatRoomId.toString().getBytes(); + return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions; } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/ChatRoomServiceFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/ChatRoomServiceFactory.java deleted file mode 100644 index d06c8f9a..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/ChatRoomServiceFactory.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.storage.files; - -import de.juplo.kafka.chat.backend.domain.ChatRoomService; -import de.juplo.kafka.chat.backend.domain.Message; -import reactor.core.publisher.Flux; - - -public interface ChatRoomServiceFactory -{ - ChatRoomService create(Flux messageFlux); -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java index 11b64402..5eb3bbd0 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java @@ -4,7 +4,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; -import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.autoconfigure.data.mongo.MongoRepositoriesAutoConfiguration; @@ -13,7 +12,6 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.nio.file.Paths; -import java.time.Clock; @ConditionalOnProperty( @@ -30,16 +28,12 @@ public class FilesStorageConfiguration @Bean public StorageStrategy storageStrategy( ChatBackendProperties properties, - Clock clock, ShardingStrategy shardingStrategy, ObjectMapper mapper) { return new FilesStorageStrategy( Paths.get(properties.getInmemory().getStorageDirectory()), - clock, - properties.getChatroomBufferSize(), shardingStrategy, - messageFlux -> new InMemoryChatRoomService(messageFlux), mapper); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java index 025e3aef..1f0ebad3 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java @@ -5,10 +5,10 @@ import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo; import de.juplo.kafka.chat.backend.api.MessageTo; -import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy; -import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.domain.Message; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; +import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; @@ -16,7 +16,6 @@ import reactor.core.publisher.Flux; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import java.time.Clock; import java.util.UUID; import static java.nio.file.StandardOpenOption.CREATE; @@ -31,15 +30,12 @@ public class FilesStorageStrategy implements StorageStrategy private final Path storagePath; - private final Clock clock; - private final int bufferSize; private final ShardingStrategy shardingStrategy; - private final ChatRoomServiceFactory factory; private final ObjectMapper mapper; @Override - public void write(Flux chatroomFlux) + public void writeChatRoomInfo(Flux chatRoomInfoFlux) { Path path = chatroomsPath(); log.info("Writing chatrooms to {}", path); @@ -52,7 +48,7 @@ public class FilesStorageStrategy implements StorageStrategy .getFactory() .createGenerator(Files.newBufferedWriter(path, CREATE, TRUNCATE_EXISTING)); - chatroomFlux + chatRoomInfoFlux .log() .doFirst(() -> { @@ -78,13 +74,12 @@ public class FilesStorageStrategy implements StorageStrategy throw new RuntimeException(e); } }) - .subscribe(chatroom -> + .subscribe(chatRoomInfo -> { try { - ChatRoomInfoTo infoTo = ChatRoomInfoTo.from(chatroom); - generator.writeObject(infoTo); - writeMessages(infoTo, chatroom.getMessages()); + ChatRoomInfoTo chatRoomInfoTo = ChatRoomInfoTo.from(chatRoomInfo); + generator.writeObject(chatRoomInfoTo); } catch (IOException e) { @@ -99,30 +94,37 @@ public class FilesStorageStrategy implements StorageStrategy } @Override - public Flux read() + public Flux readChatRoomInfo() { JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class); return Flux .from(new JsonFilePublisher(chatroomsPath(), mapper, type)) .log() - .map(infoTo -> + .map(chatRoomInfoTo -> { - UUID chatRoomId = infoTo.getId(); + UUID chatRoomId = chatRoomInfoTo.getId(); int shard = shardingStrategy.selectShard(chatRoomId); - return new ChatRoom( - infoTo.getId(), - infoTo.getName(), - shard, - clock, - factory.create(readMessages(infoTo)), - bufferSize); + + log.info( + "{} - old shard: {}, new shard: {}", + chatRoomId, + chatRoomInfoTo.getShard(), + shard); + + return new ChatRoomInfo( + chatRoomId, + chatRoomInfoTo.getName(), + shard); }); } - public void writeMessages(ChatRoomInfoTo infoTo, Flux messageFlux) + @Override + public void writeChatRoomData( + UUID chatRoomId, + Flux messageFlux) { - Path path = chatroomPath(infoTo); - log.info("Writing messages for {} to {}", infoTo, path); + Path path = chatroomPath(chatRoomId); + log.info("Writing messages for {} to {}", chatRoomId, path); try { Files.createDirectories(storagePath); @@ -177,11 +179,12 @@ public class FilesStorageStrategy implements StorageStrategy } } - public Flux readMessages(ChatRoomInfoTo infoTo) + @Override + public Flux readChatRoomData(UUID chatRoomId) { JavaType type = mapper.getTypeFactory().constructType(MessageTo.class); return Flux - .from(new JsonFilePublisher(chatroomPath(infoTo), mapper, type)) + .from(new JsonFilePublisher(chatroomPath(chatRoomId), mapper, type)) .log() .map(MessageTo::toMessage); } @@ -191,8 +194,8 @@ public class FilesStorageStrategy implements StorageStrategy return storagePath.resolve(Path.of(CHATROOMS_FILENAME)); } - Path chatroomPath(ChatRoomInfoTo infoTo) + Path chatroomPath(UUID id) { - return storagePath.resolve(Path.of(infoTo.getId().toString() + ".json")); + return storagePath.resolve(Path.of(id.toString() + ".json")); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java index 1ad8d178..0086053e 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java @@ -1,36 +1,30 @@ package de.juplo.kafka.chat.backend.persistence.storage.mongodb; -import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import lombok.*; import org.springframework.data.annotation.Id; import org.springframework.data.mongodb.core.mapping.Document; -import java.util.List; - @AllArgsConstructor @NoArgsConstructor @Getter(AccessLevel.PACKAGE) @Setter(AccessLevel.PACKAGE) @EqualsAndHashCode(of = { "id" }) -@ToString(of = { "id", "name" }) +@ToString(of = { "id", "shard", "name" }) @Document public class ChatRoomTo { @Id private String id; + private Integer shard; private String name; - private List messages; - public static ChatRoomTo from(ChatRoom chatroom) + public static ChatRoomTo from(ChatRoomInfo chatRoomInfo) { return new ChatRoomTo( - chatroom.getId().toString(), - chatroom.getName(), - chatroom - .getMessages() - .map(MessageTo::from) - .collectList() - .block()); + chatRoomInfo.getId().toString(), + chatRoomInfo.getShard(), + chatRoomInfo.getName()); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageRepository.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageRepository.java new file mode 100644 index 00000000..a429f96e --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageRepository.java @@ -0,0 +1,11 @@ +package de.juplo.kafka.chat.backend.persistence.storage.mongodb; + +import org.springframework.data.mongodb.repository.MongoRepository; + +import java.util.List; + + +public interface MessageRepository extends MongoRepository +{ + List findByChatRoomIdOrderBySerialAsc(String chatRoomId); +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageTo.java index 4f93695c..f6c6b85d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageTo.java @@ -2,45 +2,51 @@ package de.juplo.kafka.chat.backend.persistence.storage.mongodb; import de.juplo.kafka.chat.backend.domain.Message; import lombok.*; +import org.springframework.data.mongodb.core.index.Indexed; +import org.springframework.data.mongodb.core.mapping.Document; +import org.springframework.data.mongodb.core.mapping.Field; import java.time.LocalDateTime; -import java.util.regex.Matcher; -import java.util.regex.Pattern; +import java.util.UUID; @AllArgsConstructor @NoArgsConstructor @Getter(AccessLevel.PACKAGE) @Setter(AccessLevel.PACKAGE) -@EqualsAndHashCode(of = { "user", "id" }) -@ToString(of = { "user", "id" }) +@EqualsAndHashCode(of = { "chatRoomId", "user", "id" }) +@ToString(of = { "chatRoomId", "user", "id" }) +@Document class MessageTo { - final static Pattern SPLIT_ID = Pattern.compile("^([a-z-0-9]+)--([0-9]+)$"); - private String id; + @Indexed + private String chatRoomId; + @Indexed + private String user; + @Field("id") + @Indexed + private Long id; + @Indexed private Long serial; private String time; private String text; Message toMessage() { - Matcher matcher = SPLIT_ID.matcher(id); - if (!matcher.matches()) - throw new RuntimeException("MessageTo with invalid ID: " + id); - Long messageId = Long.parseLong(matcher.group(2)); - String user = matcher.group(1); return new Message( - Message.MessageKey.of(user, messageId), + Message.MessageKey.of(user, id), serial, LocalDateTime.parse(time), text); } - static MessageTo from(Message message) + static MessageTo from(UUID chatRoomId, Message message) { return new MessageTo( - message.getUsername() + "--" + message.getId(), + chatRoomId.toString(), + message.getUsername(), + message.getId(), message.getSerialNumber(), message.getTimestamp().toString(), message.getMessageText()); diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java index 9ef38a71..20e6ed43 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java @@ -1,15 +1,11 @@ package de.juplo.kafka.chat.backend.persistence.storage.mongodb; -import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; -import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import java.time.Clock; - @ConditionalOnProperty( prefix = "chat.backend.inmemory", @@ -21,15 +17,12 @@ public class MongoDbStorageConfiguration @Bean public StorageStrategy storageStrategy( ChatRoomRepository chatRoomRepository, - ChatBackendProperties properties, - Clock clock, + MessageRepository messageRepository, ShardingStrategy shardingStrategy) { return new MongoDbStorageStrategy( chatRoomRepository, - clock, - properties.getChatroomBufferSize(), - shardingStrategy, - messageFlux -> new InMemoryChatRoomService(messageFlux)); + messageRepository, + shardingStrategy); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java index d3a18bc2..bc821e6d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java @@ -1,14 +1,13 @@ package de.juplo.kafka.chat.backend.persistence.storage.mongodb; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import de.juplo.kafka.chat.backend.domain.Message; import de.juplo.kafka.chat.backend.persistence.inmemory.ShardingStrategy; -import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; -import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; -import java.time.Clock; import java.util.UUID; @@ -16,40 +15,55 @@ import java.util.UUID; @Slf4j public class MongoDbStorageStrategy implements StorageStrategy { - private final ChatRoomRepository repository; - private final Clock clock; - private final int bufferSize; + private final ChatRoomRepository chatRoomRepository; + private final MessageRepository messageRepository; private final ShardingStrategy shardingStrategy; - private final ChatRoomServiceFactory factory; @Override - public void write(Flux chatroomFlux) + public void writeChatRoomInfo(Flux chatRoomInfoFlux) { - chatroomFlux + chatRoomInfoFlux .map(ChatRoomTo::from) - .subscribe(chatroomTo -> repository.save(chatroomTo)); + .subscribe(chatroomTo -> chatRoomRepository.save(chatroomTo)); } @Override - public Flux read() + public Flux readChatRoomInfo() { return Flux - .fromIterable(repository.findAll()) + .fromIterable(chatRoomRepository.findAll()) .map(chatRoomTo -> { UUID chatRoomId = UUID.fromString(chatRoomTo.getId()); int shard = shardingStrategy.selectShard(chatRoomId); - return new ChatRoom( + + log.info( + "{} - old shard: {}, new shard: {}", + chatRoomId, + chatRoomTo.getShard(), + shard); + + return new ChatRoomInfo( chatRoomId, chatRoomTo.getName(), - shard, - clock, - factory.create( - Flux - .fromIterable(chatRoomTo.getMessages()) - .map(messageTo -> messageTo.toMessage())), - bufferSize); + shard); }); } + + @Override + public void writeChatRoomData(UUID chatRoomId, Flux messageFlux) + { + messageFlux + .map(message -> MessageTo.from(chatRoomId, message)) + .subscribe(messageTo -> messageRepository.save(messageTo)); + } + + @Override + public Flux readChatRoomData(UUID chatRoomId) + { + return Flux + .fromIterable(messageRepository.findByChatRoomIdOrderBySerialAsc(chatRoomId.toString())) + .map(messageTo -> messageTo.toMessage()); + } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/nostorage/NoStorageStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/nostorage/NoStorageStorageConfiguration.java index ffed2992..ab24bb8a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/nostorage/NoStorageStorageConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/nostorage/NoStorageStorageConfiguration.java @@ -1,6 +1,7 @@ package de.juplo.kafka.chat.backend.persistence.storage.nostorage; -import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import de.juplo.kafka.chat.backend.domain.Message; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -10,6 +11,8 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import reactor.core.publisher.Flux; +import java.util.UUID; + @ConditionalOnProperty( prefix = "chat.backend.inmemory", @@ -29,10 +32,19 @@ public class NoStorageStorageConfiguration return new StorageStrategy() { @Override - public void write(Flux chatroomFlux) {} + public void writeChatRoomInfo(Flux chatRoomInfoFlux) {} + + @Override + public Flux readChatRoomInfo() + { + return Flux.empty(); + } + + @Override + public void writeChatRoomData(UUID chatRoomId, Flux messageFlux) {} @Override - public Flux read() + public Flux readChatRoomData(UUID chatRoomId) { return Flux.empty(); } diff --git a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java index 9b8b09f8..a7e40781 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java @@ -42,7 +42,7 @@ public class ChatBackendControllerTest { // Given UUID chatroomId = UUID.randomUUID(); - when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId)); + when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId)); // When WebTestClient.ResponseSpec responseSpec = client @@ -62,7 +62,7 @@ public class ChatBackendControllerTest { // Given UUID chatroomId = UUID.randomUUID(); - when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId)); + when(chatHome.getChatRoomInfo(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId)); // When WebTestClient.ResponseSpec responseSpec = client @@ -83,7 +83,7 @@ public class ChatBackendControllerTest UUID chatroomId = UUID.randomUUID(); String username = "foo"; Long messageId = 66l; - when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId)); + when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId)); // When WebTestClient.ResponseSpec responseSpec = client @@ -109,7 +109,7 @@ public class ChatBackendControllerTest UUID chatroomId = UUID.randomUUID(); String username = "foo"; Long messageId = 66l; - when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId)); + when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId)); // When WebTestClient.ResponseSpec responseSpec = client @@ -132,7 +132,7 @@ public class ChatBackendControllerTest { // Given UUID chatroomId = UUID.randomUUID(); - when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId)); + when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new UnknownChatroomException(chatroomId)); // When WebTestClient.ResponseSpec responseSpec = client @@ -170,13 +170,11 @@ public class ChatBackendControllerTest LocalDateTime timeExistingMessage = LocalDateTime.parse(timeExistingMessageAsString); String textExistingMessage = "Existing"; String textMutatedMessage = "Mutated!"; - ChatRoom chatRoom = new ChatRoom( - chatroomId, - "Test-ChatRoom", - 0, + ChatRoomData chatRoomData = new ChatRoomData( Clock.systemDefaultZone(), - chatRoomService, 8); - when(chatHome.getChatRoom(eq(chatroomId))).thenReturn(Mono.just(chatRoom)); + chatRoomService, + 8); + when(chatHome.getChatRoomData(eq(chatroomId))).thenReturn(Mono.just(chatRoomData)); Message existingMessage = new Message( key, serialNumberExistingMessage, @@ -222,14 +220,12 @@ public class ChatBackendControllerTest Long messageId = 66l; Message.MessageKey key = Message.MessageKey.of(user, messageId); String textMessage = "Hallo Welt"; - ChatRoom chatRoom = new ChatRoom( - chatroomId, - "Test-ChatRoom", - 0, + ChatRoomData chatRoomData = new ChatRoomData( Clock.systemDefaultZone(), - chatRoomService, 8); - when(chatHome.getChatRoom(any(UUID.class))) - .thenReturn(Mono.just(chatRoom)); + chatRoomService, + 8); + when(chatHome.getChatRoomData(any(UUID.class))) + .thenReturn(Mono.just(chatRoomData)); when(chatRoomService.getMessage(any(Message.MessageKey.class))) .thenReturn(Mono.empty()); // Needed for readable error-reports, in case of a bug that leads to according unwanted call @@ -262,7 +258,7 @@ public class ChatBackendControllerTest // Given UUID chatroomId = UUID.randomUUID(); int shard = 666; - when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); + when(chatHome.getChatRoomInfo(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); // When WebTestClient.ResponseSpec responseSpec = client @@ -282,7 +278,7 @@ public class ChatBackendControllerTest // Given UUID chatroomId = UUID.randomUUID(); int shard = 666; - when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); + when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); // When WebTestClient.ResponseSpec responseSpec = client @@ -304,7 +300,7 @@ public class ChatBackendControllerTest String username = "foo"; Long messageId = 66l; int shard = 666; - when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); + when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); // When WebTestClient.ResponseSpec responseSpec = client @@ -331,7 +327,7 @@ public class ChatBackendControllerTest String username = "foo"; Long messageId = 66l; int shard = 666; - when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); + when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); // When WebTestClient.ResponseSpec responseSpec = client @@ -355,7 +351,7 @@ public class ChatBackendControllerTest // Given UUID chatroomId = UUID.randomUUID(); int shard = 666; - when(chatHome.getChatRoom(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); + when(chatHome.getChatRoomData(eq(chatroomId))).thenThrow(new ShardNotOwnedException(shard)); // When WebTestClient.ResponseSpec responseSpec = client diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java index 2a995ef4..5b50314f 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java @@ -29,8 +29,8 @@ public abstract class ChatHomeTest UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7"); // When - Mono mono = Mono - .defer(() -> chatHome.getChatRoom(chatRoomId)) + Mono mono = Mono + .defer(() -> chatHome.getChatRoomData(chatRoomId)) .log("testGetExistingChatroom") .retryWhen(Retry .backoff(5, Duration.ofSeconds(1)) @@ -48,8 +48,8 @@ public abstract class ChatHomeTest UUID chatRoomId = UUID.fromString("7f59ec77-832e-4a17-8d22-55ef46242c17"); // When - Mono mono = Mono - .defer(() -> chatHome.getChatRoom(chatRoomId)) + Mono mono = Mono + .defer(() -> chatHome.getChatRoomData(chatRoomId)) .log("testGetNonExistentChatroom") .retryWhen(Retry .backoff(5, Duration.ofSeconds(1)) diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTest.java index eebeea1e..c65908c4 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeWithShardsTest.java @@ -26,8 +26,8 @@ public abstract class ChatHomeWithShardsTest extends ChatHomeTest UUID chatRoomId = UUID.fromString("4e7246a6-29ae-43ea-b56f-669c3481ac19"); // When - Mono mono = Mono - .defer(() -> chatHome.getChatRoom(chatRoomId)) + Mono mono = Mono + .defer(() -> chatHome.getChatRoomData(chatRoomId)) .log("testGetChatroomForNotOwnedShard") .retryWhen(Retry .backoff(5, Duration.ofSeconds(1)) diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomDataTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomDataTest.java index 822ffe77..790482fe 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomDataTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomDataTest.java @@ -8,13 +8,12 @@ import java.time.Clock; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; -import java.util.UUID; import static org.mockito.Mockito.*; import static pl.rzrz.assertj.reactor.Assertions.assertThat; -public class ChatRoomTest +public class ChatRoomDataTest { @Test @DisplayName("Assert, that Mono emits expected message, if it exists") @@ -24,10 +23,7 @@ public class ChatRoomTest String user = "foo"; Long messageId = 1l; ChatRoomService chatRoomService = mock(ChatRoomService.class); - ChatRoom chatRoom = new ChatRoom( - UUID.randomUUID(), - "Foo", - 0, + ChatRoomData chatRoomData = new ChatRoomData( Clock.systemDefaultZone(), chatRoomService, 8); @@ -37,7 +33,7 @@ public class ChatRoomTest when(chatRoomService.getMessage(any(Message.MessageKey.class))).thenReturn(Mono.just(message)); // When - Mono mono = chatRoom.getMessage(user, messageId); + Mono mono = chatRoomData.getMessage(user, messageId); // Then assertThat(mono).emitsExactly(message); @@ -51,17 +47,14 @@ public class ChatRoomTest String user = "foo"; Long messageId = 1l; ChatRoomService chatRoomService = mock(ChatRoomService.class); - ChatRoom chatRoom = new ChatRoom( - UUID.randomUUID(), - "Foo", - 0, + ChatRoomData chatRoomData = new ChatRoomData( Clock.systemDefaultZone(), chatRoomService, 8); when(chatRoomService.getMessage(any(Message.MessageKey.class))).thenReturn(Mono.empty()); // When - Mono mono = chatRoom.getMessage(user, messageId); + Mono mono = chatRoomData.getMessage(user, messageId); // Then assertThat(mono).emitsCount(0); @@ -75,10 +68,7 @@ public class ChatRoomTest String user = "foo"; Long messageId = 1l; ChatRoomService chatRoomService = mock(ChatRoomService.class); - ChatRoom chatRoom = new ChatRoom( - UUID.randomUUID(), - "Foo", - 0, + ChatRoomData chatRoomData = new ChatRoomData( Clock.systemDefaultZone(), chatRoomService, 8); @@ -91,7 +81,7 @@ public class ChatRoomTest when(chatRoomService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class))).thenReturn(Mono.just(message)); // When - Mono mono = chatRoom.addMessage(messageId, user, messageText); + Mono mono = chatRoomData.addMessage(messageId, user, messageText); // Then assertThat(mono).emitsExactly(message); @@ -105,10 +95,7 @@ public class ChatRoomTest String user = "foo"; Long messageId = 1l; ChatRoomService chatRoomService = mock(ChatRoomService.class); - ChatRoom chatRoom = new ChatRoom( - UUID.randomUUID(), - "Foo", - 0, + ChatRoomData chatRoomData = new ChatRoomData( Clock.systemDefaultZone(), chatRoomService, 8); @@ -121,7 +108,7 @@ public class ChatRoomTest when(chatRoomService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class))).thenReturn(Mono.just(message)); // When - Mono mono = chatRoom.addMessage(messageId, user, messageText); + Mono mono = chatRoomData.addMessage(messageId, user, messageText); // Then assertThat(mono).emitsExactly(message); @@ -135,10 +122,7 @@ public class ChatRoomTest String user = "foo"; Long messageId = 1l; ChatRoomService chatRoomService = mock(ChatRoomService.class); - ChatRoom chatRoom = new ChatRoom( - UUID.randomUUID(), - "Foo", - 0, + ChatRoomData chatRoomData = new ChatRoomData( Clock.systemDefaultZone(), chatRoomService, 8); @@ -152,7 +136,7 @@ public class ChatRoomTest when(chatRoomService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class))).thenReturn(Mono.just(message)); // When - Mono mono = chatRoom.addMessage(messageId, user, mutatedText); + Mono mono = chatRoomData.addMessage(messageId, user, mutatedText); // Then assertThat(mono).sendsError(); diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java index 62dc08ad..142f7099 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java @@ -22,7 +22,7 @@ public abstract class AbstractInMemoryStorageIT extends AbstractStorageStrategyI int bufferSize = 8; SimpleChatHome simpleChatHome = new SimpleChatHome( - getStorageStrategy().read(), + getStorageStrategy(), clock, bufferSize); diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java index 69365325..e619649a 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java @@ -27,7 +27,7 @@ public abstract class AbstractStorageStrategyIT protected void stop() { - getStorageStrategy().write(chathome.getChatRooms()); + getStorageStrategy().write(chathome); } @Test @@ -35,31 +35,30 @@ public abstract class AbstractStorageStrategyIT { start(); - assertThat(chathome.getChatRooms().toStream()).hasSize(0); + assertThat(chathome.getChatRoomInfo().toStream()).hasSize(0); UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7"); ChatRoomInfo info = chathome.createChatRoom(chatRoomId, "FOO").block(); log.debug("Created chat-room {}", info); - ChatRoom chatroom = chathome.getChatRoom(chatRoomId).block(); + ChatRoomData chatroom = chathome.getChatRoomData(chatRoomId).block(); Message m1 = chatroom.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block(); Message m2 = chatroom.addMessage(1l, "ute", "Ich bin Ute...").block(); Message m3 = chatroom.addMessage(2l, "peter", "Willst du mit mir gehen?").block(); Message m4 = chatroom.addMessage(1l, "klaus", "Ja? Nein? Vielleicht??").block(); - - assertThat(chathome.getChatRooms().toStream()).containsExactlyElementsOf(List.of(chatroom)); - assertThat(chathome.getChatRoom(chatRoomId)).emitsExactly(chatroom); + assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyElementsOf(List.of(info)); + assertThat(chathome.getChatRoomInfo(chatRoomId)).emitsExactly(info); assertThat(chathome - .getChatRoom(chatRoomId) + .getChatRoomData(chatRoomId) .flatMapMany(cr -> cr.getMessages())).emitsExactly(m1, m2, m3, m4); stop(); start(); - assertThat(chathome.getChatRooms().toStream()).containsExactlyElementsOf(List.of(chatroom)); - assertThat(chathome.getChatRoom(chatRoomId)).emitsExactly(chatroom); + assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyElementsOf(List.of(info)); + assertThat(chathome.getChatRoomInfo(chatRoomId)).emitsExactly(info); assertThat(chathome - .getChatRoom(chatRoomId) + .getChatRoomData(chatRoomId) .flatMapMany(cr -> cr.getMessages())).emitsExactly(m1, m2, m3, m4); } @@ -68,12 +67,12 @@ public abstract class AbstractStorageStrategyIT { start(); - assertThat(chathome.getChatRooms().toStream()).hasSize(0); + assertThat(chathome.getChatRoomInfo().toStream()).hasSize(0); UUID chatRoomAId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7"); ChatRoomInfo infoA = chathome.createChatRoom(chatRoomAId, "FOO").block(); log.debug("Created chat-room {}", infoA); - ChatRoom chatroomA = chathome.getChatRoom(chatRoomAId).block(); + ChatRoomData chatroomA = chathome.getChatRoomData(chatRoomAId).block(); Message ma1 = chatroomA.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block(); Message ma2 = chatroomA.addMessage(1l, "ute", "Ich bin Ute...").block(); Message ma3 = chatroomA.addMessage(2l, "peter", "Willst du mit mir gehen?").block(); @@ -82,33 +81,33 @@ public abstract class AbstractStorageStrategyIT UUID chatRoomBId = UUID.fromString("8763dfdc-4dda-4a74-bea4-4b389177abea"); ChatRoomInfo infoB = chathome.createChatRoom(chatRoomBId, "BAR").block(); log.debug("Created chat-room {}", infoB); - ChatRoom chatroomB = chathome.getChatRoom(chatRoomBId).block(); + ChatRoomData chatroomB = chathome.getChatRoomData(chatRoomBId).block(); Message mb1 = chatroomB.addMessage(1l,"peter", "Hallo, ich heiße Uwe!").block(); Message mb2 = chatroomB.addMessage(1l, "ute", "Ich bin Ute...").block(); Message mb3 = chatroomB.addMessage(1l, "klaus", "Willst du mit mir gehen?").block(); Message mb4 = chatroomB.addMessage(2l, "peter", "Hä? Was jetzt?!? Isch glohb isch höb ühn däjah vüh...").block(); - assertThat(chathome.getChatRooms().toStream()).containsExactlyInAnyOrderElementsOf(List.of(chatroomA, chatroomB)); - assertThat(chathome.getChatRoom(chatRoomAId)).emitsExactly(chatroomA); + assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyInAnyOrderElementsOf(List.of(infoA, infoB)); + assertThat(chathome.getChatRoomInfo(chatRoomAId)).emitsExactly(infoA); assertThat(chathome - .getChatRoom(chatRoomAId) + .getChatRoomData(chatRoomAId) .flatMapMany(cr -> cr.getMessages())).emitsExactly(ma1, ma2, ma3, ma4); - assertThat(chathome.getChatRoom(chatRoomBId)).emitsExactly(chatroomB); + assertThat(chathome.getChatRoomData(chatRoomBId)).emitsExactly(chatroomB); assertThat(chathome - .getChatRoom(chatRoomBId) + .getChatRoomData(chatRoomBId) .flatMapMany(cr -> cr.getMessages())).emitsExactly(mb1, mb2, mb3, mb4); stop(); start(); - assertThat(chathome.getChatRooms().toStream()).containsExactlyInAnyOrderElementsOf(List.of(chatroomA, chatroomB)); - assertThat(chathome.getChatRoom(chatRoomAId)).emitsExactly(chatroomA); + assertThat(chathome.getChatRoomInfo().toStream()).containsExactlyInAnyOrderElementsOf(List.of(infoA, infoB)); + assertThat(chathome.getChatRoomInfo(chatRoomAId)).emitsExactly(infoA); assertThat(chathome - .getChatRoom(chatRoomAId) + .getChatRoomData(chatRoomAId) .flatMapMany(cr -> cr.getMessages())).emitsExactly(ma1, ma2, ma3, ma4); - assertThat(chathome.getChatRoom(chatRoomBId)).emitsExactly(chatroomB); + assertThat(chathome.getChatRoomInfo(chatRoomBId)).emitsExactly(infoB); assertThat(chathome - .getChatRoom(chatRoomBId) + .getChatRoomData(chatRoomBId) .flatMapMany(cr -> cr.getMessages())).emitsExactly(mb1, mb2, mb3, mb4); } diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java index 1bb08708..be40eed2 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java @@ -4,7 +4,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy; -import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.BeforeEach; @@ -32,10 +31,7 @@ public class InMemoryWithFilesStorageIT extends AbstractInMemoryStorageIT mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); storageStrategy = new FilesStorageStrategy( path, - clock, - 8, chatRoomId -> 0, - messageFlux -> new InMemoryChatRoomService(messageFlux), mapper); } diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java index 7ca9cb2f..a0dab37c 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java @@ -1,8 +1,8 @@ package de.juplo.kafka.chat.backend.persistence; import de.juplo.kafka.chat.backend.persistence.InMemoryWithMongoDbStorageIT.DataSourceInitializer; -import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService; import de.juplo.kafka.chat.backend.persistence.storage.mongodb.ChatRoomRepository; +import de.juplo.kafka.chat.backend.persistence.storage.mongodb.MessageRepository; import de.juplo.kafka.chat.backend.persistence.storage.mongodb.MongoDbStorageStrategy; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.BeforeEach; @@ -36,9 +36,9 @@ public class InMemoryWithMongoDbStorageIT extends AbstractInMemoryStorageIT @Autowired MongoDbStorageStrategy storageStrategy; @Autowired - ChatRoomRepository repository; + ChatRoomRepository chatRoomRepository; @Autowired - Clock clock; + MessageRepository messageRepository; public InMemoryWithMongoDbStorageIT() @@ -59,14 +59,13 @@ public class InMemoryWithMongoDbStorageIT extends AbstractInMemoryStorageIT @Bean MongoDbStorageStrategy storageStrategy( ChatRoomRepository chatRoomRepository, + MessageRepository messageRepository, Clock clock) { return new MongoDbStorageStrategy( chatRoomRepository, - clock, - 8, - chatRoomId -> 0, - messageFlux -> new InMemoryChatRoomService(messageFlux)); + messageRepository, + chatRoomId -> 0); } @Bean @@ -102,6 +101,7 @@ public class InMemoryWithMongoDbStorageIT extends AbstractInMemoryStorageIT { Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(log); CONTAINER.followOutput(logConsumer); - repository.deleteAll(); + chatRoomRepository.deleteAll(); + messageRepository.deleteAll(); } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java index e2ffd3a5..7da3b15c 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHomeTest.java @@ -27,7 +27,7 @@ public class ShardedChatHomeTest extends ChatHomeWithShardsTest .of(ownedShards()) .forEach(shard -> chatHomes[shard] = new SimpleChatHome( shard, - storageStrategy.read(), + storageStrategy, clock, bufferSize())); @@ -41,10 +41,7 @@ public class ShardedChatHomeTest extends ChatHomeWithShardsTest { return new FilesStorageStrategy( Paths.get("target", "test-classes", "data", "files"), - clock, - bufferSize(), new KafkaLikeShardingStrategy(NUM_SHARDS), - messageFlux -> new InMemoryChatRoomService(messageFlux), new ObjectMapper()); } diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeTest.java index 190d0f24..8be31731 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeTest.java @@ -22,7 +22,7 @@ public class SimpleChatHomeTest extends ChatHomeTest Clock clock) { return new SimpleChatHome( - storageStrategy.read(), + storageStrategy, clock, bufferSize()); } @@ -32,10 +32,7 @@ public class SimpleChatHomeTest extends ChatHomeTest { return new FilesStorageStrategy( Paths.get("target", "test-classes", "data", "files"), - clock, - bufferSize(), chatRoomId -> 0, - messageFlux -> new InMemoryChatRoomService(messageFlux), new ObjectMapper()); } diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageToTest.java deleted file mode 100644 index 33a8a503..00000000 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MessageToTest.java +++ /dev/null @@ -1,37 +0,0 @@ -package de.juplo.kafka.chat.backend.persistence.storage.mongodb; - -import de.juplo.kafka.chat.backend.domain.Message; -import org.junit.jupiter.api.Test; - -import java.time.LocalDateTime; - -import static org.assertj.core.api.Assertions.assertThat; - - -public class MessageToTest -{ - @Test - void testFrom() - { - Message message = new Message( - Message.MessageKey.of("ute", 1l), - 6l, - LocalDateTime.now(), - "foo"); - MessageTo messageTo = MessageTo.from(message); - assertThat(messageTo.getId()).isEqualTo("ute--1"); - } - - @Test - void testToMessage() - { - MessageTo messageTo = new MessageTo( - "ute--1", - 6l, - LocalDateTime.now().toString(), - "foo"); - Message message = messageTo.toMessage(); - assertThat(message.getId()).isEqualTo(1l); - assertThat(message.getUsername()).isEqualTo("ute"); - } -} diff --git a/src/test/resources/data/mongodb/0001.sh b/src/test/resources/data/mongodb/0001.sh index 014a8be7..ec4ebbf7 100644 --- a/src/test/resources/data/mongodb/0001.sh +++ b/src/test/resources/data/mongodb/0001.sh @@ -1,2 +1,3 @@ #!/bin/bash mongoimport --collection=chatRoomTo --db=test /docker-entrypoint-initdb.d/chatRoomTo.json +mongoimport --collection=messageTo --db=test /docker-entrypoint-initdb.d/messageTo.json diff --git a/src/test/resources/data/mongodb/chatRoomTo.json b/src/test/resources/data/mongodb/chatRoomTo.json index ae150343..db26b899 100644 --- a/src/test/resources/data/mongodb/chatRoomTo.json +++ b/src/test/resources/data/mongodb/chatRoomTo.json @@ -1,31 +1,6 @@ { "_id": "5c73531c-6fc4-426c-adcb-afc5c140a0f7", + "shard": 0, "name": "FOO", - "messages": [ - { - "_id": "peter--1", - "serial": 0, - "time": "2023-01-13T20:43:16.803382151", - "text": "Hallo, ich heiße Peter!" - }, - { - "_id": "ute--1", - "serial": 1, - "time": "2023-01-13T20:43:16.804049969", - "text": "Ich bin Ute..." - }, - { - "_id": "peter--2", - "serial": 2, - "time": "2023-01-13T20:43:16.804092782", - "text": "Willst du mit mir gehen?" - }, - { - "_id": "klaus--1", - "serial": 3, - "time": "2023-01-13T20:43:16.804122604", - "text": "Ja? Nein? Vielleicht??" - } - ], "_class": "de.juplo.kafka.chat.backend.persistence.storage.mongodb.ChatRoomTo" } diff --git a/src/test/resources/data/mongodb/messageTo.json b/src/test/resources/data/mongodb/messageTo.json new file mode 100644 index 00000000..02fed6ff --- /dev/null +++ b/src/test/resources/data/mongodb/messageTo.json @@ -0,0 +1,40 @@ +{ + "_id": "64f7952cecf06d750cad4b9c", + "chatRoomId": "5c73531c-6fc4-426c-adcb-afc5c140a0f7", + "user": "peter", + "id": 1, + "serial": 0, + "time": "2023-01-13T20:43:16.803382151", + "text": "Hallo, ich heiße Peter!", + "_class": "de.juplo.kafka.chat.backend.persistence.storage.mongodb.MessageTo" +} +{ + "_id": "64f7952cecf06d750cad4b9d", + "chatRoomId": "5c73531c-6fc4-426c-adcb-afc5c140a0f7", + "user": "ute", + "id": 1, + "serial": 1, + "time": "2023-01-13T20:43:16.804049969", + "text": "Ich bin Ute...", + "_class": "de.juplo.kafka.chat.backend.persistence.storage.mongodb.MessageTo" +} +{ + "_id": "64f7952cecf06d750cad4b9e", + "chatRoomId": "5c73531c-6fc4-426c-adcb-afc5c140a0f7", + "user": "peter", + "id": 2, + "serial": 2, + "time": "2023-01-13T20:43:16.804092782", + "text": "Willst du mit mir gehen?", + "_class": "de.juplo.kafka.chat.backend.persistence.storage.mongodb.MessageTo" +} +{ + "_id": "64f7953ddf7a9063e0b1f7dc", + "chatRoomId": "5c73531c-6fc4-426c-adcb-afc5c140a0f7", + "user": "klaus", + "id": 1, + "serial": 3, + "time": "2023-01-13T20:43:16.804122604", + "text": "Ja? Nein? Vielleicht??", + "_class": "de.juplo.kafka.chat.backend.persistence.storage.mongodb.MessageTo" +} -- 2.20.1