From 5e027064643ea9060c3a2cc8f4272360fa02abd3 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 14 Jan 2023 17:40:21 +0100 Subject: [PATCH 01/16] chore: Switched the default-configuration to sharding `none` --- .../de/juplo/kafka/chat/backend/ChatBackendProperties.java | 6 +++--- .../InMemoryWithFilesAndShardingConfigurationIT.java | 4 +++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java index 5cd4535c..177d4f51 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java @@ -22,9 +22,9 @@ public class ChatBackendProperties @Setter public static class InMemoryServicesProperties { - private ShardingStrategyType shardingStrategy = ShardingStrategyType.kafkalike; - private int numShards = 10; - private int[] ownedShards = { 2 }; + private ShardingStrategyType shardingStrategy = ShardingStrategyType.none; + private int numShards = 1; + private int[] ownedShards = new int[] { 0 }; private StorageStrategyType storageStrategy = StorageStrategyType.files; private String storageDirectory = Paths.get(System.getProperty("java.io.tmpdir"),"chat", "backend").toString(); } diff --git a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesAndShardingConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesAndShardingConfigurationIT.java index 5138deed..d792d258 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesAndShardingConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesAndShardingConfigurationIT.java @@ -7,7 +7,9 @@ import org.springframework.boot.test.context.SpringBootTest; webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, properties = { "chat.backend.inmemory.storage-directory=target/test-classes/data/files", - 
"chat.backend.inmemory.sharding-strategy=kafkalike" }) + "chat.backend.inmemory.sharding-strategy=kafkalike", + "chat.backend.inmemory.num-shards=10", + "chat.backend.inmemory.owned-shards=2" }) class InMemoryWithFilesAndShardingConfigurationIT extends AbstractConfigurationIT { } -- 2.20.1 From 6eb03cbe4609eea0b7061b574a7e273ecd0c9813 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 14 Jan 2023 18:11:56 +0100 Subject: [PATCH 02/16] fix: Removed unnecessary generic in `ChatHomeService` --- .../de/juplo/kafka/chat/backend/domain/ChatHomeService.java | 2 +- .../backend/persistence/inmemory/InMemoryChatHomeService.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java index d2bc508c..7f13283a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java @@ -6,7 +6,7 @@ import reactor.core.publisher.Mono; import java.util.UUID; -public interface ChatHomeService +public interface ChatHomeService { Mono putChatRoom(ChatRoom chatRoom); Mono getChatRoom(int shard, UUID id); diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java index 0beb7f36..95a30dbd 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java @@ -11,7 +11,7 @@ import java.util.*; @Slf4j -public class InMemoryChatHomeService implements ChatHomeService +public class InMemoryChatHomeService implements ChatHomeService { private final ShardingStrategy shardingStrategy; private final Map[] chatrooms; -- 2.20.1 From daca33d027e4c0d036fc2aa7c3d9b2120f3ad98a Mon Sep 17 
00:00:00 2001 From: Kai Moritz Date: Sat, 14 Jan 2023 18:09:46 +0100 Subject: [PATCH 03/16] fix: Fixed a NPE in `ShardedChatHome.getChatRooms()` - When collecting the `ChatRoom`s for all shards, unused shards with a `null` value were not skipped. - Also added log-messages of level `INFO` for the creation of `ChatRoom`, `SimpleChatHome` and `ShardedChatHome`. --- .../kafka/chat/backend/domain/ChatRoom.java | 1 + .../chat/backend/domain/ShardedChatHome.java | 35 +++++++++++++++---- .../chat/backend/domain/SimpleChatHome.java | 9 +++-- .../chat/backend/AbstractConfigurationIT.java | 9 +++++ 4 files changed, 46 insertions(+), 8 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java index 1c21fb97..cc5c5a07 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java @@ -38,6 +38,7 @@ public class ChatRoom ChatRoomService service, int bufferSize) { + log.info("Created ChatRoom {} with buffer-size {}", id, bufferSize); this.id = id; this.name = name; this.clock = clock; diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java index ffa7860a..3023f782 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java @@ -1,17 +1,40 @@ package de.juplo.kafka.chat.backend.domain; -import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.util.HashSet; +import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; -@RequiredArgsConstructor +@Slf4j public class ShardedChatHome implements ChatHome { private final ChatHome[] chatHomes; - private final ShardingStrategy selectionStrategy; + 
private final Set ownedShards; + private final ShardingStrategy shardingStrategy; + + + public ShardedChatHome( + ChatHome[] chatHomes, + ShardingStrategy shardingStrategy) + { + this.chatHomes = chatHomes; + this.shardingStrategy = shardingStrategy; + this.ownedShards = new HashSet<>(); + for (int shard = 0; shard < chatHomes.length; shard++) + if(chatHomes[shard] != null) + this.ownedShards.add(shard); + log.info( + "Created ShardedChatHome for shards: {}", + ownedShards + .stream() + .map(String::valueOf) + .collect(Collectors.joining(", "))); + } @Override @@ -30,13 +53,13 @@ public class ShardedChatHome implements ChatHome public Flux getChatRooms() { return Flux - .fromArray(chatHomes) - .flatMap(chatHome -> chatHome.getChatRooms()); + .fromIterable(ownedShards) + .flatMap(shard -> chatHomes[shard].getChatRooms()); } private int selectShard(UUID chatroomId) { - return selectionStrategy.selectShard(chatroomId); + return shardingStrategy.selectShard(chatroomId); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java index b15eab3b..46802c69 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java @@ -1,6 +1,5 @@ package de.juplo.kafka.chat.backend.domain; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -9,13 +8,19 @@ import java.util.*; @Slf4j -@RequiredArgsConstructor public class SimpleChatHome implements ChatHome { private final ChatHomeService service; private final int shard; + public SimpleChatHome(ChatHomeService service, int shard) + { + log.info("Created SimpleChatHome for shard {}", shard); + this.service = service; + this.shard = shard; + } + public SimpleChatHome(ChatHomeService service) { this(service, 0); diff --git 
a/src/test/java/de/juplo/kafka/chat/backend/AbstractConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/AbstractConfigurationIT.java index 8c309b84..c424b294 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/AbstractConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/AbstractConfigurationIT.java @@ -33,6 +33,15 @@ public abstract class AbstractConfigurationIT .exchange() .expectStatus().isOk() .expectBody().jsonPath("$.status").isEqualTo("UP"); + webTestClient + .get() + .uri("http://localhost:{port}/list", port) + .accept(MediaType.APPLICATION_JSON) + .exchange() + .expectStatus().isOk() + .expectBody() + .jsonPath("$.length()").isEqualTo(1) + .jsonPath("$[0].name").isEqualTo("FOO"); webTestClient .get() .uri("http://localhost:{port}/5c73531c-6fc4-426c-adcb-afc5c140a0f7", port) -- 2.20.1 From 2095f4c6a102a52f2a15360d1b6355e4990f8f43 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 14 Jan 2023 19:53:16 +0100 Subject: [PATCH 04/16] feat: Reintroduced `ChatRoom.shard`, because it is needed as a routing-hint - Technically, the attribute is not needed. - But, to implement the gua/sha-pattern, it is needed as routing-hint. - Hence, it is reintroduced here and also added to the `ChatRoomTo` to be sent to the client. 
--- .../kafka/chat/backend/api/ChatRoomTo.java | 2 ++ .../kafka/chat/backend/domain/ChatRoom.java | 4 ++++ .../inmemory/InMemoryChatHomeService.java | 21 +++---------------- .../inmemory/InMemoryChatRoomFactory.java | 3 ++- .../InMemoryServicesConfiguration.java | 8 +++---- .../files/FilesStorageConfiguration.java | 4 +++- .../storage/files/FilesStorageStrategy.java | 13 ++++++++++-- .../mongodb/MongoDbStorageConfiguration.java | 5 ++++- .../mongodb/MongoDbStorageStrategy.java | 4 ++++ .../api/ChatBackendControllerTest.java | 2 ++ .../chat/backend/domain/ChatRoomTest.java | 5 +++++ .../backend/domain/SimpleChatHomeTest.java | 1 + .../InMemoryWithFilesStorageIT.java | 2 +- .../InMemoryWithMongoDbStorageIT.java | 2 +- 14 files changed, 46 insertions(+), 30 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java index 3cc59210..e997e4bb 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java @@ -10,6 +10,7 @@ public class ChatRoomTo { private UUID id; private String name; + private int shard; public static ChatRoomTo from(ChatRoom chatroom) @@ -17,6 +18,7 @@ public class ChatRoomTo ChatRoomTo to = new ChatRoomTo(); to.id = chatroom.getId(); to.name = chatroom.getName(); + to.shard = chatroom.getShard(); return to; } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java index cc5c5a07..4f855b8c 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java @@ -25,6 +25,8 @@ public class ChatRoom private final UUID id; @Getter private final String name; + @Getter + private final int shard; private final Clock clock; private final ChatRoomService service; private final int bufferSize; @@ -34,6 +36,7 @@ public class ChatRoom 
public ChatRoom( UUID id, String name, + int shard, Clock clock, ChatRoomService service, int bufferSize) @@ -41,6 +44,7 @@ public class ChatRoom log.info("Created ChatRoom {} with buffer-size {}", id, bufferSize); this.id = id; this.name = name; + this.shard = shard; this.clock = clock; this.service = service; this.bufferSize = bufferSize; diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java index 95a30dbd..fd54d34c 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java @@ -2,7 +2,6 @@ package de.juplo.kafka.chat.backend.persistence.inmemory; import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.ChatHomeService; -import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -13,18 +12,15 @@ import java.util.*; @Slf4j public class InMemoryChatHomeService implements ChatHomeService { - private final ShardingStrategy shardingStrategy; private final Map[] chatrooms; public InMemoryChatHomeService( - ShardingStrategy shardingStrategy, int numShards, int[] ownedShards, Flux chatroomFlux) { log.debug("Creating InMemoryChatHomeService"); - this.shardingStrategy = shardingStrategy; this.chatrooms = new Map[numShards]; Set owned = Arrays .stream(ownedShards) @@ -41,8 +37,7 @@ public class InMemoryChatHomeService implements ChatHomeService chatroomFlux .filter(chatRoom -> { - int shard = shardingStrategy.selectShard(chatRoom.getId()); - if (owned.contains(shard)) + if (owned.contains(chatRoom.getShard())) { return true; } @@ -53,16 +48,13 @@ public class InMemoryChatHomeService implements ChatHomeService } }) .toStream() - .forEach(chatRoom 
-> - { - getChatRoomMapFor(chatRoom).put(chatRoom.getId(), chatRoom); - }); + .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom)); } @Override public Mono putChatRoom(ChatRoom chatRoom) { - getChatRoomMapFor(chatRoom).put(chatRoom.getId(), chatRoom); + chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom); return Mono.just(chatRoom); } @@ -77,11 +69,4 @@ public class InMemoryChatHomeService implements ChatHomeService { return Flux.fromStream(chatrooms[shard].values().stream()); } - - - private Map getChatRoomMapFor(ChatRoom chatRoom) - { - int shard = shardingStrategy.selectShard(chatRoom.getId()); - return chatrooms[shard]; - } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java index 3e1d3603..7fff359c 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java @@ -26,7 +26,8 @@ public class InMemoryChatRoomFactory implements ChatRoomFactory public Mono createChatRoom(UUID id, String name) { log.info("Creating ChatRoom with buffer-size {}", bufferSize); + int shard = shardingStrategy.selectShard(id); ChatRoomService service = new InMemoryChatRoomService(Flux.empty()); - return Mono.just(new ChatRoom(id, name, clock, service, bufferSize)); + return Mono.just(new ChatRoom(id, name, shard, clock, service, bufferSize)); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java index 1dca0402..96ef05c2 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java +++ 
b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java @@ -45,23 +45,22 @@ public class InMemoryServicesConfiguration StorageStrategy storageStrategy) { int numShards = properties.getInmemory().getNumShards(); - ShardingStrategy shardingStrategy = new KafkaLikeShardingStrategy(numShards); SimpleChatHome[] chatHomes = new SimpleChatHome[numShards]; storageStrategy .read() .subscribe(chatRoom -> { - int shard = shardingStrategy.selectShard(chatRoom.getId()); + int shard = chatRoom.getShard(); if (chatHomes[shard] == null) chatHomes[shard] = new SimpleChatHome(chatHomeService, shard); }); - return new ShardedChatHome(chatHomes, shardingStrategy); + ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards); + return new ShardedChatHome(chatHomes, strategy); } @Bean InMemoryChatHomeService chatHomeService( ChatBackendProperties properties, - ShardingStrategy shardingStrategy, StorageStrategy storageStrategy) { ShardingStrategyType sharding = @@ -73,7 +72,6 @@ public class InMemoryServicesConfiguration ? 
new int[] { 0 } : properties.getInmemory().getOwnedShards(); return new InMemoryChatHomeService( - shardingStrategy, numShards, ownedShards, storageStrategy.read()); diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java index 0da1b953..df730aaf 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java @@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.persistence.storage.files; import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.chat.backend.ChatBackendProperties; +import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; @@ -31,13 +32,14 @@ public class FilesStorageConfiguration public StorageStrategy storageStrategy( ChatBackendProperties properties, Clock clock, + ShardingStrategy shardingStrategy, ObjectMapper mapper) { return new FilesStorageStrategy( Paths.get(properties.getInmemory().getStorageDirectory()), clock, properties.getChatroomBufferSize(), - + shardingStrategy, messageFlux -> new InMemoryChatRoomService(messageFlux), mapper); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java index 5d3c067a..1e3e5eef 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java @@ -5,6 +5,7 @@ import 
com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.chat.backend.api.ChatRoomTo; import de.juplo.kafka.chat.backend.api.MessageTo; +import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.Message; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; @@ -16,6 +17,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.time.Clock; +import java.util.UUID; import static java.nio.file.StandardOpenOption.CREATE; import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; @@ -31,6 +33,7 @@ public class FilesStorageStrategy implements StorageStrategy private final Path storagePath; private final Clock clock; private final int bufferSize; + private final ShardingStrategy shardingStrategy; private final ChatRoomServiceFactory factory; private final ObjectMapper mapper; @@ -102,12 +105,18 @@ public class FilesStorageStrategy implements StorageStrategy return Flux .from(new JsonFilePublisher(chatroomsPath(), mapper, type)) .log() - .map(chatRoomTo -> new ChatRoom( + .map(chatRoomTo -> + { + UUID chatRoomId = chatRoomTo.getId(); + int shard = shardingStrategy.selectShard(chatRoomId); + return new ChatRoom( chatRoomTo.getId(), chatRoomTo.getName(), + shard, clock, factory.create(readMessages(chatRoomTo)), - bufferSize)); + bufferSize); + }); } public void writeMessages(ChatRoomTo chatroomTo, Flux messageFlux) diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java index 22d7c26b..2b33eed7 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java +++ 
b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java @@ -1,6 +1,7 @@ package de.juplo.kafka.chat.backend.persistence.storage.mongodb; import de.juplo.kafka.chat.backend.ChatBackendProperties; +import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -21,12 +22,14 @@ public class MongoDbStorageConfiguration public StorageStrategy storageStrategy( ChatRoomRepository chatRoomRepository, ChatBackendProperties properties, - Clock clock) + Clock clock, + ShardingStrategy shardingStrategy) { return new MongoDbStorageStrategy( chatRoomRepository, clock, properties.getChatroomBufferSize(), + shardingStrategy, messageFlux -> new InMemoryChatRoomService(messageFlux)); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java index 23002198..d21fe2ba 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java @@ -1,5 +1,6 @@ package de.juplo.kafka.chat.backend.persistence.storage.mongodb; +import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory; @@ -18,6 +19,7 @@ public class MongoDbStorageStrategy implements StorageStrategy private final ChatRoomRepository repository; private final Clock clock; private final int bufferSize; + private final ShardingStrategy shardingStrategy; private final 
ChatRoomServiceFactory factory; @@ -37,9 +39,11 @@ public class MongoDbStorageStrategy implements StorageStrategy .map(chatRoomTo -> { UUID chatRoomId = UUID.fromString(chatRoomTo.getId()); + int shard = shardingStrategy.selectShard(chatRoomId); return new ChatRoom( chatRoomId, chatRoomTo.getName(), + shard, clock, factory.create( Flux diff --git a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java index a2870284..1b25a11e 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java @@ -170,6 +170,7 @@ public class ChatBackendControllerTest ChatRoom chatRoom = new ChatRoom( chatroomId, "Test-ChatRoom", + 0, Clock.systemDefaultZone(), chatRoomService, 8); when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.just(chatRoom)); @@ -221,6 +222,7 @@ public class ChatBackendControllerTest ChatRoom chatRoom = new ChatRoom( chatroomId, "Test-ChatRoom", + 0, Clock.systemDefaultZone(), chatRoomService, 8); when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))) diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java index d87155c2..9c418f17 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java @@ -27,6 +27,7 @@ public class ChatRoomTest ChatRoom chatRoom = new ChatRoom( UUID.randomUUID(), "Foo", + 0, Clock.systemDefaultZone(), chatRoomService, 8); @@ -53,6 +54,7 @@ public class ChatRoomTest ChatRoom chatRoom = new ChatRoom( UUID.randomUUID(), "Foo", + 0, Clock.systemDefaultZone(), chatRoomService, 8); @@ -76,6 +78,7 @@ public class ChatRoomTest ChatRoom chatRoom = new ChatRoom( UUID.randomUUID(), "Foo", + 0, Clock.systemDefaultZone(), 
chatRoomService, 8); @@ -105,6 +108,7 @@ public class ChatRoomTest ChatRoom chatRoom = new ChatRoom( UUID.randomUUID(), "Foo", + 0, Clock.systemDefaultZone(), chatRoomService, 8); @@ -134,6 +138,7 @@ public class ChatRoomTest ChatRoom chatRoom = new ChatRoom( UUID.randomUUID(), "Foo", + 0, Clock.systemDefaultZone(), chatRoomService, 8); diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java index c9d1b633..5b53607f 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java @@ -25,6 +25,7 @@ public class SimpleChatHomeTest ChatRoom chatRoom = new ChatRoom( UUID.randomUUID(), "Foo", + 0, Clock.systemDefaultZone(), mock(ChatRoomService.class), 8); diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java index d6909df4..fe7ecac1 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java @@ -41,6 +41,7 @@ public class InMemoryWithFilesStorageIT extends AbstractStorageStrategyIT path, clock, 8, + chatRoomId -> 0, messageFlux -> new InMemoryChatRoomService(messageFlux), mapper); } @@ -56,7 +57,6 @@ public class InMemoryWithFilesStorageIT extends AbstractStorageStrategyIT protected Supplier getChatHomeServiceSupplier() { return () -> new InMemoryChatHomeService( - chatRoomId -> 0, 1, new int[] { 0 }, getStorageStrategy().read()); diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java index ecc64d5a..9808aa31 100644 --- 
a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java @@ -55,7 +55,6 @@ public class InMemoryWithMongoDbStorageIT extends AbstractStorageStrategyIT protected Supplier getChatHomeServiceSupplier() { return () -> new InMemoryChatHomeService( - chatRoomId -> 0, 1, new int[] { 0 }, getStorageStrategy().read()); @@ -80,6 +79,7 @@ public class InMemoryWithMongoDbStorageIT extends AbstractStorageStrategyIT chatRoomRepository, clock, 8, + chatRoomId -> 0, messageFlux -> new InMemoryChatRoomService(messageFlux)); } -- 2.20.1 From 4f9e39284368922173f16d1e7ee87b416cc45ce6 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 15 Jan 2023 19:48:34 +0100 Subject: [PATCH 05/16] chore: Added a Docker-Build --- .dockerignore | 2 ++ .maven-dockerexclude | 1 + .maven-dockerinclude | 1 + Dockerfile | 5 +++++ pom.xml | 21 +++++++++++++++++++++ 5 files changed, 30 insertions(+) create mode 100644 .dockerignore create mode 100644 .maven-dockerexclude create mode 100644 .maven-dockerinclude create mode 100644 Dockerfile diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 00000000..1ad99637 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,2 @@ +* +!target/*.jar diff --git a/.maven-dockerexclude b/.maven-dockerexclude new file mode 100644 index 00000000..72e8ffc0 --- /dev/null +++ b/.maven-dockerexclude @@ -0,0 +1 @@ +* diff --git a/.maven-dockerinclude b/.maven-dockerinclude new file mode 100644 index 00000000..fd6cecd2 --- /dev/null +++ b/.maven-dockerinclude @@ -0,0 +1 @@ +target/*.jar diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 00000000..503f939e --- /dev/null +++ b/Dockerfile @@ -0,0 +1,5 @@ +FROM eclipse-temurin:17-jre +VOLUME /tmp +COPY target/*.jar /opt/app.jar +ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ] +CMD [] diff --git a/pom.xml b/pom.xml index b5b0e81a..2819be45 100644 --- a/pom.xml +++ b/pom.xml @@ -112,6 +112,27 
@@ maven-failsafe-plugin + + io.fabric8 + docker-maven-plugin + 0.33.0 + + + + juplo/%a:%v + + + + + + build + package + + build + + + + -- 2.20.1 From 9abc6319cb4a72fffe2ceb5b11cc51fc82c772a0 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 22 Jan 2023 18:34:21 +0100 Subject: [PATCH 06/16] test: Added a test that asserts disjoint chatrooms --- .../AbstractStorageStrategyIT.java | 47 +++++++++++++++++++ .../InMemoryWithMongoDbStorageIT.java | 3 ++ 2 files changed, 50 insertions(+) diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java index c934ff40..5d22d12e 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java @@ -63,4 +63,51 @@ public abstract class AbstractStorageStrategyIT .getChatRoom(chatroom.getId()) .flatMapMany(cr -> cr.getMessages())).emitsExactly(m1, m2, m3, m4); } + + @Test + protected void testStoreAndRecreateParallelChatRooms() + { + start(); + + assertThat(chathome.getChatRooms().toStream()).hasSize(0); + + UUID chatRoomAId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7"); + ChatRoom chatroomA = chatRoomFactory.createChatRoom(chatRoomAId, "FOO").block(); + chathome.putChatRoom(chatroomA); + Message ma1 = chatroomA.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block(); + Message ma2 = chatroomA.addMessage(1l, "ute", "Ich bin Ute...").block(); + Message ma3 = chatroomA.addMessage(2l, "peter", "Willst du mit mir gehen?").block(); + Message ma4 = chatroomA.addMessage(1l, "klaus", "Ja? Nein? 
Vielleicht??").block(); + + UUID chatRoomBId = UUID.fromString("8763dfdc-4dda-4a74-bea4-4b389177abea"); + ChatRoom chatroomB = chatRoomFactory.createChatRoom(chatRoomBId, "BAR").block(); + chathome.putChatRoom(chatroomB); + Message mb1 = chatroomB.addMessage(1l,"peter", "Hallo, ich heiße Uwe!").block(); + Message mb2 = chatroomB.addMessage(1l, "ute", "Ich bin Ute...").block(); + Message mb3 = chatroomB.addMessage(1l, "klaus", "Willst du mit mir gehen?").block(); + Message mb4 = chatroomB.addMessage(2l, "peter", "Hä? Was jetzt?!? Isch glohb isch höb ühn däjah vüh...").block(); + + assertThat(chathome.getChatRooms().toStream()).containsExactlyInAnyOrderElementsOf(List.of(chatroomA, chatroomB)); + assertThat(chathome.getChatRoom(chatroomA.getId())).emitsExactly(chatroomA); + assertThat(chathome + .getChatRoom(chatroomA.getId()) + .flatMapMany(cr -> cr.getMessages())).emitsExactly(ma1, ma2, ma3, ma4); + assertThat(chathome.getChatRoom(chatroomB.getId())).emitsExactly(chatroomB); + assertThat(chathome + .getChatRoom(chatroomB.getId()) + .flatMapMany(cr -> cr.getMessages())).emitsExactly(mb1, mb2, mb3, mb4); + + stop(); + start(); + + assertThat(chathome.getChatRooms().toStream()).containsExactlyInAnyOrderElementsOf(List.of(chatroomA, chatroomB)); + assertThat(chathome.getChatRoom(chatroomA.getId())).emitsExactly(chatroomA); + assertThat(chathome + .getChatRoom(chatroomA.getId()) + .flatMapMany(cr -> cr.getMessages())).emitsExactly(ma1, ma2, ma3, ma4); + assertThat(chathome.getChatRoom(chatroomB.getId())).emitsExactly(chatroomB); + assertThat(chathome + .getChatRoom(chatroomB.getId()) + .flatMapMany(cr -> cr.getMessages())).emitsExactly(mb1, mb2, mb3, mb4); + } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java index 9808aa31..e56aff77 100644 --- 
a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java @@ -42,6 +42,8 @@ public class InMemoryWithMongoDbStorageIT extends AbstractStorageStrategyIT @Autowired MongoDbStorageStrategy storageStrategy; @Autowired + ChatRoomRepository repository; + @Autowired Clock clock; @@ -116,5 +118,6 @@ public class InMemoryWithMongoDbStorageIT extends AbstractStorageStrategyIT { Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(log); CONTAINER.followOutput(logConsumer); + repository.deleteAll(); } } -- 2.20.1 From 41e0a7cfd0025d5bea9722c8fc10c3b20a0fcad7 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 25 Jan 2023 22:08:11 +0100 Subject: [PATCH 07/16] refactor: Simplified the handling of `MessageMutationException` --- .../juplo/kafka/chat/backend/domain/ChatRoom.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java index 4f855b8c..02f5c08c 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java @@ -7,6 +7,7 @@ import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; +import reactor.core.publisher.SynchronousSink; import java.time.Clock; import java.time.LocalDateTime; @@ -67,9 +68,17 @@ public class ChatRoom Message.MessageKey key = Message.MessageKey.of(user, id); return service .getMessage(key) - .flatMap(existing -> text.equals(existing.getMessageText()) - ? 
Mono.just(existing) - : Mono.error(() -> new MessageMutationException(existing, text))) + .handle((Message existing, SynchronousSink sink) -> + { + if (existing.getMessageText().equals(text)) + { + sink.next(existing); + } + else + { + sink.error(new MessageMutationException(existing, text)); + } + }) .switchIfEmpty( Mono .fromSupplier(() ->service.persistMessage(key, LocalDateTime.now(clock), text)) -- 2.20.1 From 7281b4220170f451062bb32e0d5f63ec48d141d3 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 24 Jan 2023 19:05:18 +0100 Subject: [PATCH 08/16] refactor: `ChatRoomService.persistMessage(..)` returns a `Mono` --- .../java/de/juplo/kafka/chat/backend/domain/ChatRoom.java | 2 +- .../de/juplo/kafka/chat/backend/domain/ChatRoomService.java | 2 +- .../persistence/inmemory/InMemoryChatRoomService.java | 4 ++-- .../kafka/chat/backend/api/ChatBackendControllerTest.java | 4 ++-- .../de/juplo/kafka/chat/backend/domain/ChatRoomTest.java | 6 +++--- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java index 02f5c08c..da5eba2a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java @@ -81,7 +81,7 @@ public class ChatRoom }) .switchIfEmpty( Mono - .fromSupplier(() ->service.persistMessage(key, LocalDateTime.now(clock), text)) + .defer(() -> service.persistMessage(key, LocalDateTime.now(clock), text)) .doOnNext(m -> { Sinks.EmitResult result = sink.tryEmitNext(m); diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomService.java index c70ffe4e..374a442b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomService.java @@ -8,7 +8,7 @@ import java.time.LocalDateTime; 
public interface ChatRoomService { - Message persistMessage( + Mono persistMessage( Message.MessageKey key, LocalDateTime timestamp, String text); diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomService.java index 314e1f03..e1d5a5e3 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomService.java @@ -24,14 +24,14 @@ public class InMemoryChatRoomService implements ChatRoomService } @Override - public Message persistMessage( + public Mono persistMessage( Message.MessageKey key, LocalDateTime timestamp, String text) { Message message = new Message(key, (long)messages.size(), timestamp, text); messages.put(message.getKey(), message); - return message; + return Mono.just(message); } @Override diff --git a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java index 1b25a11e..b72294d9 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java @@ -183,7 +183,7 @@ public class ChatBackendControllerTest .thenReturn(Mono.just(existingMessage)); // Needed for readable error-reports, in case of a bug that leads to according unwanted call when(chatRoomService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class))) - .thenReturn(mock(Message.class)); + .thenReturn(Mono.just(mock(Message.class))); // When client @@ -231,7 +231,7 @@ public class ChatBackendControllerTest .thenReturn(Mono.empty()); // Needed for readable error-reports, in case of a bug that leads to according unwanted call when(chatRoomService.persistMessage(any(Message.MessageKey.class), 
any(LocalDateTime.class), any(String.class))) - .thenReturn(mock(Message.class)); + .thenReturn(Mono.just(mock(Message.class))); // When client diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java index 9c418f17..822ffe77 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java @@ -88,7 +88,7 @@ public class ChatRoomTest String messageText = "Bar"; Message message = new Message(key, 0l, timestamp, messageText); when(chatRoomService.getMessage(any(Message.MessageKey.class))).thenReturn(Mono.empty()); - when(chatRoomService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class))).thenReturn(message); + when(chatRoomService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class))).thenReturn(Mono.just(message)); // When Mono mono = chatRoom.addMessage(messageId, user, messageText); @@ -118,7 +118,7 @@ public class ChatRoomTest String messageText = "Bar"; Message message = new Message(key, 0l, timestamp, messageText); when(chatRoomService.getMessage(any(Message.MessageKey.class))).thenReturn(Mono.just(message)); - when(chatRoomService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class))).thenReturn(message); + when(chatRoomService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class))).thenReturn(Mono.just(message)); // When Mono mono = chatRoom.addMessage(messageId, user, messageText); @@ -149,7 +149,7 @@ public class ChatRoomTest String mutatedText = "Boom!"; Message message = new Message(key, 0l, timestamp, messageText); when(chatRoomService.getMessage(any(Message.MessageKey.class))).thenReturn(Mono.just(message)); - when(chatRoomService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), 
any(String.class))).thenReturn(message); + when(chatRoomService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class))).thenReturn(Mono.just(message)); // When Mono mono = chatRoom.addMessage(messageId, user, mutatedText); -- 2.20.1 From 676f6042961eeb89d10a6f82eb870760b66260ef Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 17 Feb 2023 23:34:55 +0100 Subject: [PATCH 09/16] refactor: Refined the configuration of `AbstractStorageStrategyIT` --- .../AbstractStorageStrategyIT.java | 16 +++++--- .../InMemoryWithFilesStorageIT.java | 37 ++++++++++++------- .../InMemoryWithMongoDbStorageIT.java | 37 ++++++++++++------- 3 files changed, 59 insertions(+), 31 deletions(-) diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java index 5d22d12e..d5e02b83 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java @@ -6,7 +6,6 @@ import org.junit.jupiter.api.Test; import java.util.List; import java.util.UUID; -import java.util.function.Supplier; import static pl.rzrz.assertj.reactor.Assertions.*; @@ -19,13 +18,13 @@ public abstract class AbstractStorageStrategyIT protected abstract StorageStrategy getStorageStrategy(); - protected abstract Supplier getChatHomeServiceSupplier(); - protected abstract ChatRoomFactory getChatRoomFactory(); + protected abstract StorageStrategyITConfig getConfig(); protected void start() { - chathome = new SimpleChatHome(getChatHomeServiceSupplier().get()); - chatRoomFactory = getChatRoomFactory(); + StorageStrategyITConfig config = getConfig(); + chathome = new SimpleChatHome(config.getChatHomeService()); + chatRoomFactory = config.getChatRoomFactory(); } protected void stop() @@ -110,4 +109,11 @@ public abstract class AbstractStorageStrategyIT 
.getChatRoom(chatroomB.getId()) .flatMapMany(cr -> cr.getMessages())).emitsExactly(mb1, mb2, mb3, mb4); } + + + interface StorageStrategyITConfig + { + ChatHomeService getChatHomeService(); + ChatRoomFactory getChatRoomFactory(); + } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java index fe7ecac1..8ca7d472 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java @@ -3,7 +3,6 @@ package de.juplo.kafka.chat.backend.persistence; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.domain.ChatHomeService; import de.juplo.kafka.chat.backend.domain.ChatRoomFactory; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory; @@ -18,7 +17,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.time.Clock; -import java.util.function.Supplier; @Slf4j @@ -54,19 +52,32 @@ public class InMemoryWithFilesStorageIT extends AbstractStorageStrategyIT } @Override - protected Supplier getChatHomeServiceSupplier() + protected StorageStrategyITConfig getConfig() { - return () -> new InMemoryChatHomeService( - 1, - new int[] { 0 }, - getStorageStrategy().read()); - } + return new StorageStrategyITConfig() + { + InMemoryChatHomeService chatHomeService = new InMemoryChatHomeService( + 1, + new int[] { 0 }, + getStorageStrategy().read()); - @Override - protected ChatRoomFactory getChatRoomFactory() - { - ShardingStrategy strategy = chatRoomId -> 0; - return new InMemoryChatRoomFactory(strategy, clock, 8); + InMemoryChatRoomFactory chatRoomFactory 
= new InMemoryChatRoomFactory( + chatRoomId -> 0, + clock, + 8); + + @Override + public ChatHomeService getChatHomeService() + { + return chatHomeService; + } + + @Override + public ChatRoomFactory getChatRoomFactory() + { + return chatRoomFactory; + } + }; } @BeforeEach diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java index e56aff77..f86c9d3f 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java @@ -1,6 +1,5 @@ package de.juplo.kafka.chat.backend.persistence; -import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.domain.ChatHomeService; import de.juplo.kafka.chat.backend.domain.ChatRoomFactory; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService; @@ -28,7 +27,6 @@ import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import java.time.Clock; -import java.util.function.Supplier; @Testcontainers @@ -54,19 +52,32 @@ public class InMemoryWithMongoDbStorageIT extends AbstractStorageStrategyIT } @Override - protected Supplier getChatHomeServiceSupplier() + protected StorageStrategyITConfig getConfig() { - return () -> new InMemoryChatHomeService( - 1, - new int[] { 0 }, - getStorageStrategy().read()); - } + return new StorageStrategyITConfig() + { + InMemoryChatHomeService chatHomeService = new InMemoryChatHomeService( + 1, + new int[] { 0 }, + getStorageStrategy().read()); - @Override - protected ChatRoomFactory getChatRoomFactory() - { - ShardingStrategy strategy = chatRoomId -> 0; - return new InMemoryChatRoomFactory(strategy, clock, 8); + InMemoryChatRoomFactory chatRoomFactory = new InMemoryChatRoomFactory( + chatRoomId -> 0, + clock, + 8); + + @Override + public 
ChatHomeService getChatHomeService() + { + return chatHomeService; + } + + @Override + public ChatRoomFactory getChatRoomFactory() + { + return chatRoomFactory; + } + }; } @TestConfiguration -- 2.20.1 From 977b454a542b95a5d08a94dd5f8b55814350b7cd Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 18 Feb 2023 00:10:03 +0100 Subject: [PATCH 10/16] refactor: DRY for the configuration of the `AbstractStorageStrategyIT` --- .../AbstractInMemoryStorageIT.java | 47 +++++++++++++++++++ .../InMemoryWithFilesStorageIT.java | 38 +-------------- .../InMemoryWithMongoDbStorageIT.java | 39 +++------------ 3 files changed, 55 insertions(+), 69 deletions(-) create mode 100644 src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java new file mode 100644 index 00000000..832ebd99 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java @@ -0,0 +1,47 @@ +package de.juplo.kafka.chat.backend.persistence; + +import de.juplo.kafka.chat.backend.domain.ChatHomeService; +import de.juplo.kafka.chat.backend.domain.ChatRoomFactory; +import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService; +import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.time.Clock; + + +@RequiredArgsConstructor +@Slf4j +public abstract class AbstractInMemoryStorageIT extends AbstractStorageStrategyIT +{ + final Clock clock; + + @Override + protected StorageStrategyITConfig getConfig() + { + return new StorageStrategyITConfig() + { + InMemoryChatHomeService chatHomeService = new InMemoryChatHomeService( + 1, + new int[] { 0 }, + getStorageStrategy().read()); + + InMemoryChatRoomFactory chatRoomFactory = new 
InMemoryChatRoomFactory( + chatRoomId -> 0, + clock, + 8); + + @Override + public ChatHomeService getChatHomeService() + { + return chatHomeService; + } + + @Override + public ChatRoomFactory getChatRoomFactory() + { + return chatRoomFactory; + } + }; + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java index 8ca7d472..1bb08708 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java @@ -3,11 +3,7 @@ package de.juplo.kafka.chat.backend.persistence; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import de.juplo.kafka.chat.backend.domain.ChatHomeService; -import de.juplo.kafka.chat.backend.domain.ChatRoomFactory; -import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory; import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy; -import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.BeforeEach; @@ -20,18 +16,17 @@ import java.time.Clock; @Slf4j -public class InMemoryWithFilesStorageIT extends AbstractStorageStrategyIT +public class InMemoryWithFilesStorageIT extends AbstractInMemoryStorageIT { final static Path path = Paths.get("target","files"); - final Clock clock; final ObjectMapper mapper; final FilesStorageStrategy storageStrategy; public InMemoryWithFilesStorageIT() { - clock = Clock.systemDefaultZone(); + super(Clock.systemDefaultZone()); mapper = new ObjectMapper(); mapper.registerModule(new JavaTimeModule()); 
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); @@ -51,35 +46,6 @@ public class InMemoryWithFilesStorageIT extends AbstractStorageStrategyIT return storageStrategy; } - @Override - protected StorageStrategyITConfig getConfig() - { - return new StorageStrategyITConfig() - { - InMemoryChatHomeService chatHomeService = new InMemoryChatHomeService( - 1, - new int[] { 0 }, - getStorageStrategy().read()); - - InMemoryChatRoomFactory chatRoomFactory = new InMemoryChatRoomFactory( - chatRoomId -> 0, - clock, - 8); - - @Override - public ChatHomeService getChatHomeService() - { - return chatHomeService; - } - - @Override - public ChatRoomFactory getChatRoomFactory() - { - return chatRoomFactory; - } - }; - } - @BeforeEach void reset() throws Exception { diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java index f86c9d3f..7ca9cb2f 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java @@ -1,10 +1,6 @@ package de.juplo.kafka.chat.backend.persistence; -import de.juplo.kafka.chat.backend.domain.ChatHomeService; -import de.juplo.kafka.chat.backend.domain.ChatRoomFactory; -import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService; import de.juplo.kafka.chat.backend.persistence.InMemoryWithMongoDbStorageIT.DataSourceInitializer; -import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService; import de.juplo.kafka.chat.backend.persistence.storage.mongodb.ChatRoomRepository; import de.juplo.kafka.chat.backend.persistence.storage.mongodb.MongoDbStorageStrategy; @@ -35,7 +31,7 @@ import java.time.Clock; @AutoConfigureDataMongo @ContextConfiguration(initializers = 
DataSourceInitializer.class) @Slf4j -public class InMemoryWithMongoDbStorageIT extends AbstractStorageStrategyIT +public class InMemoryWithMongoDbStorageIT extends AbstractInMemoryStorageIT { @Autowired MongoDbStorageStrategy storageStrategy; @@ -45,39 +41,16 @@ public class InMemoryWithMongoDbStorageIT extends AbstractStorageStrategyIT Clock clock; - @Override - protected StorageStrategy getStorageStrategy() + public InMemoryWithMongoDbStorageIT() { - return storageStrategy; + super(Clock.systemDefaultZone()); } + @Override - protected StorageStrategyITConfig getConfig() + protected StorageStrategy getStorageStrategy() { - return new StorageStrategyITConfig() - { - InMemoryChatHomeService chatHomeService = new InMemoryChatHomeService( - 1, - new int[] { 0 }, - getStorageStrategy().read()); - - InMemoryChatRoomFactory chatRoomFactory = new InMemoryChatRoomFactory( - chatRoomId -> 0, - clock, - 8); - - @Override - public ChatHomeService getChatHomeService() - { - return chatHomeService; - } - - @Override - public ChatRoomFactory getChatRoomFactory() - { - return chatRoomFactory; - } - }; + return storageStrategy; } @TestConfiguration -- 2.20.1 From 1d4b90c15b1571bce48389e2c34e7b15c1697b89 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 18 Feb 2023 00:21:50 +0100 Subject: [PATCH 11/16] refactor: Refined the creation of new `ChatRoom`s - Dropped `ChatHomeService.putChatRoom(ChatRoom)`. - `ChatRoomFactory.create(UUID, String)` returns `ChatRoomInfo`. 
--- .../backend/api/ChatBackendController.java | 13 ++++--- .../chat/backend/api/ChatRoomInfoTo.java | 24 +++++++++++++ .../kafka/chat/backend/api/ChatRoomTo.java | 24 ------------- .../kafka/chat/backend/domain/ChatHome.java | 2 -- .../chat/backend/domain/ChatHomeService.java | 1 - .../kafka/chat/backend/domain/ChatRoom.java | 17 ++------- .../chat/backend/domain/ChatRoomFactory.java | 2 +- .../chat/backend/domain/ChatRoomInfo.java | 22 ++++++++++++ .../chat/backend/domain/ShardedChatHome.java | 6 ---- .../chat/backend/domain/SimpleChatHome.java | 6 ---- .../inmemory/InMemoryChatHomeService.java | 4 +-- .../inmemory/InMemoryChatRoomFactory.java | 12 +++---- .../InMemoryServicesConfiguration.java | 2 ++ .../storage/files/FilesStorageStrategy.java | 36 +++++++++---------- .../AbstractInMemoryStorageIT.java | 1 + .../AbstractStorageStrategyIT.java | 15 ++++---- 16 files changed, 92 insertions(+), 95 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomInfoTo.java delete mode 100644 src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java index 36bec48a..4db77ee2 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java @@ -23,21 +23,20 @@ public class ChatBackendController @PostMapping("create") - public Mono create(@RequestBody String name) + public Mono create(@RequestBody String name) { UUID chatRoomId = UUID.randomUUID(); return factory .createChatRoom(chatRoomId, name) - .flatMap(chatRoom -> chatHome.putChatRoom(chatRoom)) - .map(ChatRoomTo::from); + .map(ChatRoomInfoTo::from); } @GetMapping("list") - public Flux list() + public Flux list() { return chatHome .getChatRooms() - 
.map(chatroom -> ChatRoomTo.from(chatroom)); + .map(chatroom -> ChatRoomInfoTo.from(chatroom)); } @GetMapping("{chatroomId}/list") @@ -51,11 +50,11 @@ public class ChatBackendController } @GetMapping("{chatroomId}") - public Mono get(@PathVariable UUID chatroomId) + public Mono get(@PathVariable UUID chatroomId) { return chatHome .getChatRoom(chatroomId) - .map(chatroom -> ChatRoomTo.from(chatroom)); + .map(chatroom -> ChatRoomInfoTo.from(chatroom)); } @PutMapping("{chatroomId}/{username}/{messageId}") diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomInfoTo.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomInfoTo.java new file mode 100644 index 00000000..212fb8d5 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomInfoTo.java @@ -0,0 +1,24 @@ +package de.juplo.kafka.chat.backend.api; + +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import lombok.Data; + +import java.util.UUID; + +@Data +public class ChatRoomInfoTo +{ + private UUID id; + private String name; + private int shard; + + + public static ChatRoomInfoTo from(ChatRoomInfo info) + { + ChatRoomInfoTo to = new ChatRoomInfoTo(); + to.id = info.getId(); + to.name = info.getName(); + to.shard = info.getShard(); + return to; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java deleted file mode 100644 index e997e4bb..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java +++ /dev/null @@ -1,24 +0,0 @@ -package de.juplo.kafka.chat.backend.api; - -import de.juplo.kafka.chat.backend.domain.ChatRoom; -import lombok.Data; - -import java.util.UUID; - -@Data -public class ChatRoomTo -{ - private UUID id; - private String name; - private int shard; - - - public static ChatRoomTo from(ChatRoom chatroom) - { - ChatRoomTo to = new ChatRoomTo(); - to.id = chatroom.getId(); - to.name = chatroom.getName(); - to.shard = chatroom.getShard(); - return to; 
- } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java index 99439d36..6091c0c5 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java @@ -8,8 +8,6 @@ import java.util.UUID; public interface ChatHome { - Mono putChatRoom(ChatRoom chatRoom); - Mono getChatRoom(UUID id); Flux getChatRooms(); diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java index 7f13283a..19ff4aa4 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java @@ -8,7 +8,6 @@ import java.util.UUID; public interface ChatHomeService { - Mono putChatRoom(ChatRoom chatRoom); Mono getChatRoom(int shard, UUID id); Flux getChatRooms(int shard); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java index da5eba2a..cffc0ad0 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java @@ -1,8 +1,5 @@ package de.juplo.kafka.chat.backend.domain; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.ToString; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -17,17 +14,9 @@ import java.util.regex.Pattern; @Slf4j -@EqualsAndHashCode(of = { "id" }) -@ToString(of = { "id", "name" }) -public class ChatRoom +public class ChatRoom extends ChatRoomInfo { public final static Pattern VALID_USER = Pattern.compile("^[a-z0-9-]{2,}$"); - @Getter - private final UUID id; - @Getter - private final String name; - @Getter - private final int shard; private final Clock clock; private final ChatRoomService 
service; private final int bufferSize; @@ -42,10 +31,8 @@ public class ChatRoom ChatRoomService service, int bufferSize) { + super(id, name, shard); log.info("Created ChatRoom {} with buffer-size {}", id, bufferSize); - this.id = id; - this.name = name; - this.shard = shard; this.clock = clock; this.service = service; this.bufferSize = bufferSize; diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java index 324e4b02..603795d9 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java @@ -7,5 +7,5 @@ import java.util.UUID; public interface ChatRoomFactory { - Mono createChatRoom(UUID id, String name); + Mono createChatRoom(UUID id, String name); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java new file mode 100644 index 00000000..6d88be95 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java @@ -0,0 +1,22 @@ +package de.juplo.kafka.chat.backend.domain; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.ToString; + +import java.util.UUID; + + +@RequiredArgsConstructor +@EqualsAndHashCode(of = { "id" }) +@ToString(of = { "id", "name", "shard" }) +public class ChatRoomInfo +{ + @Getter + private final UUID id; + @Getter + private final String name; + @Getter + private final int shard; +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java index 3023f782..4b8c7f16 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java @@ -37,12 +37,6 @@ public class ShardedChatHome implements 
ChatHome } - @Override - public Mono putChatRoom(ChatRoom chatRoom) - { - return chatHomes[selectShard(chatRoom.getId())].putChatRoom(chatRoom); - } - @Override public Mono getChatRoom(UUID id) { diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java index 46802c69..11542edd 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java @@ -27,12 +27,6 @@ public class SimpleChatHome implements ChatHome } - @Override - public Mono putChatRoom(ChatRoom chatRoom) - { - return service.putChatRoom(chatRoom); - } - @Override public Mono getChatRoom(UUID id) { diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java index fd54d34c..8f262a0b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java @@ -51,11 +51,9 @@ public class InMemoryChatHomeService implements ChatHomeService .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom)); } - @Override - public Mono putChatRoom(ChatRoom chatRoom) + public void putChatRoom(ChatRoom chatRoom) { chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom); - return Mono.just(chatRoom); } @Override diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java index 7fff359c..2bde2361 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java @@ -1,9 +1,6 
@@ package de.juplo.kafka.chat.backend.persistence.inmemory; -import de.juplo.kafka.chat.backend.domain.ShardingStrategy; -import de.juplo.kafka.chat.backend.domain.ChatRoom; -import de.juplo.kafka.chat.backend.domain.ChatRoomFactory; -import de.juplo.kafka.chat.backend.domain.ChatRoomService; +import de.juplo.kafka.chat.backend.domain.*; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; @@ -17,17 +14,20 @@ import java.util.UUID; @Slf4j public class InMemoryChatRoomFactory implements ChatRoomFactory { + private final InMemoryChatHomeService chatHomeService; private final ShardingStrategy shardingStrategy; private final Clock clock; private final int bufferSize; @Override - public Mono createChatRoom(UUID id, String name) + public Mono createChatRoom(UUID id, String name) { log.info("Creating ChatRoom with buffer-size {}", bufferSize); int shard = shardingStrategy.selectShard(id); ChatRoomService service = new InMemoryChatRoomService(Flux.empty()); - return Mono.just(new ChatRoom(id, name, shard, clock, service, bufferSize)); + ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize); + chatHomeService.putChatRoom(chatRoom); + return Mono.just(chatRoom); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java index 96ef05c2..de504485 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java @@ -79,11 +79,13 @@ public class InMemoryServicesConfiguration @Bean InMemoryChatRoomFactory chatRoomFactory( + InMemoryChatHomeService service, ShardingStrategy strategy, Clock clock, ChatBackendProperties properties) { return new InMemoryChatRoomFactory( + service, strategy, clock, 
properties.getChatroomBufferSize()); diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java index 1e3e5eef..f0ee1dfb 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java @@ -3,7 +3,7 @@ package de.juplo.kafka.chat.backend.persistence.storage.files; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; -import de.juplo.kafka.chat.backend.api.ChatRoomTo; +import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo; import de.juplo.kafka.chat.backend.api.MessageTo; import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.domain.ChatRoom; @@ -82,9 +82,9 @@ public class FilesStorageStrategy implements StorageStrategy { try { - ChatRoomTo chatroomTo = ChatRoomTo.from(chatroom); - generator.writeObject(chatroomTo); - writeMessages(chatroomTo, chatroom.getMessages()); + ChatRoomInfoTo infoTo = ChatRoomInfoTo.from(chatroom); + generator.writeObject(infoTo); + writeMessages(infoTo, chatroom.getMessages()); } catch (IOException e) { @@ -101,28 +101,28 @@ public class FilesStorageStrategy implements StorageStrategy @Override public Flux read() { - JavaType type = mapper.getTypeFactory().constructType(ChatRoomTo.class); + JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class); return Flux - .from(new JsonFilePublisher(chatroomsPath(), mapper, type)) + .from(new JsonFilePublisher(chatroomsPath(), mapper, type)) .log() - .map(chatRoomTo -> + .map(infoTo -> { - UUID chatRoomId = chatRoomTo.getId(); + UUID chatRoomId = infoTo.getId(); int shard = shardingStrategy.selectShard(chatRoomId); return new ChatRoom( - chatRoomTo.getId(), - 
chatRoomTo.getName(), + infoTo.getId(), + infoTo.getName(), shard, clock, - factory.create(readMessages(chatRoomTo)), + factory.create(readMessages(infoTo)), bufferSize); }); } - public void writeMessages(ChatRoomTo chatroomTo, Flux messageFlux) + public void writeMessages(ChatRoomInfoTo infoTo, Flux messageFlux) { - Path path = chatroomPath(chatroomTo); - log.info("Writing messages for {} to {}", chatroomTo, path); + Path path = chatroomPath(infoTo); + log.info("Writing messages for {} to {}", infoTo, path); try { Files.createDirectories(storagePath); @@ -177,11 +177,11 @@ public class FilesStorageStrategy implements StorageStrategy } } - public Flux readMessages(ChatRoomTo chatroomTo) + public Flux readMessages(ChatRoomInfoTo infoTo) { JavaType type = mapper.getTypeFactory().constructType(MessageTo.class); return Flux - .from(new JsonFilePublisher(chatroomPath(chatroomTo), mapper, type)) + .from(new JsonFilePublisher(chatroomPath(infoTo), mapper, type)) .log() .map(MessageTo::toMessage); } @@ -191,8 +191,8 @@ public class FilesStorageStrategy implements StorageStrategy return storagePath.resolve(Path.of(CHATROOMS_FILENAME)); } - Path chatroomPath(ChatRoomTo chatroomTo) + Path chatroomPath(ChatRoomInfoTo infoTo) { - return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json")); + return storagePath.resolve(Path.of(infoTo.getId().toString() + ".json")); } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java index 832ebd99..dd76324c 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java @@ -27,6 +27,7 @@ public abstract class AbstractInMemoryStorageIT extends AbstractStorageStrategyI getStorageStrategy().read()); InMemoryChatRoomFactory chatRoomFactory = new InMemoryChatRoomFactory( + 
chatHomeService, chatRoomId -> 0, clock, 8); diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java index d5e02b83..3ce527eb 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java @@ -40,8 +40,9 @@ public abstract class AbstractStorageStrategyIT assertThat(chathome.getChatRooms().toStream()).hasSize(0); UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7"); - ChatRoom chatroom = chatRoomFactory.createChatRoom(chatRoomId, "FOO").block(); - chathome.putChatRoom(chatroom); + ChatRoomInfo info = chatRoomFactory.createChatRoom(chatRoomId, "FOO").block(); + log.debug("Created chat-room {}", info); + ChatRoom chatroom = chathome.getChatRoom(chatRoomId).block(); Message m1 = chatroom.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block(); Message m2 = chatroom.addMessage(1l, "ute", "Ich bin Ute...").block(); Message m3 = chatroom.addMessage(2l, "peter", "Willst du mit mir gehen?").block(); @@ -71,16 +72,18 @@ public abstract class AbstractStorageStrategyIT assertThat(chathome.getChatRooms().toStream()).hasSize(0); UUID chatRoomAId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7"); - ChatRoom chatroomA = chatRoomFactory.createChatRoom(chatRoomAId, "FOO").block(); - chathome.putChatRoom(chatroomA); + ChatRoomInfo infoA = chatRoomFactory.createChatRoom(chatRoomAId, "FOO").block(); + log.debug("Created chat-room {}", infoA); + ChatRoom chatroomA = chathome.getChatRoom(chatRoomAId).block(); Message ma1 = chatroomA.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block(); Message ma2 = chatroomA.addMessage(1l, "ute", "Ich bin Ute...").block(); Message ma3 = chatroomA.addMessage(2l, "peter", "Willst du mit mir gehen?").block(); Message ma4 = chatroomA.addMessage(1l, "klaus", "Ja? Nein? 
Vielleicht??").block(); UUID chatRoomBId = UUID.fromString("8763dfdc-4dda-4a74-bea4-4b389177abea"); - ChatRoom chatroomB = chatRoomFactory.createChatRoom(chatRoomBId, "BAR").block(); - chathome.putChatRoom(chatroomB); + ChatRoomInfo infoB = chatRoomFactory.createChatRoom(chatRoomBId, "BAR").block(); + log.debug("Created chat-room {}", infoB); + ChatRoom chatroomB = chathome.getChatRoom(chatRoomBId).block(); Message mb1 = chatroomB.addMessage(1l,"peter", "Hallo, ich heiße Uwe!").block(); Message mb2 = chatroomB.addMessage(1l, "ute", "Ich bin Ute...").block(); Message mb3 = chatroomB.addMessage(1l, "klaus", "Willst du mit mir gehen?").block(); -- 2.20.1 From 40b27168d58ffc1a4837c52fad98655b231cb4b2 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 26 Feb 2023 15:30:20 +0100 Subject: [PATCH 12/16] refactor: `ChatRoomFactory` returns real `ChatRoom`s - `ChatHomeService` only deals with real `ChatRoom`s. - Hence, there is no need for `ChatRoomFactory`, to return the simplified interface `ChatRoomInfo`. - This is, because the implementation specific logic is implemented in the `ChatHomeService`, not `ChatHome` itself: the actual implementation is hidden behind that service (and behind the service `ChatRoomService` in the domain-class `ChatRoom`). 
--- .../de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java | 2 +- .../backend/persistence/inmemory/InMemoryChatRoomFactory.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java index 603795d9..324e4b02 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java @@ -7,5 +7,5 @@ import java.util.UUID; public interface ChatRoomFactory { - Mono createChatRoom(UUID id, String name); + Mono createChatRoom(UUID id, String name); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java index 2bde2361..9872ccb1 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java @@ -21,7 +21,7 @@ public class InMemoryChatRoomFactory implements ChatRoomFactory @Override - public Mono createChatRoom(UUID id, String name) + public Mono createChatRoom(UUID id, String name) { log.info("Creating ChatRoom with buffer-size {}", bufferSize); int shard = shardingStrategy.selectShard(id); -- 2.20.1 From cd192a26afb8beb0e45cb0807e7f63e30e87966a Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 19 Aug 2023 17:33:03 +0200 Subject: [PATCH 13/16] feat: Implemented new Default-`StorageStrategy` `none` * If `none` is selected as storage strategy, an empty implementation of `StorageStrategy` is instantiated. * The owned shards are derived from the corresponding configuration property. * Before, they were derived from the stored data.
--- .../chat/backend/ChatBackendProperties.java | 4 +- .../InMemoryServicesConfiguration.java | 15 ++---- .../files/FilesStorageConfiguration.java | 3 +- .../NoStorageStorageConfiguration.java | 48 +++++++++++++++++++ ...ryWithFilesAndShardingConfigurationIT.java | 1 + .../InMemoryWithFilesConfigurationIT.java | 1 + 6 files changed, 58 insertions(+), 14 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/storage/nostorage/NoStorageStorageConfiguration.java diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java index 177d4f51..def9de17 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java @@ -25,11 +25,11 @@ public class ChatBackendProperties private ShardingStrategyType shardingStrategy = ShardingStrategyType.none; private int numShards = 1; private int[] ownedShards = new int[] { 0 }; - private StorageStrategyType storageStrategy = StorageStrategyType.files; + private StorageStrategyType storageStrategy = StorageStrategyType.none; private String storageDirectory = Paths.get(System.getProperty("java.io.tmpdir"),"chat", "backend").toString(); } public enum ServiceType { inmemory } - public enum StorageStrategyType { files, mongodb } + public enum StorageStrategyType { none, files, mongodb } public enum ShardingStrategyType { none, kafkalike } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java index de504485..175f7140 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java @@ -13,6 +13,7 @@ import 
org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.time.Clock; +import java.util.stream.IntStream; @ConditionalOnProperty( @@ -41,19 +42,13 @@ public class InMemoryServicesConfiguration havingValue = "kafkalike") ChatHome kafkalikeShardingChatHome( ChatBackendProperties properties, - InMemoryChatHomeService chatHomeService, - StorageStrategy storageStrategy) + InMemoryChatHomeService chatHomeService) { int numShards = properties.getInmemory().getNumShards(); SimpleChatHome[] chatHomes = new SimpleChatHome[numShards]; - storageStrategy - .read() - .subscribe(chatRoom -> - { - int shard = chatRoom.getShard(); - if (chatHomes[shard] == null) - chatHomes[shard] = new SimpleChatHome(chatHomeService, shard); - }); + IntStream + .of(properties.getInmemory().getOwnedShards()) + .forEach(shard -> chatHomes[shard] = new SimpleChatHome(chatHomeService, shard)); ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards); return new ShardedChatHome(chatHomes, strategy); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java index df730aaf..702d1d58 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java @@ -19,8 +19,7 @@ import java.time.Clock; @ConditionalOnProperty( prefix = "chat.backend.inmemory", name = "storage-strategy", - havingValue = "files", - matchIfMissing = true) + havingValue = "files") @Configuration @EnableAutoConfiguration( exclude = { diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/nostorage/NoStorageStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/nostorage/NoStorageStorageConfiguration.java new file 
mode 100644 index 00000000..824c6f2a --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/nostorage/NoStorageStorageConfiguration.java @@ -0,0 +1,48 @@ +package de.juplo.kafka.chat.backend.persistence.storage.nostorage; + +import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.chat.backend.ChatBackendProperties; +import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ShardingStrategy; +import de.juplo.kafka.chat.backend.persistence.StorageStrategy; +import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.autoconfigure.data.mongo.MongoRepositoriesAutoConfiguration; +import org.springframework.boot.autoconfigure.mongo.MongoAutoConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import reactor.core.publisher.Flux; + +import java.nio.file.Paths; +import java.time.Clock; + + +@ConditionalOnProperty( + prefix = "chat.backend.inmemory", + name = "storage-strategy", + havingValue = "none", + matchIfMissing = true) +@Configuration +@EnableAutoConfiguration( + exclude = { + MongoRepositoriesAutoConfiguration.class, + MongoAutoConfiguration.class }) +public class NoStorageStorageConfiguration +{ + @Bean + public StorageStrategy storageStrategy() + { + return new StorageStrategy() + { + @Override + public void write(Flux chatroomFlux) {} + + @Override + public Flux read() + { + return Flux.empty(); + } + }; + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesAndShardingConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesAndShardingConfigurationIT.java index d792d258..72230e9c 100644 --- 
a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesAndShardingConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesAndShardingConfigurationIT.java @@ -6,6 +6,7 @@ import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest( webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, properties = { + "chat.backend.inmemory.storage-strategy=files", "chat.backend.inmemory.storage-directory=target/test-classes/data/files", "chat.backend.inmemory.sharding-strategy=kafkalike", "chat.backend.inmemory.num-shards=10", diff --git a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesConfigurationIT.java index 151a833a..2ff9e111 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesConfigurationIT.java @@ -7,6 +7,7 @@ import org.springframework.boot.test.context.SpringBootTest; webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, properties = { "chat.backend.inmemory.sharding-strategy=none", + "chat.backend.inmemory.storage-strategy=files", "chat.backend.inmemory.storage-directory=target/test-classes/data/files" }) class InMemoryWithFilesConfigurationIT extends AbstractConfigurationIT { -- 2.20.1 From 412635e46c4891f9565f341044878190c5674729 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 18 Feb 2023 11:07:21 +0100 Subject: [PATCH 14/16] test: RED - Proved the existence of an NPE in `ShardedChatHome.getChatRoom()` - Refined `AbstractConfigurationIT` to show that an NPE can occur in `ShardedChatHome.getChatRoom()`. - Defined the expected behaviour, if the NPE is handled correctly in the refined integration-test.
--- .../chat/backend/AbstractConfigurationIT.java | 26 ++++++++++---- .../AbstractConfigurationWithShardingIT.java | 35 +++++++++++++++++++ ...ryWithFilesAndShardingConfigurationIT.java | 2 +- .../InMemoryWithMongoDbConfigurationIT.java | 2 +- 4 files changed, 57 insertions(+), 8 deletions(-) create mode 100644 src/test/java/de/juplo/kafka/chat/backend/AbstractConfigurationWithShardingIT.java diff --git a/src/test/java/de/juplo/kafka/chat/backend/AbstractConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/AbstractConfigurationIT.java index c424b294..53ad04ea 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/AbstractConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/AbstractConfigurationIT.java @@ -20,8 +20,10 @@ public abstract class AbstractConfigurationIT @Test @DisplayName("The app starts, the data is restored and accessible") - void test() + void testAppStartsDataIsRestoredAndAccessible() { + String chatRoomId = "5c73531c-6fc4-426c-adcb-afc5c140a0f7"; + Awaitility .await() .atMost(Duration.ofSeconds(15)) @@ -29,13 +31,17 @@ public abstract class AbstractConfigurationIT { webTestClient .get() - .uri("http://localhost:{port}/actuator/health", port) + .uri( + "http://localhost:{port}/actuator/health", + port) .exchange() .expectStatus().isOk() .expectBody().jsonPath("$.status").isEqualTo("UP"); webTestClient .get() - .uri("http://localhost:{port}/list", port) + .uri( + "http://localhost:{port}/list", + port) .accept(MediaType.APPLICATION_JSON) .exchange() .expectStatus().isOk() @@ -44,21 +50,29 @@ public abstract class AbstractConfigurationIT .jsonPath("$[0].name").isEqualTo("FOO"); webTestClient .get() - .uri("http://localhost:{port}/5c73531c-6fc4-426c-adcb-afc5c140a0f7", port) + .uri("http://localhost:{port}/{chatRoomId}", + port, + chatRoomId) .accept(MediaType.APPLICATION_JSON) .exchange() .expectStatus().isOk() .expectBody().jsonPath("$.name").isEqualTo("FOO"); webTestClient .get() - 
.uri("http://localhost:{port}/5c73531c-6fc4-426c-adcb-afc5c140a0f7/ute/1", port) + .uri( + "http://localhost:{port}/{chatRoomId}/ute/1", + port, + chatRoomId) .accept(MediaType.APPLICATION_JSON) .exchange() .expectStatus().isOk() .expectBody().jsonPath("$.text").isEqualTo("Ich bin Ute..."); webTestClient .get() - .uri("http://localhost:{port}/5c73531c-6fc4-426c-adcb-afc5c140a0f7/peter/1", port) + .uri( + "http://localhost:{port}/{chatRoomId}/peter/1", + port, + chatRoomId) .accept(MediaType.APPLICATION_JSON) .exchange() .expectStatus().isOk() diff --git a/src/test/java/de/juplo/kafka/chat/backend/AbstractConfigurationWithShardingIT.java b/src/test/java/de/juplo/kafka/chat/backend/AbstractConfigurationWithShardingIT.java new file mode 100644 index 00000000..e6fd95a9 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/AbstractConfigurationWithShardingIT.java @@ -0,0 +1,35 @@ +package de.juplo.kafka.chat.backend; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.http.MediaType; +import org.testcontainers.shaded.org.awaitility.Awaitility; + +import java.time.Duration; + + +public abstract class AbstractConfigurationWithShardingIT extends AbstractConfigurationIT +{ + @Test + @DisplayName("A PUT-message for a not owned shard yields 404 - NOT FOUND") + void testNotFoundForPutMessageToAChatRoomInNotOwnedShard() + { + String otherChatRoomId = "4e7246a6-29ae-43ea-b56f-669c3481ac19"; + + Awaitility + .await() + .atMost(Duration.ofSeconds(15)) + .untilAsserted(() -> + webTestClient + .put() + .uri( + "http://localhost:{port}/{chatRoomId}/otto/66", + port, + otherChatRoomId) + .contentType(MediaType.TEXT_PLAIN) + .accept(MediaType.APPLICATION_JSON) + .bodyValue("The devil rules route 66") + .exchange() + .expectStatus().isNotFound()); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesAndShardingConfigurationIT.java 
b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesAndShardingConfigurationIT.java index 72230e9c..fd2866b5 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesAndShardingConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesAndShardingConfigurationIT.java @@ -11,6 +11,6 @@ import org.springframework.boot.test.context.SpringBootTest; "chat.backend.inmemory.sharding-strategy=kafkalike", "chat.backend.inmemory.num-shards=10", "chat.backend.inmemory.owned-shards=2" }) -class InMemoryWithFilesAndShardingConfigurationIT extends AbstractConfigurationIT +class InMemoryWithFilesAndShardingConfigurationIT extends AbstractConfigurationWithShardingIT { } diff --git a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithMongoDbConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithMongoDbConfigurationIT.java index 05f1de5f..69377369 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithMongoDbConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithMongoDbConfigurationIT.java @@ -17,7 +17,7 @@ import org.testcontainers.junit.jupiter.Testcontainers; properties = { "spring.data.mongodb.host=localhost", "spring.data.mongodb.database=test", - "chat.backend.inmemory.sharding-strategy=kafkalike", + "chat.backend.inmemory.sharding-strategy=none", "chat.backend.inmemory.storage-strategy=mongodb" }) @Testcontainers @Slf4j -- 2.20.1 From 8837fa6b1caed563ef8fb1929e8d66609477153c Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 18 Aug 2023 14:09:02 +0200 Subject: [PATCH 15/16] fix: GREEN - Fixed NPE in `ShardedChatHome.getChatRoom()` for foreign shard * `ShardedChatHome.getChatRoom(UUID)` now checks if a `ChatHome` exists for the selected shard. * If no `ChatHome` exists, a `ShardNotOwnedException` is thrown. * The `ChatBackendControllerAdvice` translates the exception to an error of type 404 - NOT FOUND, to fulfill the defined expectations.
--- .../backend/api/ChatBackendController.java | 2 +- .../api/ChatBackendControllerAdvice.java | 31 +++ .../domain/ShardNotOwnedException.java | 17 ++ .../chat/backend/domain/ShardedChatHome.java | 5 +- .../api/ChatBackendControllerTest.java | 185 +++++++++++++++++- 5 files changed, 228 insertions(+), 12 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/chat/backend/domain/ShardNotOwnedException.java diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java index 4db77ee2..339451a8 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java @@ -70,7 +70,7 @@ public class ChatBackendController .flatMap(chatroom -> put(chatroom, username, messageId, text)); } - public Mono put( + private Mono put( ChatRoom chatroom, String username, Long messageId, diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerAdvice.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerAdvice.java index 55350f17..ad90c4b6 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerAdvice.java +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerAdvice.java @@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.api; import de.juplo.kafka.chat.backend.domain.InvalidUsernameException; import de.juplo.kafka.chat.backend.domain.MessageMutationException; +import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException; import de.juplo.kafka.chat.backend.domain.UnknownChatroomException; import org.springframework.beans.factory.annotation.Value; import org.springframework.http.HttpStatus; @@ -50,6 +51,36 @@ public class ChatBackendControllerAdvice return problem; } + @ExceptionHandler(ShardNotOwnedException.class) + public final ProblemDetail handleException( + ShardNotOwnedException e, + 
ServerWebExchange exchange, + UriComponentsBuilder uriComponentsBuilder) + { + final HttpStatus status = HttpStatus.NOT_FOUND; + ProblemDetail problem = ProblemDetail.forStatus(status); + + problem.setProperty("timestamp", new Date()); + + problem.setProperty("requestId", exchange.getRequest().getId()); + + problem.setType(uriComponentsBuilder.replacePath(contextPath).path("/problem/shard-not-owned").build().toUri()); + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append(status.getReasonPhrase()); + stringBuilder.append(" - "); + stringBuilder.append(e.getMessage()); + problem.setTitle(stringBuilder.toString()); + + stringBuilder.setLength(0); + stringBuilder.append("Shard not owned: "); + stringBuilder.append(e.getShard()); + problem.setDetail(stringBuilder.toString()); + + problem.setProperty("shard", e.getShard()); + + return problem; + } + @ExceptionHandler(MessageMutationException.class) public final ProblemDetail handleException( MessageMutationException e, diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardNotOwnedException.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardNotOwnedException.java new file mode 100644 index 00000000..3b638331 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardNotOwnedException.java @@ -0,0 +1,17 @@ +package de.juplo.kafka.chat.backend.domain; + +import lombok.Getter; + + +public class ShardNotOwnedException extends IllegalStateException +{ + @Getter + private final int shard; + + + public ShardNotOwnedException(int shard) + { + super("This instance does not own the shard " + shard); + this.shard = shard; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java index 4b8c7f16..6d2f0794 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java @@ -40,7 
+40,10 @@ public class ShardedChatHome implements ChatHome @Override public Mono getChatRoom(UUID id) { - return chatHomes[selectShard(id)].getChatRoom(id); + int shard = selectShard(id); + if (chatHomes[shard] == null) + throw new ShardNotOwnedException(shard); + return chatHomes[shard].getChatRoom(id); } @Override diff --git a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java index b72294d9..f66be459 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java @@ -1,5 +1,6 @@ package de.juplo.kafka.chat.backend.api; +import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.domain.*; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService; import lombok.extern.slf4j.Slf4j; @@ -15,7 +16,10 @@ import reactor.core.publisher.Mono; import java.time.Clock; import java.time.LocalDateTime; +import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; @@ -23,11 +27,19 @@ import static org.mockito.Mockito.*; @SpringBootTest(properties = { "spring.main.allow-bean-definition-overriding=true", - "chat.backend.inmemory.sharding-strategy=none" }) + "chat.backend.inmemory.sharding-strategy=kafkalike", + "chat.backend.inmemory.num-shards=10", + "chat.backend.inmemory.owned-shards=6", + }) @AutoConfigureWebTestClient @Slf4j public class ChatBackendControllerTest { + @Autowired + ChatBackendProperties properties; + @Autowired + ShardingStrategy shardingStrategy; + @MockBean InMemoryChatHomeService chatHomeService; @MockBean @@ -38,7 +50,7 @@ public class ChatBackendControllerTest void testUnknownChatroomExceptionForListChatroom(@Autowired WebTestClient client) { // Given 
- UUID chatroomId = UUID.randomUUID(); + UUID chatroomId = getRandomIdForOwnedShard(); when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty()); // When @@ -58,7 +70,7 @@ public class ChatBackendControllerTest void testUnknownChatroomExceptionForGetChatroom(@Autowired WebTestClient client) { // Given - UUID chatroomId = UUID.randomUUID(); + UUID chatroomId = getRandomIdForOwnedShard(); when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty()); // When @@ -77,7 +89,7 @@ public class ChatBackendControllerTest void testUnknownChatroomExceptionForPutMessage(@Autowired WebTestClient client) { // Given - UUID chatroomId = UUID.randomUUID(); + UUID chatroomId = getRandomIdForOwnedShard(); String username = "foo"; Long messageId = 66l; when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty()); @@ -103,7 +115,7 @@ public class ChatBackendControllerTest void testUnknownChatroomExceptionForGetMessage(@Autowired WebTestClient client) { // Given - UUID chatroomId = UUID.randomUUID(); + UUID chatroomId = getRandomIdForOwnedShard(); String username = "foo"; Long messageId = 66l; when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty()); @@ -128,7 +140,7 @@ public class ChatBackendControllerTest void testUnknownChatroomExceptionForListenChatroom(@Autowired WebTestClient client) { // Given - UUID chatroomId = UUID.randomUUID(); + UUID chatroomId = getRandomIdForOwnedShard(); when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty()); // When @@ -155,10 +167,10 @@ public class ChatBackendControllerTest @Test @DisplayName("Assert expected problem-details for message mutation on PUT /put/{chatroomId}/{username}/{messageId}") - void testMessageMutationException(@Autowired WebTestClient client) throws Exception + void testMessageMutationException(@Autowired WebTestClient client) { // Given - UUID chatroomId = UUID.randomUUID(); + UUID chatroomId = 
getRandomIdForOwnedShard(); String user = "foo"; Long messageId = 66l; Message.MessageKey key = Message.MessageKey.of(user, messageId); @@ -211,10 +223,10 @@ public class ChatBackendControllerTest @Test @DisplayName("Assert expected problem-details for invalid username on PUT /put/{chatroomId}/{username}/{messageId}") - void testInvalidUsernameException(@Autowired WebTestClient client) throws Exception + void testInvalidUsernameException(@Autowired WebTestClient client) { // Given - UUID chatroomId = UUID.randomUUID(); + UUID chatroomId = getRandomIdForOwnedShard(); String user = "Foo"; Long messageId = 66l; Message.MessageKey key = Message.MessageKey.of(user, messageId); @@ -251,4 +263,157 @@ public class ChatBackendControllerTest .jsonPath("$.username").isEqualTo(user); verify(chatRoomService, never()).persistMessage(eq(key), any(LocalDateTime.class), any(String.class)); } + + @Test + @DisplayName("Assert expected problem-details for not owned shard on GET /{chatroomId}") + void testShardNotOwnedExceptionForGetChatroom(@Autowired WebTestClient client) + { + // Given + UUID chatroomId = getRandomIdForForeignShard(); + + // When + WebTestClient.ResponseSpec responseSpec = client + .get() + .uri("/{chatroomId}", chatroomId) + .accept(MediaType.APPLICATION_JSON) + .exchange(); + + // Then + assertProblemDetailsForShardNotOwnedException(responseSpec, shardingStrategy.selectShard(chatroomId)); + } + + @Test + @DisplayName("Assert expected problem-details for not owned shard on GET /list/{chatroomId}") + void testShardNotOwnedExceptionForListChatroom(@Autowired WebTestClient client) + { + // Given + UUID chatroomId = getRandomIdForForeignShard(); + + // When + WebTestClient.ResponseSpec responseSpec = client + .get() + .uri("/{chatroomId}/list", chatroomId) + .accept(MediaType.APPLICATION_JSON) + .exchange(); + + // Then + assertProblemDetailsForShardNotOwnedException(responseSpec, shardingStrategy.selectShard(chatroomId)); + } + + @Test + @DisplayName("Assert expected 
problem-details for not owned shard on PUT /put/{chatroomId}/{username}/{messageId}") + void testShardNotOwnedExceptionForPutMessage(@Autowired WebTestClient client) + { + // Given + UUID chatroomId = getRandomIdForForeignShard(); + String username = "foo"; + Long messageId = 66l; + when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty()); + + // When + WebTestClient.ResponseSpec responseSpec = client + .put() + .uri( + "/{chatroomId}/{username}/{messageId}", + chatroomId, + username, + messageId) + .bodyValue("bar") + .accept(MediaType.APPLICATION_JSON) + .exchange(); + + // Then + assertProblemDetailsForShardNotOwnedException(responseSpec, shardingStrategy.selectShard(chatroomId)); + } + + @Test + @DisplayName("Assert expected problem-details for not owned shard on GET /get/{chatroomId}/{username}/{messageId}") + void testShardNotOwnedExceptionForGetMessage(@Autowired WebTestClient client) + { + // Given + UUID chatroomId = getRandomIdForForeignShard(); + String username = "foo"; + Long messageId = 66l; + when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty()); + + // When + WebTestClient.ResponseSpec responseSpec = client + .get() + .uri( + "/{chatroomId}/{username}/{messageId}", + chatroomId, + username, + messageId) + .accept(MediaType.APPLICATION_JSON) + .exchange(); + + // Then + assertProblemDetailsForShardNotOwnedException(responseSpec, shardingStrategy.selectShard(chatroomId)); + } + + @Test + @DisplayName("Assert expected problem-details for not owned shard on GET /listen/{chatroomId}") + void testShardNotOwnedExceptionForListenChatroom(@Autowired WebTestClient client) + { + // Given + UUID chatroomId = getRandomIdForForeignShard(); + when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty()); + + // When + WebTestClient.ResponseSpec responseSpec = client + .get() + .uri("/{chatroomId}/listen", chatroomId) + // .accept(MediaType.TEXT_EVENT_STREAM,
MediaType.APPLICATION_JSON) << TODO: Does not work! + .exchange(); + + // Then + assertProblemDetailsForShardNotOwnedException(responseSpec, shardingStrategy.selectShard(chatroomId)); + } + + private void assertProblemDetailsForShardNotOwnedException( + WebTestClient.ResponseSpec responseSpec, + int shard) + { + responseSpec + .expectStatus().isNotFound() + .expectBody() + .jsonPath("$.type").isEqualTo("/problem/shard-not-owned") + .jsonPath("$.shard").isEqualTo(shard); + } + + private UUID getRandomIdForOwnedShard() + { + Set ownedShards = ownedShards(); + UUID randomId; + + do + { + randomId = UUID.randomUUID(); + } + while (!ownedShards.contains(shardingStrategy.selectShard(randomId))); + + return randomId; + } + + private UUID getRandomIdForForeignShard() + { + Set ownedShards = ownedShards(); + UUID randomId; + + do + { + randomId = UUID.randomUUID(); + } + while (ownedShards.contains(shardingStrategy.selectShard(randomId))); + + return randomId; + } + + private Set ownedShards() + { + return IntStream + .of(properties.getInmemory().getOwnedShards()) + .mapToObj(shard -> Integer.valueOf(shard)) + .collect(Collectors.toSet()); + } } -- 2.20.1 From 5c2cfba556815cf32950e8918f99e06299dee015 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 20 Aug 2023 12:57:26 +0200 Subject: [PATCH 16/16] refactor: Moved implementation details out of `domain` -- Moved classes --- .../backend/{domain => persistence/inmemory}/ChatHomeService.java | 0 .../persistence/{ => inmemory}/KafkaLikeShardingStrategy.java | 0 .../backend/{domain => persistence/inmemory}/ShardedChatHome.java | 0 .../{domain => persistence/inmemory}/ShardingStrategy.java | 0 .../backend/{domain => persistence/inmemory}/SimpleChatHome.java | 0 .../{domain => persistence/inmemory}/SimpleChatHomeTest.java | 0 6 files changed, 0 insertions(+), 0 deletions(-) rename src/main/java/de/juplo/kafka/chat/backend/{domain => persistence/inmemory}/ChatHomeService.java (100%) rename 
src/main/java/de/juplo/kafka/chat/backend/persistence/{ => inmemory}/KafkaLikeShardingStrategy.java (100%) rename src/main/java/de/juplo/kafka/chat/backend/{domain => persistence/inmemory}/ShardedChatHome.java (100%) rename src/main/java/de/juplo/kafka/chat/backend/{domain => persistence/inmemory}/ShardingStrategy.java (100%) rename src/main/java/de/juplo/kafka/chat/backend/{domain => persistence/inmemory}/SimpleChatHome.java (100%) rename src/test/java/de/juplo/kafka/chat/backend/{domain => persistence/inmemory}/SimpleChatHomeTest.java (100%) diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ChatHomeService.java similarity index 100% rename from src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java rename to src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ChatHomeService.java diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/KafkaLikeShardingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/KafkaLikeShardingStrategy.java similarity index 100% rename from src/main/java/de/juplo/kafka/chat/backend/persistence/KafkaLikeShardingStrategy.java rename to src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/KafkaLikeShardingStrategy.java diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHome.java similarity index 100% rename from src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java rename to src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardedChatHome.java diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardingStrategy.java similarity index 100% rename from src/main/java/de/juplo/kafka/chat/backend/domain/ShardingStrategy.java rename to 
src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/ShardingStrategy.java diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java similarity index 100% rename from src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java rename to src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHome.java diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeTest.java similarity index 100% rename from src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java rename to src/test/java/de/juplo/kafka/chat/backend/persistence/inmemory/SimpleChatHomeTest.java -- 2.20.1