From 5e027064643ea9060c3a2cc8f4272360fa02abd3 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 14 Jan 2023 17:40:21 +0100 Subject: [PATCH 01/16] chore: Switched the default-configuration to sharding `none` --- .../de/juplo/kafka/chat/backend/ChatBackendProperties.java | 6 +++--- .../InMemoryWithFilesAndShardingConfigurationIT.java | 4 +++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java index 5cd4535c..177d4f51 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java @@ -22,9 +22,9 @@ public class ChatBackendProperties @Setter public static class InMemoryServicesProperties { - private ShardingStrategyType shardingStrategy = ShardingStrategyType.kafkalike; - private int numShards = 10; - private int[] ownedShards = { 2 }; + private ShardingStrategyType shardingStrategy = ShardingStrategyType.none; + private int numShards = 1; + private int[] ownedShards = new int[] { 0 }; private StorageStrategyType storageStrategy = StorageStrategyType.files; private String storageDirectory = Paths.get(System.getProperty("java.io.tmpdir"),"chat", "backend").toString(); } diff --git a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesAndShardingConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesAndShardingConfigurationIT.java index 5138deed..d792d258 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesAndShardingConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesAndShardingConfigurationIT.java @@ -7,7 +7,9 @@ import org.springframework.boot.test.context.SpringBootTest; webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, properties = { "chat.backend.inmemory.storage-directory=target/test-classes/data/files", - 
"chat.backend.inmemory.sharding-strategy=kafkalike" }) + "chat.backend.inmemory.sharding-strategy=kafkalike", + "chat.backend.inmemory.num-shards=10", + "chat.backend.inmemory.owned-shards=2" }) class InMemoryWithFilesAndShardingConfigurationIT extends AbstractConfigurationIT { } -- 2.20.1 From 6eb03cbe4609eea0b7061b574a7e273ecd0c9813 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 14 Jan 2023 18:11:56 +0100 Subject: [PATCH 02/16] fix: Removed unnecessary generic in `ChatHomeService` --- .../de/juplo/kafka/chat/backend/domain/ChatHomeService.java | 2 +- .../backend/persistence/inmemory/InMemoryChatHomeService.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java index d2bc508c..7f13283a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java @@ -6,7 +6,7 @@ import reactor.core.publisher.Mono; import java.util.UUID; -public interface ChatHomeService +public interface ChatHomeService { Mono putChatRoom(ChatRoom chatRoom); Mono getChatRoom(int shard, UUID id); diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java index 0beb7f36..95a30dbd 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java @@ -11,7 +11,7 @@ import java.util.*; @Slf4j -public class InMemoryChatHomeService implements ChatHomeService +public class InMemoryChatHomeService implements ChatHomeService { private final ShardingStrategy shardingStrategy; private final Map[] chatrooms; -- 2.20.1 From daca33d027e4c0d036fc2aa7c3d9b2120f3ad98a Mon Sep 17 
00:00:00 2001 From: Kai Moritz Date: Sat, 14 Jan 2023 18:09:46 +0100 Subject: [PATCH 03/16] fix: Fixed a NPE in `ShardedChatHome.getChatRooms()` - When collecting the `ChatRoom`s for all shards, unused shards with a `null` value were not skipped. - Also added log-messages of level `INFO` for the creation of `ChatRoom`, `SimpleChatHome` and `ShardedChatHome`. --- .../kafka/chat/backend/domain/ChatRoom.java | 1 + .../chat/backend/domain/ShardedChatHome.java | 35 +++++++++++++++---- .../chat/backend/domain/SimpleChatHome.java | 9 +++-- .../chat/backend/AbstractConfigurationIT.java | 9 +++++ 4 files changed, 46 insertions(+), 8 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java index 1c21fb97..cc5c5a07 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java @@ -38,6 +38,7 @@ public class ChatRoom ChatRoomService service, int bufferSize) { + log.info("Created ChatRoom {} with buffer-size {}", id, bufferSize); this.id = id; this.name = name; this.clock = clock; diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java index ffa7860a..3023f782 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java @@ -1,17 +1,40 @@ package de.juplo.kafka.chat.backend.domain; -import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.util.HashSet; +import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; -@RequiredArgsConstructor +@Slf4j public class ShardedChatHome implements ChatHome { private final ChatHome[] chatHomes; - private final ShardingStrategy selectionStrategy; + 
private final Set ownedShards; + private final ShardingStrategy shardingStrategy; + + + public ShardedChatHome( + ChatHome[] chatHomes, + ShardingStrategy shardingStrategy) + { + this.chatHomes = chatHomes; + this.shardingStrategy = shardingStrategy; + this.ownedShards = new HashSet<>(); + for (int shard = 0; shard < chatHomes.length; shard++) + if(chatHomes[shard] != null) + this.ownedShards.add(shard); + log.info( + "Created ShardedChatHome for shards: {}", + ownedShards + .stream() + .map(String::valueOf) + .collect(Collectors.joining(", "))); + } @Override @@ -30,13 +53,13 @@ public class ShardedChatHome implements ChatHome public Flux getChatRooms() { return Flux - .fromArray(chatHomes) - .flatMap(chatHome -> chatHome.getChatRooms()); + .fromIterable(ownedShards) + .flatMap(shard -> chatHomes[shard].getChatRooms()); } private int selectShard(UUID chatroomId) { - return selectionStrategy.selectShard(chatroomId); + return shardingStrategy.selectShard(chatroomId); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java index b15eab3b..46802c69 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java @@ -1,6 +1,5 @@ package de.juplo.kafka.chat.backend.domain; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -9,13 +8,19 @@ import java.util.*; @Slf4j -@RequiredArgsConstructor public class SimpleChatHome implements ChatHome { private final ChatHomeService service; private final int shard; + public SimpleChatHome(ChatHomeService service, int shard) + { + log.info("Created SimpleChatHome for shard {}", shard); + this.service = service; + this.shard = shard; + } + public SimpleChatHome(ChatHomeService service) { this(service, 0); diff --git 
a/src/test/java/de/juplo/kafka/chat/backend/AbstractConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/AbstractConfigurationIT.java index 8c309b84..c424b294 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/AbstractConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/AbstractConfigurationIT.java @@ -33,6 +33,15 @@ public abstract class AbstractConfigurationIT .exchange() .expectStatus().isOk() .expectBody().jsonPath("$.status").isEqualTo("UP"); + webTestClient + .get() + .uri("http://localhost:{port}/list", port) + .accept(MediaType.APPLICATION_JSON) + .exchange() + .expectStatus().isOk() + .expectBody() + .jsonPath("$.length()").isEqualTo(1) + .jsonPath("$[0].name").isEqualTo("FOO"); webTestClient .get() .uri("http://localhost:{port}/5c73531c-6fc4-426c-adcb-afc5c140a0f7", port) -- 2.20.1 From 2095f4c6a102a52f2a15360d1b6355e4990f8f43 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 14 Jan 2023 19:53:16 +0100 Subject: [PATCH 04/16] feat: Reintroduced `ChatRoom.shard`, because it is needed as a routing-hint - Technically, the attribute is not needed. - But, to implement the gua/sha-pattern, it is needed as a routing-hint. - Hence, it is reintroduced here and also added to the `ChatRoomTo` to be sent to the client.
--- .../kafka/chat/backend/api/ChatRoomTo.java | 2 ++ .../kafka/chat/backend/domain/ChatRoom.java | 4 ++++ .../inmemory/InMemoryChatHomeService.java | 21 +++---------------- .../inmemory/InMemoryChatRoomFactory.java | 3 ++- .../InMemoryServicesConfiguration.java | 8 +++---- .../files/FilesStorageConfiguration.java | 4 +++- .../storage/files/FilesStorageStrategy.java | 13 ++++++++++-- .../mongodb/MongoDbStorageConfiguration.java | 5 ++++- .../mongodb/MongoDbStorageStrategy.java | 4 ++++ .../api/ChatBackendControllerTest.java | 2 ++ .../chat/backend/domain/ChatRoomTest.java | 5 +++++ .../backend/domain/SimpleChatHomeTest.java | 1 + .../InMemoryWithFilesStorageIT.java | 2 +- .../InMemoryWithMongoDbStorageIT.java | 2 +- 14 files changed, 46 insertions(+), 30 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java index 3cc59210..e997e4bb 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java @@ -10,6 +10,7 @@ public class ChatRoomTo { private UUID id; private String name; + private int shard; public static ChatRoomTo from(ChatRoom chatroom) @@ -17,6 +18,7 @@ public class ChatRoomTo ChatRoomTo to = new ChatRoomTo(); to.id = chatroom.getId(); to.name = chatroom.getName(); + to.shard = chatroom.getShard(); return to; } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java index cc5c5a07..4f855b8c 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java @@ -25,6 +25,8 @@ public class ChatRoom private final UUID id; @Getter private final String name; + @Getter + private final int shard; private final Clock clock; private final ChatRoomService service; private final int bufferSize; @@ -34,6 +36,7 @@ public class ChatRoom 
public ChatRoom( UUID id, String name, + int shard, Clock clock, ChatRoomService service, int bufferSize) @@ -41,6 +44,7 @@ public class ChatRoom log.info("Created ChatRoom {} with buffer-size {}", id, bufferSize); this.id = id; this.name = name; + this.shard = shard; this.clock = clock; this.service = service; this.bufferSize = bufferSize; diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java index 95a30dbd..fd54d34c 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java @@ -2,7 +2,6 @@ package de.juplo.kafka.chat.backend.persistence.inmemory; import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.ChatHomeService; -import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -13,18 +12,15 @@ import java.util.*; @Slf4j public class InMemoryChatHomeService implements ChatHomeService { - private final ShardingStrategy shardingStrategy; private final Map[] chatrooms; public InMemoryChatHomeService( - ShardingStrategy shardingStrategy, int numShards, int[] ownedShards, Flux chatroomFlux) { log.debug("Creating InMemoryChatHomeService"); - this.shardingStrategy = shardingStrategy; this.chatrooms = new Map[numShards]; Set owned = Arrays .stream(ownedShards) @@ -41,8 +37,7 @@ public class InMemoryChatHomeService implements ChatHomeService chatroomFlux .filter(chatRoom -> { - int shard = shardingStrategy.selectShard(chatRoom.getId()); - if (owned.contains(shard)) + if (owned.contains(chatRoom.getShard())) { return true; } @@ -53,16 +48,13 @@ public class InMemoryChatHomeService implements ChatHomeService } }) .toStream() - .forEach(chatRoom 
-> - { - getChatRoomMapFor(chatRoom).put(chatRoom.getId(), chatRoom); - }); + .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom)); } @Override public Mono putChatRoom(ChatRoom chatRoom) { - getChatRoomMapFor(chatRoom).put(chatRoom.getId(), chatRoom); + chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom); return Mono.just(chatRoom); } @@ -77,11 +69,4 @@ public class InMemoryChatHomeService implements ChatHomeService { return Flux.fromStream(chatrooms[shard].values().stream()); } - - - private Map getChatRoomMapFor(ChatRoom chatRoom) - { - int shard = shardingStrategy.selectShard(chatRoom.getId()); - return chatrooms[shard]; - } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java index 3e1d3603..7fff359c 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java @@ -26,7 +26,8 @@ public class InMemoryChatRoomFactory implements ChatRoomFactory public Mono createChatRoom(UUID id, String name) { log.info("Creating ChatRoom with buffer-size {}", bufferSize); + int shard = shardingStrategy.selectShard(id); ChatRoomService service = new InMemoryChatRoomService(Flux.empty()); - return Mono.just(new ChatRoom(id, name, clock, service, bufferSize)); + return Mono.just(new ChatRoom(id, name, shard, clock, service, bufferSize)); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java index 1dca0402..96ef05c2 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java +++ 
b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java @@ -45,23 +45,22 @@ public class InMemoryServicesConfiguration StorageStrategy storageStrategy) { int numShards = properties.getInmemory().getNumShards(); - ShardingStrategy shardingStrategy = new KafkaLikeShardingStrategy(numShards); SimpleChatHome[] chatHomes = new SimpleChatHome[numShards]; storageStrategy .read() .subscribe(chatRoom -> { - int shard = shardingStrategy.selectShard(chatRoom.getId()); + int shard = chatRoom.getShard(); if (chatHomes[shard] == null) chatHomes[shard] = new SimpleChatHome(chatHomeService, shard); }); - return new ShardedChatHome(chatHomes, shardingStrategy); + ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards); + return new ShardedChatHome(chatHomes, strategy); } @Bean InMemoryChatHomeService chatHomeService( ChatBackendProperties properties, - ShardingStrategy shardingStrategy, StorageStrategy storageStrategy) { ShardingStrategyType sharding = @@ -73,7 +72,6 @@ public class InMemoryServicesConfiguration ? 
new int[] { 0 } : properties.getInmemory().getOwnedShards(); return new InMemoryChatHomeService( - shardingStrategy, numShards, ownedShards, storageStrategy.read()); diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java index 0da1b953..df730aaf 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java @@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.persistence.storage.files; import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.chat.backend.ChatBackendProperties; +import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; @@ -31,13 +32,14 @@ public class FilesStorageConfiguration public StorageStrategy storageStrategy( ChatBackendProperties properties, Clock clock, + ShardingStrategy shardingStrategy, ObjectMapper mapper) { return new FilesStorageStrategy( Paths.get(properties.getInmemory().getStorageDirectory()), clock, properties.getChatroomBufferSize(), - + shardingStrategy, messageFlux -> new InMemoryChatRoomService(messageFlux), mapper); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java index 5d3c067a..1e3e5eef 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java @@ -5,6 +5,7 @@ import 
com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.chat.backend.api.ChatRoomTo; import de.juplo.kafka.chat.backend.api.MessageTo; +import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.Message; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; @@ -16,6 +17,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.time.Clock; +import java.util.UUID; import static java.nio.file.StandardOpenOption.CREATE; import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; @@ -31,6 +33,7 @@ public class FilesStorageStrategy implements StorageStrategy private final Path storagePath; private final Clock clock; private final int bufferSize; + private final ShardingStrategy shardingStrategy; private final ChatRoomServiceFactory factory; private final ObjectMapper mapper; @@ -102,12 +105,18 @@ public class FilesStorageStrategy implements StorageStrategy return Flux .from(new JsonFilePublisher(chatroomsPath(), mapper, type)) .log() - .map(chatRoomTo -> new ChatRoom( + .map(chatRoomTo -> + { + UUID chatRoomId = chatRoomTo.getId(); + int shard = shardingStrategy.selectShard(chatRoomId); + return new ChatRoom( chatRoomTo.getId(), chatRoomTo.getName(), + shard, clock, factory.create(readMessages(chatRoomTo)), - bufferSize)); + bufferSize); + }); } public void writeMessages(ChatRoomTo chatroomTo, Flux messageFlux) diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java index 22d7c26b..2b33eed7 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java +++ 
b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java @@ -1,6 +1,7 @@ package de.juplo.kafka.chat.backend.persistence.storage.mongodb; import de.juplo.kafka.chat.backend.ChatBackendProperties; +import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -21,12 +22,14 @@ public class MongoDbStorageConfiguration public StorageStrategy storageStrategy( ChatRoomRepository chatRoomRepository, ChatBackendProperties properties, - Clock clock) + Clock clock, + ShardingStrategy shardingStrategy) { return new MongoDbStorageStrategy( chatRoomRepository, clock, properties.getChatroomBufferSize(), + shardingStrategy, messageFlux -> new InMemoryChatRoomService(messageFlux)); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java index 23002198..d21fe2ba 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java @@ -1,5 +1,6 @@ package de.juplo.kafka.chat.backend.persistence.storage.mongodb; +import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory; @@ -18,6 +19,7 @@ public class MongoDbStorageStrategy implements StorageStrategy private final ChatRoomRepository repository; private final Clock clock; private final int bufferSize; + private final ShardingStrategy shardingStrategy; private final 
ChatRoomServiceFactory factory; @@ -37,9 +39,11 @@ public class MongoDbStorageStrategy implements StorageStrategy .map(chatRoomTo -> { UUID chatRoomId = UUID.fromString(chatRoomTo.getId()); + int shard = shardingStrategy.selectShard(chatRoomId); return new ChatRoom( chatRoomId, chatRoomTo.getName(), + shard, clock, factory.create( Flux diff --git a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java index a2870284..1b25a11e 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java @@ -170,6 +170,7 @@ public class ChatBackendControllerTest ChatRoom chatRoom = new ChatRoom( chatroomId, "Test-ChatRoom", + 0, Clock.systemDefaultZone(), chatRoomService, 8); when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.just(chatRoom)); @@ -221,6 +222,7 @@ public class ChatBackendControllerTest ChatRoom chatRoom = new ChatRoom( chatroomId, "Test-ChatRoom", + 0, Clock.systemDefaultZone(), chatRoomService, 8); when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))) diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java index d87155c2..9c418f17 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java @@ -27,6 +27,7 @@ public class ChatRoomTest ChatRoom chatRoom = new ChatRoom( UUID.randomUUID(), "Foo", + 0, Clock.systemDefaultZone(), chatRoomService, 8); @@ -53,6 +54,7 @@ public class ChatRoomTest ChatRoom chatRoom = new ChatRoom( UUID.randomUUID(), "Foo", + 0, Clock.systemDefaultZone(), chatRoomService, 8); @@ -76,6 +78,7 @@ public class ChatRoomTest ChatRoom chatRoom = new ChatRoom( UUID.randomUUID(), "Foo", + 0, Clock.systemDefaultZone(), 
chatRoomService, 8); @@ -105,6 +108,7 @@ public class ChatRoomTest ChatRoom chatRoom = new ChatRoom( UUID.randomUUID(), "Foo", + 0, Clock.systemDefaultZone(), chatRoomService, 8); @@ -134,6 +138,7 @@ public class ChatRoomTest ChatRoom chatRoom = new ChatRoom( UUID.randomUUID(), "Foo", + 0, Clock.systemDefaultZone(), chatRoomService, 8); diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java index c9d1b633..5b53607f 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java @@ -25,6 +25,7 @@ public class SimpleChatHomeTest ChatRoom chatRoom = new ChatRoom( UUID.randomUUID(), "Foo", + 0, Clock.systemDefaultZone(), mock(ChatRoomService.class), 8); diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java index d6909df4..fe7ecac1 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java @@ -41,6 +41,7 @@ public class InMemoryWithFilesStorageIT extends AbstractStorageStrategyIT path, clock, 8, + chatRoomId -> 0, messageFlux -> new InMemoryChatRoomService(messageFlux), mapper); } @@ -56,7 +57,6 @@ public class InMemoryWithFilesStorageIT extends AbstractStorageStrategyIT protected Supplier getChatHomeServiceSupplier() { return () -> new InMemoryChatHomeService( - chatRoomId -> 0, 1, new int[] { 0 }, getStorageStrategy().read()); diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java index ecc64d5a..9808aa31 100644 --- 
a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java @@ -55,7 +55,6 @@ public class InMemoryWithMongoDbStorageIT extends AbstractStorageStrategyIT protected Supplier getChatHomeServiceSupplier() { return () -> new InMemoryChatHomeService( - chatRoomId -> 0, 1, new int[] { 0 }, getStorageStrategy().read()); @@ -80,6 +79,7 @@ public class InMemoryWithMongoDbStorageIT extends AbstractStorageStrategyIT chatRoomRepository, clock, 8, + chatRoomId -> 0, messageFlux -> new InMemoryChatRoomService(messageFlux)); } -- 2.20.1 From 4f9e39284368922173f16d1e7ee87b416cc45ce6 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 15 Jan 2023 19:48:34 +0100 Subject: [PATCH 05/16] chore: Added a Docker-Build --- .dockerignore | 2 ++ .maven-dockerexclude | 1 + .maven-dockerinclude | 1 + Dockerfile | 5 +++++ pom.xml | 21 +++++++++++++++++++++ 5 files changed, 30 insertions(+) create mode 100644 .dockerignore create mode 100644 .maven-dockerexclude create mode 100644 .maven-dockerinclude create mode 100644 Dockerfile diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 00000000..1ad99637 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,2 @@ +* +!target/*.jar diff --git a/.maven-dockerexclude b/.maven-dockerexclude new file mode 100644 index 00000000..72e8ffc0 --- /dev/null +++ b/.maven-dockerexclude @@ -0,0 +1 @@ +* diff --git a/.maven-dockerinclude b/.maven-dockerinclude new file mode 100644 index 00000000..fd6cecd2 --- /dev/null +++ b/.maven-dockerinclude @@ -0,0 +1 @@ +target/*.jar diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 00000000..503f939e --- /dev/null +++ b/Dockerfile @@ -0,0 +1,5 @@ +FROM eclipse-temurin:17-jre +VOLUME /tmp +COPY target/*.jar /opt/app.jar +ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ] +CMD [] diff --git a/pom.xml b/pom.xml index b5b0e81a..2819be45 100644 --- a/pom.xml +++ b/pom.xml @@ -112,6 +112,27 
@@ maven-failsafe-plugin + + io.fabric8 + docker-maven-plugin + 0.33.0 + + + + juplo/%a:%v + + + + + + build + package + + build + + + + -- 2.20.1 From 9abc6319cb4a72fffe2ceb5b11cc51fc82c772a0 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 22 Jan 2023 18:34:21 +0100 Subject: [PATCH 06/16] test: Added a test that asserts disjunct chatrooms --- .../AbstractStorageStrategyIT.java | 47 +++++++++++++++++++ .../InMemoryWithMongoDbStorageIT.java | 3 ++ 2 files changed, 50 insertions(+) diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java index c934ff40..5d22d12e 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java @@ -63,4 +63,51 @@ public abstract class AbstractStorageStrategyIT .getChatRoom(chatroom.getId()) .flatMapMany(cr -> cr.getMessages())).emitsExactly(m1, m2, m3, m4); } + + @Test + protected void testStoreAndRecreateParallelChatRooms() + { + start(); + + assertThat(chathome.getChatRooms().toStream()).hasSize(0); + + UUID chatRoomAId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7"); + ChatRoom chatroomA = chatRoomFactory.createChatRoom(chatRoomAId, "FOO").block(); + chathome.putChatRoom(chatroomA); + Message ma1 = chatroomA.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block(); + Message ma2 = chatroomA.addMessage(1l, "ute", "Ich bin Ute...").block(); + Message ma3 = chatroomA.addMessage(2l, "peter", "Willst du mit mir gehen?").block(); + Message ma4 = chatroomA.addMessage(1l, "klaus", "Ja? Nein? 
Vielleicht??").block(); + + UUID chatRoomBId = UUID.fromString("8763dfdc-4dda-4a74-bea4-4b389177abea"); + ChatRoom chatroomB = chatRoomFactory.createChatRoom(chatRoomBId, "BAR").block(); + chathome.putChatRoom(chatroomB); + Message mb1 = chatroomB.addMessage(1l,"peter", "Hallo, ich heiße Uwe!").block(); + Message mb2 = chatroomB.addMessage(1l, "ute", "Ich bin Ute...").block(); + Message mb3 = chatroomB.addMessage(1l, "klaus", "Willst du mit mir gehen?").block(); + Message mb4 = chatroomB.addMessage(2l, "peter", "Hä? Was jetzt?!? Isch glohb isch höb ühn däjah vüh...").block(); + + assertThat(chathome.getChatRooms().toStream()).containsExactlyInAnyOrderElementsOf(List.of(chatroomA, chatroomB)); + assertThat(chathome.getChatRoom(chatroomA.getId())).emitsExactly(chatroomA); + assertThat(chathome + .getChatRoom(chatroomA.getId()) + .flatMapMany(cr -> cr.getMessages())).emitsExactly(ma1, ma2, ma3, ma4); + assertThat(chathome.getChatRoom(chatroomB.getId())).emitsExactly(chatroomB); + assertThat(chathome + .getChatRoom(chatroomB.getId()) + .flatMapMany(cr -> cr.getMessages())).emitsExactly(mb1, mb2, mb3, mb4); + + stop(); + start(); + + assertThat(chathome.getChatRooms().toStream()).containsExactlyInAnyOrderElementsOf(List.of(chatroomA, chatroomB)); + assertThat(chathome.getChatRoom(chatroomA.getId())).emitsExactly(chatroomA); + assertThat(chathome + .getChatRoom(chatroomA.getId()) + .flatMapMany(cr -> cr.getMessages())).emitsExactly(ma1, ma2, ma3, ma4); + assertThat(chathome.getChatRoom(chatroomB.getId())).emitsExactly(chatroomB); + assertThat(chathome + .getChatRoom(chatroomB.getId()) + .flatMapMany(cr -> cr.getMessages())).emitsExactly(mb1, mb2, mb3, mb4); + } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java index 9808aa31..e56aff77 100644 --- 
a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java @@ -42,6 +42,8 @@ public class InMemoryWithMongoDbStorageIT extends AbstractStorageStrategyIT @Autowired MongoDbStorageStrategy storageStrategy; @Autowired + ChatRoomRepository repository; + @Autowired Clock clock; @@ -116,5 +118,6 @@ public class InMemoryWithMongoDbStorageIT extends AbstractStorageStrategyIT { Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(log); CONTAINER.followOutput(logConsumer); + repository.deleteAll(); } } -- 2.20.1 From 41e0a7cfd0025d5bea9722c8fc10c3b20a0fcad7 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 25 Jan 2023 22:08:11 +0100 Subject: [PATCH 07/16] refactor: Simplified the handling of `MessageMutationException` --- .../juplo/kafka/chat/backend/domain/ChatRoom.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java index 4f855b8c..02f5c08c 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java @@ -7,6 +7,7 @@ import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; +import reactor.core.publisher.SynchronousSink; import java.time.Clock; import java.time.LocalDateTime; @@ -67,9 +68,17 @@ public class ChatRoom Message.MessageKey key = Message.MessageKey.of(user, id); return service .getMessage(key) - .flatMap(existing -> text.equals(existing.getMessageText()) - ? 
Mono.just(existing) - : Mono.error(() -> new MessageMutationException(existing, text))) + .handle((Message existing, SynchronousSink sink) -> + { + if (existing.getMessageText().equals(text)) + { + sink.next(existing); + } + else + { + sink.error(new MessageMutationException(existing, text)); + } + }) .switchIfEmpty( Mono .fromSupplier(() ->service.persistMessage(key, LocalDateTime.now(clock), text)) -- 2.20.1 From 7281b4220170f451062bb32e0d5f63ec48d141d3 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 24 Jan 2023 19:05:18 +0100 Subject: [PATCH 08/16] refactor: `ChatRoomService.persistMessage(..)` returns a `Mono` --- .../java/de/juplo/kafka/chat/backend/domain/ChatRoom.java | 2 +- .../de/juplo/kafka/chat/backend/domain/ChatRoomService.java | 2 +- .../persistence/inmemory/InMemoryChatRoomService.java | 4 ++-- .../kafka/chat/backend/api/ChatBackendControllerTest.java | 4 ++-- .../de/juplo/kafka/chat/backend/domain/ChatRoomTest.java | 6 +++--- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java index 02f5c08c..da5eba2a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java @@ -81,7 +81,7 @@ public class ChatRoom }) .switchIfEmpty( Mono - .fromSupplier(() ->service.persistMessage(key, LocalDateTime.now(clock), text)) + .defer(() -> service.persistMessage(key, LocalDateTime.now(clock), text)) .doOnNext(m -> { Sinks.EmitResult result = sink.tryEmitNext(m); diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomService.java index c70ffe4e..374a442b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomService.java @@ -8,7 +8,7 @@ import java.time.LocalDateTime; 
public interface ChatRoomService { - Message persistMessage( + Mono persistMessage( Message.MessageKey key, LocalDateTime timestamp, String text); diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomService.java index 314e1f03..e1d5a5e3 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomService.java @@ -24,14 +24,14 @@ public class InMemoryChatRoomService implements ChatRoomService } @Override - public Message persistMessage( + public Mono persistMessage( Message.MessageKey key, LocalDateTime timestamp, String text) { Message message = new Message(key, (long)messages.size(), timestamp, text); messages.put(message.getKey(), message); - return message; + return Mono.just(message); } @Override diff --git a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java index 1b25a11e..b72294d9 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java @@ -183,7 +183,7 @@ public class ChatBackendControllerTest .thenReturn(Mono.just(existingMessage)); // Needed for readable error-reports, in case of a bug that leads to according unwanted call when(chatRoomService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class))) - .thenReturn(mock(Message.class)); + .thenReturn(Mono.just(mock(Message.class))); // When client @@ -231,7 +231,7 @@ public class ChatBackendControllerTest .thenReturn(Mono.empty()); // Needed for readable error-reports, in case of a bug that leads to according unwanted call when(chatRoomService.persistMessage(any(Message.MessageKey.class), 
any(LocalDateTime.class), any(String.class))) - .thenReturn(mock(Message.class)); + .thenReturn(Mono.just(mock(Message.class))); // When client diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java index 9c418f17..822ffe77 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java @@ -88,7 +88,7 @@ public class ChatRoomTest String messageText = "Bar"; Message message = new Message(key, 0l, timestamp, messageText); when(chatRoomService.getMessage(any(Message.MessageKey.class))).thenReturn(Mono.empty()); - when(chatRoomService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class))).thenReturn(message); + when(chatRoomService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class))).thenReturn(Mono.just(message)); // When Mono mono = chatRoom.addMessage(messageId, user, messageText); @@ -118,7 +118,7 @@ public class ChatRoomTest String messageText = "Bar"; Message message = new Message(key, 0l, timestamp, messageText); when(chatRoomService.getMessage(any(Message.MessageKey.class))).thenReturn(Mono.just(message)); - when(chatRoomService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class))).thenReturn(message); + when(chatRoomService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class))).thenReturn(Mono.just(message)); // When Mono mono = chatRoom.addMessage(messageId, user, messageText); @@ -149,7 +149,7 @@ public class ChatRoomTest String mutatedText = "Boom!"; Message message = new Message(key, 0l, timestamp, messageText); when(chatRoomService.getMessage(any(Message.MessageKey.class))).thenReturn(Mono.just(message)); - when(chatRoomService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), 
any(String.class))).thenReturn(message); + when(chatRoomService.persistMessage(any(Message.MessageKey.class), any(LocalDateTime.class), any(String.class))).thenReturn(Mono.just(message)); // When Mono mono = chatRoom.addMessage(messageId, user, mutatedText); -- 2.20.1 From 676f6042961eeb89d10a6f82eb870760b66260ef Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 17 Feb 2023 23:34:55 +0100 Subject: [PATCH 09/16] refactor: Refined the configuration of `AbstractStorageStrategyIT` --- .../AbstractStorageStrategyIT.java | 16 +++++--- .../InMemoryWithFilesStorageIT.java | 37 ++++++++++++------- .../InMemoryWithMongoDbStorageIT.java | 37 ++++++++++++------- 3 files changed, 59 insertions(+), 31 deletions(-) diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java index 5d22d12e..d5e02b83 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java @@ -6,7 +6,6 @@ import org.junit.jupiter.api.Test; import java.util.List; import java.util.UUID; -import java.util.function.Supplier; import static pl.rzrz.assertj.reactor.Assertions.*; @@ -19,13 +18,13 @@ public abstract class AbstractStorageStrategyIT protected abstract StorageStrategy getStorageStrategy(); - protected abstract Supplier getChatHomeServiceSupplier(); - protected abstract ChatRoomFactory getChatRoomFactory(); + protected abstract StorageStrategyITConfig getConfig(); protected void start() { - chathome = new SimpleChatHome(getChatHomeServiceSupplier().get()); - chatRoomFactory = getChatRoomFactory(); + StorageStrategyITConfig config = getConfig(); + chathome = new SimpleChatHome(config.getChatHomeService()); + chatRoomFactory = config.getChatRoomFactory(); } protected void stop() @@ -110,4 +109,11 @@ public abstract class AbstractStorageStrategyIT
.getChatRoom(chatroomB.getId()) .flatMapMany(cr -> cr.getMessages())).emitsExactly(mb1, mb2, mb3, mb4); } + + + interface StorageStrategyITConfig + { + ChatHomeService getChatHomeService(); + ChatRoomFactory getChatRoomFactory(); + } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java index fe7ecac1..8ca7d472 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java @@ -3,7 +3,6 @@ package de.juplo.kafka.chat.backend.persistence; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.domain.ChatHomeService; import de.juplo.kafka.chat.backend.domain.ChatRoomFactory; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory; @@ -18,7 +17,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.time.Clock; -import java.util.function.Supplier; @Slf4j @@ -54,19 +52,32 @@ public class InMemoryWithFilesStorageIT extends AbstractStorageStrategyIT } @Override - protected Supplier getChatHomeServiceSupplier() + protected StorageStrategyITConfig getConfig() { - return () -> new InMemoryChatHomeService( - 1, - new int[] { 0 }, - getStorageStrategy().read()); - } + return new StorageStrategyITConfig() + { + InMemoryChatHomeService chatHomeService = new InMemoryChatHomeService( + 1, + new int[] { 0 }, + getStorageStrategy().read()); - @Override - protected ChatRoomFactory getChatRoomFactory() - { - ShardingStrategy strategy = chatRoomId -> 0; - return new InMemoryChatRoomFactory(strategy, clock, 8); + InMemoryChatRoomFactory chatRoomFactory 
= new InMemoryChatRoomFactory( + chatRoomId -> 0, + clock, + 8); + + @Override + public ChatHomeService getChatHomeService() + { + return chatHomeService; + } + + @Override + public ChatRoomFactory getChatRoomFactory() + { + return chatRoomFactory; + } + }; } @BeforeEach diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java index e56aff77..f86c9d3f 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java @@ -1,6 +1,5 @@ package de.juplo.kafka.chat.backend.persistence; -import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.domain.ChatHomeService; import de.juplo.kafka.chat.backend.domain.ChatRoomFactory; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService; @@ -28,7 +27,6 @@ import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import java.time.Clock; -import java.util.function.Supplier; @Testcontainers @@ -54,19 +52,32 @@ public class InMemoryWithMongoDbStorageIT extends AbstractStorageStrategyIT } @Override - protected Supplier getChatHomeServiceSupplier() + protected StorageStrategyITConfig getConfig() { - return () -> new InMemoryChatHomeService( - 1, - new int[] { 0 }, - getStorageStrategy().read()); - } + return new StorageStrategyITConfig() + { + InMemoryChatHomeService chatHomeService = new InMemoryChatHomeService( + 1, + new int[] { 0 }, + getStorageStrategy().read()); - @Override - protected ChatRoomFactory getChatRoomFactory() - { - ShardingStrategy strategy = chatRoomId -> 0; - return new InMemoryChatRoomFactory(strategy, clock, 8); + InMemoryChatRoomFactory chatRoomFactory = new InMemoryChatRoomFactory( + chatRoomId -> 0, + clock, + 8); + + @Override + public 
ChatHomeService getChatHomeService() + { + return chatHomeService; + } + + @Override + public ChatRoomFactory getChatRoomFactory() + { + return chatRoomFactory; + } + }; } @TestConfiguration -- 2.20.1 From 977b454a542b95a5d08a94dd5f8b55814350b7cd Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 18 Feb 2023 00:10:03 +0100 Subject: [PATCH 10/16] refactor: DRY for the configuration of the `AbstractStorageStrategyIT` --- .../AbstractInMemoryStorageIT.java | 47 +++++++++++++++++++ .../InMemoryWithFilesStorageIT.java | 38 +-------------- .../InMemoryWithMongoDbStorageIT.java | 39 +++------------ 3 files changed, 55 insertions(+), 69 deletions(-) create mode 100644 src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java new file mode 100644 index 00000000..832ebd99 --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java @@ -0,0 +1,47 @@ +package de.juplo.kafka.chat.backend.persistence; + +import de.juplo.kafka.chat.backend.domain.ChatHomeService; +import de.juplo.kafka.chat.backend.domain.ChatRoomFactory; +import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService; +import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.time.Clock; + + +@RequiredArgsConstructor +@Slf4j +public abstract class AbstractInMemoryStorageIT extends AbstractStorageStrategyIT +{ + final Clock clock; + + @Override + protected StorageStrategyITConfig getConfig() + { + return new StorageStrategyITConfig() + { + InMemoryChatHomeService chatHomeService = new InMemoryChatHomeService( + 1, + new int[] { 0 }, + getStorageStrategy().read()); + + InMemoryChatRoomFactory chatRoomFactory = new 
InMemoryChatRoomFactory( + chatRoomId -> 0, + clock, + 8); + + @Override + public ChatHomeService getChatHomeService() + { + return chatHomeService; + } + + @Override + public ChatRoomFactory getChatRoomFactory() + { + return chatRoomFactory; + } + }; + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java index 8ca7d472..1bb08708 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java @@ -3,11 +3,7 @@ package de.juplo.kafka.chat.backend.persistence; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import de.juplo.kafka.chat.backend.domain.ChatHomeService; -import de.juplo.kafka.chat.backend.domain.ChatRoomFactory; -import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory; import de.juplo.kafka.chat.backend.persistence.storage.files.FilesStorageStrategy; -import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.BeforeEach; @@ -20,18 +16,17 @@ import java.time.Clock; @Slf4j -public class InMemoryWithFilesStorageIT extends AbstractStorageStrategyIT +public class InMemoryWithFilesStorageIT extends AbstractInMemoryStorageIT { final static Path path = Paths.get("target","files"); - final Clock clock; final ObjectMapper mapper; final FilesStorageStrategy storageStrategy; public InMemoryWithFilesStorageIT() { - clock = Clock.systemDefaultZone(); + super(Clock.systemDefaultZone()); mapper = new ObjectMapper(); mapper.registerModule(new JavaTimeModule()); 
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); @@ -51,35 +46,6 @@ public class InMemoryWithFilesStorageIT extends AbstractStorageStrategyIT return storageStrategy; } - @Override - protected StorageStrategyITConfig getConfig() - { - return new StorageStrategyITConfig() - { - InMemoryChatHomeService chatHomeService = new InMemoryChatHomeService( - 1, - new int[] { 0 }, - getStorageStrategy().read()); - - InMemoryChatRoomFactory chatRoomFactory = new InMemoryChatRoomFactory( - chatRoomId -> 0, - clock, - 8); - - @Override - public ChatHomeService getChatHomeService() - { - return chatHomeService; - } - - @Override - public ChatRoomFactory getChatRoomFactory() - { - return chatRoomFactory; - } - }; - } - @BeforeEach void reset() throws Exception { diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java index f86c9d3f..7ca9cb2f 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java @@ -1,10 +1,6 @@ package de.juplo.kafka.chat.backend.persistence; -import de.juplo.kafka.chat.backend.domain.ChatHomeService; -import de.juplo.kafka.chat.backend.domain.ChatRoomFactory; -import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService; import de.juplo.kafka.chat.backend.persistence.InMemoryWithMongoDbStorageIT.DataSourceInitializer; -import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomFactory; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService; import de.juplo.kafka.chat.backend.persistence.storage.mongodb.ChatRoomRepository; import de.juplo.kafka.chat.backend.persistence.storage.mongodb.MongoDbStorageStrategy; @@ -35,7 +31,7 @@ import java.time.Clock; @AutoConfigureDataMongo @ContextConfiguration(initializers = 
DataSourceInitializer.class) @Slf4j -public class InMemoryWithMongoDbStorageIT extends AbstractStorageStrategyIT +public class InMemoryWithMongoDbStorageIT extends AbstractInMemoryStorageIT { @Autowired MongoDbStorageStrategy storageStrategy; @@ -45,39 +41,16 @@ public class InMemoryWithMongoDbStorageIT extends AbstractStorageStrategyIT Clock clock; - @Override - protected StorageStrategy getStorageStrategy() + public InMemoryWithMongoDbStorageIT() { - return storageStrategy; + super(Clock.systemDefaultZone()); } + @Override - protected StorageStrategyITConfig getConfig() + protected StorageStrategy getStorageStrategy() { - return new StorageStrategyITConfig() - { - InMemoryChatHomeService chatHomeService = new InMemoryChatHomeService( - 1, - new int[] { 0 }, - getStorageStrategy().read()); - - InMemoryChatRoomFactory chatRoomFactory = new InMemoryChatRoomFactory( - chatRoomId -> 0, - clock, - 8); - - @Override - public ChatHomeService getChatHomeService() - { - return chatHomeService; - } - - @Override - public ChatRoomFactory getChatRoomFactory() - { - return chatRoomFactory; - } - }; + return storageStrategy; } @TestConfiguration -- 2.20.1 From 1d4b90c15b1571bce48389e2c34e7b15c1697b89 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 18 Feb 2023 00:21:50 +0100 Subject: [PATCH 11/16] refactor: Refined the creation of new `ChatRoom`s - Dropped `ChatHomeService.putChatRoom(ChatRoom)`. - `ChatRoomFactory.create(UUID, String)` returns `ChatRoomInfo`. 
--- .../backend/api/ChatBackendController.java | 13 ++++--- .../chat/backend/api/ChatRoomInfoTo.java | 24 +++++++++++++ .../kafka/chat/backend/api/ChatRoomTo.java | 24 ------------- .../kafka/chat/backend/domain/ChatHome.java | 2 -- .../chat/backend/domain/ChatHomeService.java | 1 - .../kafka/chat/backend/domain/ChatRoom.java | 17 ++------- .../chat/backend/domain/ChatRoomFactory.java | 2 +- .../chat/backend/domain/ChatRoomInfo.java | 22 ++++++++++++ .../chat/backend/domain/ShardedChatHome.java | 6 ---- .../chat/backend/domain/SimpleChatHome.java | 6 ---- .../inmemory/InMemoryChatHomeService.java | 4 +-- .../inmemory/InMemoryChatRoomFactory.java | 12 +++---- .../InMemoryServicesConfiguration.java | 2 ++ .../storage/files/FilesStorageStrategy.java | 36 +++++++++---------- .../AbstractInMemoryStorageIT.java | 1 + .../AbstractStorageStrategyIT.java | 15 ++++---- 16 files changed, 92 insertions(+), 95 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomInfoTo.java delete mode 100644 src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java index 36bec48a..4db77ee2 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java @@ -23,21 +23,20 @@ public class ChatBackendController @PostMapping("create") - public Mono create(@RequestBody String name) + public Mono create(@RequestBody String name) { UUID chatRoomId = UUID.randomUUID(); return factory .createChatRoom(chatRoomId, name) - .flatMap(chatRoom -> chatHome.putChatRoom(chatRoom)) - .map(ChatRoomTo::from); + .map(ChatRoomInfoTo::from); } @GetMapping("list") - public Flux list() + public Flux list() { return chatHome .getChatRooms() - 
.map(chatroom -> ChatRoomTo.from(chatroom)); + .map(chatroom -> ChatRoomInfoTo.from(chatroom)); } @GetMapping("{chatroomId}/list") @@ -51,11 +50,11 @@ public class ChatBackendController } @GetMapping("{chatroomId}") - public Mono get(@PathVariable UUID chatroomId) + public Mono get(@PathVariable UUID chatroomId) { return chatHome .getChatRoom(chatroomId) - .map(chatroom -> ChatRoomTo.from(chatroom)); + .map(chatroom -> ChatRoomInfoTo.from(chatroom)); } @PutMapping("{chatroomId}/{username}/{messageId}") diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomInfoTo.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomInfoTo.java new file mode 100644 index 00000000..212fb8d5 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomInfoTo.java @@ -0,0 +1,24 @@ +package de.juplo.kafka.chat.backend.api; + +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; +import lombok.Data; + +import java.util.UUID; + +@Data +public class ChatRoomInfoTo +{ + private UUID id; + private String name; + private int shard; + + + public static ChatRoomInfoTo from(ChatRoomInfo info) + { + ChatRoomInfoTo to = new ChatRoomInfoTo(); + to.id = info.getId(); + to.name = info.getName(); + to.shard = info.getShard(); + return to; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java deleted file mode 100644 index e997e4bb..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java +++ /dev/null @@ -1,24 +0,0 @@ -package de.juplo.kafka.chat.backend.api; - -import de.juplo.kafka.chat.backend.domain.ChatRoom; -import lombok.Data; - -import java.util.UUID; - -@Data -public class ChatRoomTo -{ - private UUID id; - private String name; - private int shard; - - - public static ChatRoomTo from(ChatRoom chatroom) - { - ChatRoomTo to = new ChatRoomTo(); - to.id = chatroom.getId(); - to.name = chatroom.getName(); - to.shard = chatroom.getShard(); - return to; 
- } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java index 99439d36..6091c0c5 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java @@ -8,8 +8,6 @@ import java.util.UUID; public interface ChatHome { - Mono putChatRoom(ChatRoom chatRoom); - Mono getChatRoom(UUID id); Flux getChatRooms(); diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java index 7f13283a..19ff4aa4 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java @@ -8,7 +8,6 @@ import java.util.UUID; public interface ChatHomeService { - Mono putChatRoom(ChatRoom chatRoom); Mono getChatRoom(int shard, UUID id); Flux getChatRooms(int shard); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java index da5eba2a..cffc0ad0 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java @@ -1,8 +1,5 @@ package de.juplo.kafka.chat.backend.domain; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.ToString; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -17,17 +14,9 @@ import java.util.regex.Pattern; @Slf4j -@EqualsAndHashCode(of = { "id" }) -@ToString(of = { "id", "name" }) -public class ChatRoom +public class ChatRoom extends ChatRoomInfo { public final static Pattern VALID_USER = Pattern.compile("^[a-z0-9-]{2,}$"); - @Getter - private final UUID id; - @Getter - private final String name; - @Getter - private final int shard; private final Clock clock; private final ChatRoomService 
service; private final int bufferSize; @@ -42,10 +31,8 @@ public class ChatRoom ChatRoomService service, int bufferSize) { + super(id, name, shard); log.info("Created ChatRoom {} with buffer-size {}", id, bufferSize); - this.id = id; - this.name = name; - this.shard = shard; this.clock = clock; this.service = service; this.bufferSize = bufferSize; diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java index 324e4b02..603795d9 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomFactory.java @@ -7,5 +7,5 @@ import java.util.UUID; public interface ChatRoomFactory { - Mono createChatRoom(UUID id, String name); + Mono createChatRoom(UUID id, String name); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java new file mode 100644 index 00000000..6d88be95 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoomInfo.java @@ -0,0 +1,22 @@ +package de.juplo.kafka.chat.backend.domain; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.ToString; + +import java.util.UUID; + + +@RequiredArgsConstructor +@EqualsAndHashCode(of = { "id" }) +@ToString(of = { "id", "name", "shard" }) +public class ChatRoomInfo +{ + @Getter + private final UUID id; + @Getter + private final String name; + @Getter + private final int shard; +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java index 3023f782..4b8c7f16 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java @@ -37,12 +37,6 @@ public class ShardedChatHome implements 
ChatHome } - @Override - public Mono putChatRoom(ChatRoom chatRoom) - { - return chatHomes[selectShard(chatRoom.getId())].putChatRoom(chatRoom); - } - @Override public Mono getChatRoom(UUID id) { diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java index 46802c69..11542edd 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java @@ -27,12 +27,6 @@ public class SimpleChatHome implements ChatHome } - @Override - public Mono putChatRoom(ChatRoom chatRoom) - { - return service.putChatRoom(chatRoom); - } - @Override public Mono getChatRoom(UUID id) { diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java index fd54d34c..8f262a0b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java @@ -51,11 +51,9 @@ public class InMemoryChatHomeService implements ChatHomeService .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom)); } - @Override - public Mono putChatRoom(ChatRoom chatRoom) + public void putChatRoom(ChatRoom chatRoom) { chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom); - return Mono.just(chatRoom); } @Override diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java index 7fff359c..2bde2361 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java @@ -1,9 +1,6 
@@ package de.juplo.kafka.chat.backend.persistence.inmemory; -import de.juplo.kafka.chat.backend.domain.ShardingStrategy; -import de.juplo.kafka.chat.backend.domain.ChatRoom; -import de.juplo.kafka.chat.backend.domain.ChatRoomFactory; -import de.juplo.kafka.chat.backend.domain.ChatRoomService; +import de.juplo.kafka.chat.backend.domain.*; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; @@ -17,17 +14,20 @@ import java.util.UUID; @Slf4j public class InMemoryChatRoomFactory implements ChatRoomFactory { + private final InMemoryChatHomeService chatHomeService; private final ShardingStrategy shardingStrategy; private final Clock clock; private final int bufferSize; @Override - public Mono createChatRoom(UUID id, String name) + public Mono createChatRoom(UUID id, String name) { log.info("Creating ChatRoom with buffer-size {}", bufferSize); int shard = shardingStrategy.selectShard(id); ChatRoomService service = new InMemoryChatRoomService(Flux.empty()); - return Mono.just(new ChatRoom(id, name, shard, clock, service, bufferSize)); + ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize); + chatHomeService.putChatRoom(chatRoom); + return Mono.just(chatRoom); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java index 96ef05c2..de504485 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java @@ -79,11 +79,13 @@ public class InMemoryServicesConfiguration @Bean InMemoryChatRoomFactory chatRoomFactory( + InMemoryChatHomeService service, ShardingStrategy strategy, Clock clock, ChatBackendProperties properties) { return new InMemoryChatRoomFactory( + service, strategy, clock, 
properties.getChatroomBufferSize()); diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java index 1e3e5eef..f0ee1dfb 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java @@ -3,7 +3,7 @@ package de.juplo.kafka.chat.backend.persistence.storage.files; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; -import de.juplo.kafka.chat.backend.api.ChatRoomTo; +import de.juplo.kafka.chat.backend.api.ChatRoomInfoTo; import de.juplo.kafka.chat.backend.api.MessageTo; import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.domain.ChatRoom; @@ -82,9 +82,9 @@ public class FilesStorageStrategy implements StorageStrategy { try { - ChatRoomTo chatroomTo = ChatRoomTo.from(chatroom); - generator.writeObject(chatroomTo); - writeMessages(chatroomTo, chatroom.getMessages()); + ChatRoomInfoTo infoTo = ChatRoomInfoTo.from(chatroom); + generator.writeObject(infoTo); + writeMessages(infoTo, chatroom.getMessages()); } catch (IOException e) { @@ -101,28 +101,28 @@ public class FilesStorageStrategy implements StorageStrategy @Override public Flux read() { - JavaType type = mapper.getTypeFactory().constructType(ChatRoomTo.class); + JavaType type = mapper.getTypeFactory().constructType(ChatRoomInfoTo.class); return Flux - .from(new JsonFilePublisher(chatroomsPath(), mapper, type)) + .from(new JsonFilePublisher(chatroomsPath(), mapper, type)) .log() - .map(chatRoomTo -> + .map(infoTo -> { - UUID chatRoomId = chatRoomTo.getId(); + UUID chatRoomId = infoTo.getId(); int shard = shardingStrategy.selectShard(chatRoomId); return new ChatRoom( - chatRoomTo.getId(), - 
chatRoomTo.getName(), + infoTo.getId(), + infoTo.getName(), shard, clock, - factory.create(readMessages(chatRoomTo)), + factory.create(readMessages(infoTo)), bufferSize); }); } - public void writeMessages(ChatRoomTo chatroomTo, Flux messageFlux) + public void writeMessages(ChatRoomInfoTo infoTo, Flux messageFlux) { - Path path = chatroomPath(chatroomTo); - log.info("Writing messages for {} to {}", chatroomTo, path); + Path path = chatroomPath(infoTo); + log.info("Writing messages for {} to {}", infoTo, path); try { Files.createDirectories(storagePath); @@ -177,11 +177,11 @@ public class FilesStorageStrategy implements StorageStrategy } } - public Flux readMessages(ChatRoomTo chatroomTo) + public Flux readMessages(ChatRoomInfoTo infoTo) { JavaType type = mapper.getTypeFactory().constructType(MessageTo.class); return Flux - .from(new JsonFilePublisher(chatroomPath(chatroomTo), mapper, type)) + .from(new JsonFilePublisher(chatroomPath(infoTo), mapper, type)) .log() .map(MessageTo::toMessage); } @@ -191,8 +191,8 @@ public class FilesStorageStrategy implements StorageStrategy return storagePath.resolve(Path.of(CHATROOMS_FILENAME)); } - Path chatroomPath(ChatRoomTo chatroomTo) + Path chatroomPath(ChatRoomInfoTo infoTo) { - return storagePath.resolve(Path.of(chatroomTo.getId().toString() + ".json")); + return storagePath.resolve(Path.of(infoTo.getId().toString() + ".json")); } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java index 832ebd99..dd76324c 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java @@ -27,6 +27,7 @@ public abstract class AbstractInMemoryStorageIT extends AbstractStorageStrategyI getStorageStrategy().read()); InMemoryChatRoomFactory chatRoomFactory = new InMemoryChatRoomFactory( + 
chatHomeService, chatRoomId -> 0, clock, 8); diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java index d5e02b83..3ce527eb 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java @@ -40,8 +40,9 @@ public abstract class AbstractStorageStrategyIT assertThat(chathome.getChatRooms().toStream()).hasSize(0); UUID chatRoomId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7"); - ChatRoom chatroom = chatRoomFactory.createChatRoom(chatRoomId, "FOO").block(); - chathome.putChatRoom(chatroom); + ChatRoomInfo info = chatRoomFactory.createChatRoom(chatRoomId, "FOO").block(); + log.debug("Created chat-room {}", info); + ChatRoom chatroom = chathome.getChatRoom(chatRoomId).block(); Message m1 = chatroom.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block(); Message m2 = chatroom.addMessage(1l, "ute", "Ich bin Ute...").block(); Message m3 = chatroom.addMessage(2l, "peter", "Willst du mit mir gehen?").block(); @@ -71,16 +72,18 @@ public abstract class AbstractStorageStrategyIT assertThat(chathome.getChatRooms().toStream()).hasSize(0); UUID chatRoomAId = UUID.fromString("5c73531c-6fc4-426c-adcb-afc5c140a0f7"); - ChatRoom chatroomA = chatRoomFactory.createChatRoom(chatRoomAId, "FOO").block(); - chathome.putChatRoom(chatroomA); + ChatRoomInfo infoA = chatRoomFactory.createChatRoom(chatRoomAId, "FOO").block(); + log.debug("Created chat-room {}", infoA); + ChatRoom chatroomA = chathome.getChatRoom(chatRoomAId).block(); Message ma1 = chatroomA.addMessage(1l,"peter", "Hallo, ich heiße Peter!").block(); Message ma2 = chatroomA.addMessage(1l, "ute", "Ich bin Ute...").block(); Message ma3 = chatroomA.addMessage(2l, "peter", "Willst du mit mir gehen?").block(); Message ma4 = chatroomA.addMessage(1l, "klaus", "Ja? Nein? 
Vielleicht??").block(); UUID chatRoomBId = UUID.fromString("8763dfdc-4dda-4a74-bea4-4b389177abea"); - ChatRoom chatroomB = chatRoomFactory.createChatRoom(chatRoomBId, "BAR").block(); - chathome.putChatRoom(chatroomB); + ChatRoomInfo infoB = chatRoomFactory.createChatRoom(chatRoomBId, "BAR").block(); + log.debug("Created chat-room {}", infoB); + ChatRoom chatroomB = chathome.getChatRoom(chatRoomBId).block(); Message mb1 = chatroomB.addMessage(1l,"peter", "Hallo, ich heiße Uwe!").block(); Message mb2 = chatroomB.addMessage(1l, "ute", "Ich bin Ute...").block(); Message mb3 = chatroomB.addMessage(1l, "klaus", "Willst du mit mir gehen?").block(); -- 2.20.1 From 5df111ec5b6442114b90cc2f1ad45ae73a66e69b Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 18 Feb 2023 11:07:21 +0100 Subject: [PATCH 12/16] test: Proved that there is an NPE in `ShardedChatHome.getChatRoom()` - Refined `AbstractConfigurationIT` to show that an NPE can occur in `ShardedChatHome.getChatRoom()`. - Defined the expected behaviour if the NPE is handled correctly in the refined integration-test. 
--- .../chat/backend/AbstractConfigurationIT.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/test/java/de/juplo/kafka/chat/backend/AbstractConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/AbstractConfigurationIT.java index c424b294..8da8fd36 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/AbstractConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/AbstractConfigurationIT.java @@ -63,6 +63,20 @@ public abstract class AbstractConfigurationIT .exchange() .expectStatus().isOk() .expectBody().jsonPath("$.text").isEqualTo("Hallo, ich heiße Peter!"); + webTestClient + .put() + .uri("http://localhost:{port}/97cca12a-af72-11ed-9b9e-f78fa16794ac/otto/66", port) + .contentType(MediaType.TEXT_PLAIN) + .accept(MediaType.APPLICATION_JSON) + .bodyValue("The devil rules route 66") + .exchange() + .expectStatus().isNotFound(); + webTestClient + .get() + .uri("http://localhost:{port}/97cca12a-af72-11ed-9b9e-f78fa16794ac/otto/66", port) + .accept(MediaType.APPLICATION_JSON) + .exchange() + .expectStatus().isNotFound(); }); } } -- 2.20.1 From e1ad66ecb1dc386bb357e364a05b071ec45920e1 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 24 Feb 2023 09:47:35 +0100 Subject: [PATCH 13/16] refactor: Pushed sharding one layer down in the architecture - The packages `domain` and `api` do not have to know anything about sharding. - This prepares for a persistence implementation that is based on Kafka. - Also simplifies the configuration in `InMemoryServicesConfiguration`. 
--- .../chat/backend/ChatBackendApplication.java | 5 +- .../kafka/chat/backend/domain/ChatHome.java | 23 +++++-- .../chat/backend/domain/ChatHomeService.java | 4 +- .../domain/ShardNotOwnedException.java | 68 +++++++++++++++++++ .../chat/backend/domain/ShardedChatHome.java | 59 ---------------- .../chat/backend/domain/SimpleChatHome.java | 43 ------------ .../domain/UnknownChatroomException.java | 28 +++++++- .../inmemory/InMemoryChatHomeService.java | 52 +++++++++++--- .../InMemoryServicesConfiguration.java | 63 +++++++---------- .../api/ChatBackendControllerTest.java | 14 ++-- ...pleChatHomeTest.java => ChatHomeTest.java} | 11 ++- .../AbstractInMemoryStorageIT.java | 1 + .../AbstractStorageStrategyIT.java | 2 +- 13 files changed, 197 insertions(+), 176 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/chat/backend/domain/ShardNotOwnedException.java delete mode 100644 src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java delete mode 100644 src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java rename src/test/java/de/juplo/kafka/chat/backend/domain/{SimpleChatHomeTest.java => ChatHomeTest.java} (75%) diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java index 8e1ff9e5..c61f8488 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java @@ -16,7 +16,7 @@ public class ChatBackendApplication implements WebFluxConfigurer @Autowired ChatBackendProperties properties; @Autowired - ChatHome[] chatHomes; + ChatHome chatHome; @Autowired StorageStrategy storageStrategy; @@ -32,8 +32,7 @@ public class ChatBackendApplication implements WebFluxConfigurer @PreDestroy public void onExit() { - for (int shard = 0; shard < chatHomes.length; shard++) - storageStrategy.write(chatHomes[shard].getChatRooms()); + 
storageStrategy.write(chatHome.getChatRooms()); } public static void main(String[] args) diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java index 6091c0c5..557cf75f 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHome.java @@ -1,14 +1,29 @@ package de.juplo.kafka.chat.backend.domain; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.util.UUID; +import java.util.*; -public interface ChatHome +@RequiredArgsConstructor +@Slf4j +public class ChatHome { - Mono getChatRoom(UUID id); + private final ChatHomeService service; - Flux getChatRooms(); + + public Mono getChatRoom(UUID id) + { + return service + .getChatRoom(id) + .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); + } + + public Flux getChatRooms() + { + return service.getChatRooms(); + } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java index 19ff4aa4..7876130d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java @@ -8,6 +8,6 @@ import java.util.UUID; public interface ChatHomeService { - Mono getChatRoom(int shard, UUID id); - Flux getChatRooms(int shard); + Mono getChatRoom(UUID id); + Flux getChatRooms(); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardNotOwnedException.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardNotOwnedException.java new file mode 100644 index 00000000..d467eabb --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardNotOwnedException.java @@ -0,0 +1,68 @@ +package de.juplo.kafka.chat.backend.domain; + +import lombok.Getter; + 
+import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.stream.Collectors; + + +public class ShardNotOwnedException extends IllegalStateException +{ + @Getter + private final ChatHomeService chatHomeService; + @Getter + private final ChatRoomInfo chatRoomInfo; + @Getter + private final int shard; + @Getter + private final int[] ownedShards; + + + public ShardNotOwnedException( + ChatHomeService chatHomeService, + ChatRoomInfo chatRoomInfo, + int shard, + Collection ownedShards) + { + this( + chatHomeService, + chatRoomInfo, + shard, + ShardNotOwnedException.toArray(ownedShards)); + } + + public ShardNotOwnedException( + ChatHomeService chatHomeService, + ChatRoomInfo chatRoomInfo, + int shard, + int[] ownedShards) + { + super( + chatHomeService + + " does not own the shard " + + shard + + " for ChatRoom " + + chatRoomInfo + + " owned shards: " + + Arrays + .stream(ownedShards) + .mapToObj(ownedShard -> Integer.toString(ownedShard)) + .collect(Collectors.joining(", "))); + this.chatHomeService = chatHomeService; + this.chatRoomInfo = chatRoomInfo; + this.shard = shard; + this.ownedShards = ownedShards; + } + + + private static int[] toArray(Collection collection) + { + int[] array = new int[collection.size()]; + Iterator iterator = collection.iterator(); + for (int i = 0; iterator.hasNext(); i++) + array[i] = iterator.next(); + return array; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java deleted file mode 100644 index 4b8c7f16..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardedChatHome.java +++ /dev/null @@ -1,59 +0,0 @@ -package de.juplo.kafka.chat.backend.domain; - -import lombok.extern.slf4j.Slf4j; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.util.HashSet; -import java.util.Set; -import java.util.UUID; -import java.util.stream.Collectors; - 
- -@Slf4j -public class ShardedChatHome implements ChatHome -{ - private final ChatHome[] chatHomes; - private final Set ownedShards; - private final ShardingStrategy shardingStrategy; - - - public ShardedChatHome( - ChatHome[] chatHomes, - ShardingStrategy shardingStrategy) - { - this.chatHomes = chatHomes; - this.shardingStrategy = shardingStrategy; - this.ownedShards = new HashSet<>(); - for (int shard = 0; shard < chatHomes.length; shard++) - if(chatHomes[shard] != null) - this.ownedShards.add(shard); - log.info( - "Created ShardedChatHome for shards: {}", - ownedShards - .stream() - .map(String::valueOf) - .collect(Collectors.joining(", "))); - } - - - @Override - public Mono getChatRoom(UUID id) - { - return chatHomes[selectShard(id)].getChatRoom(id); - } - - @Override - public Flux getChatRooms() - { - return Flux - .fromIterable(ownedShards) - .flatMap(shard -> chatHomes[shard].getChatRooms()); - } - - - private int selectShard(UUID chatroomId) - { - return shardingStrategy.selectShard(chatroomId); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java b/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java deleted file mode 100644 index 11542edd..00000000 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/SimpleChatHome.java +++ /dev/null @@ -1,43 +0,0 @@ -package de.juplo.kafka.chat.backend.domain; - -import lombok.extern.slf4j.Slf4j; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -import java.util.*; - - -@Slf4j -public class SimpleChatHome implements ChatHome -{ - private final ChatHomeService service; - private final int shard; - - - public SimpleChatHome(ChatHomeService service, int shard) - { - log.info("Created SimpleChatHome for shard {}", shard); - this.service = service; - this.shard = shard; - } - - public SimpleChatHome(ChatHomeService service) - { - this(service, 0); - } - - - @Override - public Mono getChatRoom(UUID id) - { - return service - .getChatRoom(shard, 
id) - .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); - } - - @Override - public Flux getChatRooms() - { - return service.getChatRooms(shard); - } -} diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/UnknownChatroomException.java b/src/main/java/de/juplo/kafka/chat/backend/domain/UnknownChatroomException.java index 1f70f110..714c2207 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/UnknownChatroomException.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/UnknownChatroomException.java @@ -2,17 +2,43 @@ package de.juplo.kafka.chat.backend.domain; import lombok.Getter; +import java.util.Arrays; +import java.util.Optional; import java.util.UUID; +import java.util.stream.Collectors; -public class UnknownChatroomException extends RuntimeException +public class UnknownChatroomException extends IllegalStateException { @Getter private final UUID chatroomId; + @Getter + private final Optional shard; + @Getter + private final Optional ownedShards; public UnknownChatroomException(UUID chatroomId) { super("Chatroom does not exist: " + chatroomId); this.chatroomId = chatroomId; + this.shard = Optional.empty(); + this.ownedShards = Optional.empty(); + } + + public UnknownChatroomException(UUID chatroomId, int shard, int[] ownedShards) + { + super( + "Chatroom does not exist (here): " + + chatroomId + + " shard=" + + shard + + ", owned=" + + Arrays + .stream(ownedShards) + .mapToObj(ownedShard -> Integer.toString(ownedShard)) + .collect(Collectors.joining(","))); + this.chatroomId = chatroomId; + this.shard = Optional.of(shard); + this.ownedShards = Optional.of(ownedShards); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java index 8f262a0b..25a9bcf6 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java +++ 
b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java @@ -1,7 +1,6 @@ package de.juplo.kafka.chat.backend.persistence.inmemory; -import de.juplo.kafka.chat.backend.domain.ChatRoom; -import de.juplo.kafka.chat.backend.domain.ChatHomeService; +import de.juplo.kafka.chat.backend.domain.*; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -13,31 +12,39 @@ import java.util.*; public class InMemoryChatHomeService implements ChatHomeService { private final Map[] chatrooms; + private final Set ownedShards; + private final ShardingStrategy shardingStrategy; public InMemoryChatHomeService( int numShards, int[] ownedShards, + ShardingStrategy shardingStrategy, Flux chatroomFlux) { log.debug("Creating InMemoryChatHomeService"); + this.chatrooms = new Map[numShards]; - Set owned = Arrays + + this.ownedShards = Arrays .stream(ownedShards) .collect( () -> new HashSet<>(), (set, i) -> set.add(i), (a, b) -> a.addAll(b)); + + this.shardingStrategy = shardingStrategy; + for (int shard = 0; shard < numShards; shard++) { - chatrooms[shard] = owned.contains(shard) + chatrooms[shard] = this.ownedShards.contains(shard) ? 
new HashMap<>() : null; } chatroomFlux .filter(chatRoom -> { - if (owned.contains(chatRoom.getShard())) + if (this.ownedShards.contains(chatRoom.getShard())) { return true; } @@ -51,20 +58,43 @@ public class InMemoryChatHomeService implements ChatHomeService .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom)); } - public void putChatRoom(ChatRoom chatRoom) + void putChatRoom(ChatRoom chatRoom) { - chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom); + UUID id = chatRoom.getId(); + int shard = shardingStrategy.selectShard(id); + if (!ownedShards.contains(shard)) + throw new ShardNotOwnedException(this, chatRoom, shard, ownedShards); + chatrooms[shard].put(id, chatRoom); } @Override - public Mono getChatRoom(int shard, UUID id) + public Mono getChatRoom(UUID id) { - return Mono.justOrEmpty(chatrooms[shard].get(id)); + int shard = shardingStrategy.selectShard(id); + if (ownedShards.contains(shard)) + { + return Mono.justOrEmpty(chatrooms[shard].get(id)); + } + else + { + int[] ownedShards = new int[this.ownedShards.size()]; + Iterator iterator = this.ownedShards.iterator(); + for (int i = 0; iterator.hasNext(); i++) + { + ownedShards[i] = iterator.next(); + } + return Mono.error(new UnknownChatroomException( + id, + shard, + ownedShards)); + } } @Override - public Flux getChatRooms(int shard) + public Flux getChatRooms() { - return Flux.fromStream(chatrooms[shard].values().stream()); + return Flux + .fromIterable(ownedShards) + .flatMap(shard -> Flux.fromIterable(chatrooms[shard].values())); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java index de504485..cb1a0703 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java +++ 
b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java @@ -2,11 +2,9 @@ package de.juplo.kafka.chat.backend.persistence.inmemory; import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.ChatBackendProperties.ShardingStrategyType; -import de.juplo.kafka.chat.backend.domain.ShardedChatHome; import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.domain.ChatHome; -import de.juplo.kafka.chat.backend.domain.SimpleChatHome; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; @@ -24,38 +22,9 @@ import java.time.Clock; public class InMemoryServicesConfiguration { @Bean - @ConditionalOnProperty( - prefix = "chat.backend.inmemory", - name = "sharding-strategy", - havingValue = "none", - matchIfMissing = true) - ChatHome noneShardingChatHome(InMemoryChatHomeService chatHomeService) - { - return new SimpleChatHome(chatHomeService); - } - - @Bean - @ConditionalOnProperty( - prefix = "chat.backend.inmemory", - name = "sharding-strategy", - havingValue = "kafkalike") - ChatHome kafkalikeShardingChatHome( - ChatBackendProperties properties, - InMemoryChatHomeService chatHomeService, - StorageStrategy storageStrategy) + ChatHome chatHome(InMemoryChatHomeService chatHomeService) { - int numShards = properties.getInmemory().getNumShards(); - SimpleChatHome[] chatHomes = new SimpleChatHome[numShards]; - storageStrategy - .read() - .subscribe(chatRoom -> - { - int shard = chatRoom.getShard(); - if (chatHomes[shard] == null) - chatHomes[shard] = new SimpleChatHome(chatHomeService, shard); - }); - ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards); - return new ShardedChatHome(chatHomes, strategy); + return new ChatHome(chatHomeService); 
} @Bean @@ -65,15 +34,31 @@ public class InMemoryServicesConfiguration { ShardingStrategyType sharding = properties.getInmemory().getShardingStrategy(); - int numShards = sharding == ShardingStrategyType.none - ? 1 - : properties.getInmemory().getNumShards(); - int[] ownedShards = sharding == ShardingStrategyType.none - ? new int[] { 0 } - : properties.getInmemory().getOwnedShards(); + + int numShards; + int[] ownedShards; + ShardingStrategy shardingStrategy; + + switch (sharding) + { + case none: + numShards = 1; + ownedShards = new int[] { 0 }; + shardingStrategy = id -> 0; + break; + case kafkalike: + numShards = properties.getInmemory().getNumShards(); + ownedShards = properties.getInmemory().getOwnedShards(); + shardingStrategy = new KafkaLikeShardingStrategy(numShards); + break; + default: + throw new IllegalArgumentException("Unknown sharding strategy: " + sharding); + } + return new InMemoryChatHomeService( numShards, ownedShards, + shardingStrategy, storageStrategy.read()); } diff --git a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java index b72294d9..b80e655f 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java @@ -39,7 +39,7 @@ public class ChatBackendControllerTest { // Given UUID chatroomId = UUID.randomUUID(); - when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty()); + when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.empty()); // When WebTestClient.ResponseSpec responseSpec = client @@ -59,7 +59,7 @@ public class ChatBackendControllerTest { // Given UUID chatroomId = UUID.randomUUID(); - when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty()); + when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.empty()); // When 
WebTestClient.ResponseSpec responseSpec = client @@ -80,7 +80,7 @@ public class ChatBackendControllerTest UUID chatroomId = UUID.randomUUID(); String username = "foo"; Long messageId = 66l; - when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty()); + when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.empty()); // When WebTestClient.ResponseSpec responseSpec = client @@ -106,7 +106,7 @@ public class ChatBackendControllerTest UUID chatroomId = UUID.randomUUID(); String username = "foo"; Long messageId = 66l; - when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty()); + when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.empty()); // When WebTestClient.ResponseSpec responseSpec = client @@ -129,7 +129,7 @@ public class ChatBackendControllerTest { // Given UUID chatroomId = UUID.randomUUID(); - when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty()); + when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.empty()); // When WebTestClient.ResponseSpec responseSpec = client @@ -173,7 +173,7 @@ public class ChatBackendControllerTest 0, Clock.systemDefaultZone(), chatRoomService, 8); - when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.just(chatRoom)); + when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.just(chatRoom)); Message existingMessage = new Message( key, serialNumberExistingMessage, @@ -225,7 +225,7 @@ public class ChatBackendControllerTest 0, Clock.systemDefaultZone(), chatRoomService, 8); - when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))) + when(chatHomeService.getChatRoom(any(UUID.class))) .thenReturn(Mono.just(chatRoom)); when(chatRoomService.getMessage(any(Message.MessageKey.class))) .thenReturn(Mono.empty()); diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java similarity 
index 75% rename from src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java rename to src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java index 5b53607f..5d9dfafc 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatHomeTest.java @@ -8,13 +8,12 @@ import java.time.Clock; import java.util.UUID; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static pl.rzrz.assertj.reactor.Assertions.assertThat; -public class SimpleChatHomeTest +public class ChatHomeTest { @Test @DisplayName("Assert chatroom is delivered, if it exists") @@ -29,8 +28,8 @@ public class SimpleChatHomeTest Clock.systemDefaultZone(), mock(ChatRoomService.class), 8); - when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.just(chatRoom)); - SimpleChatHome chatHome = new SimpleChatHome(chatHomeService); + when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.just(chatRoom)); + ChatHome chatHome = new ChatHome(chatHomeService); // When Mono mono = chatHome.getChatRoom(chatRoom.getId()); @@ -45,8 +44,8 @@ public class SimpleChatHomeTest { // Given ChatHomeService chatHomeService = mock(ChatHomeService.class); - when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.empty()); - SimpleChatHome chatHome = new SimpleChatHome(chatHomeService); + when(chatHomeService.getChatRoom(any(UUID.class))).thenReturn(Mono.empty()); + ChatHome chatHome = new ChatHome(chatHomeService); // When Mono mono = chatHome.getChatRoom(UUID.randomUUID()); diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java index dd76324c..a68445b6 100644 --- 
a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractInMemoryStorageIT.java @@ -24,6 +24,7 @@ public abstract class AbstractInMemoryStorageIT extends AbstractStorageStrategyI InMemoryChatHomeService chatHomeService = new InMemoryChatHomeService( 1, new int[] { 0 }, + id -> 0, getStorageStrategy().read()); InMemoryChatRoomFactory chatRoomFactory = new InMemoryChatRoomFactory( diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java index 3ce527eb..83e905b9 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/AbstractStorageStrategyIT.java @@ -23,7 +23,7 @@ public abstract class AbstractStorageStrategyIT protected void start() { StorageStrategyITConfig config = getConfig(); - chathome = new SimpleChatHome(config.getChatHomeService()); + chathome = new ChatHome(config.getChatHomeService()); chatRoomFactory = config.getChatRoomFactory(); } -- 2.20.1 From 9c24aa2aadc3f9ae98362ccb86ab179734362800 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 22 Jan 2023 18:13:40 +0100 Subject: [PATCH 14/16] WIP --- .../kafka/KafkaChatHomeService.java | 182 ++++++++++++++++++ .../kafka/KafkaChatRoomFactory.java | 17 ++ .../kafka/KafkaChatRoomService.java | 60 ++++++ .../backend/persistence/kafka/MessageTo.java | 33 ++++ .../persistence/kafka/MessageToTest.java | 39 ++++ 5 files changed, 331 insertions(+) create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java create mode 100644 
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageTo.java create mode 100644 src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageToTest.java diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java new file mode 100644 index 00000000..4fa567ce --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java @@ -0,0 +1,182 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatHomeService; +import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.Message; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.common.TopicPartition; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.*; + + +@Slf4j +public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener +{ + private final Consumer consumer; + private final String topic; + private final long[] offsets; + private final MessageHandler[] handlers; + private final Map[] chatrooms; + + + public KafkaChatHomeService( + Consumer consumer, + String topic, + int numShards) + { + log.debug("Creating KafkaChatHomeService"); + this.consumer = consumer; + this.topic = topic; + this.offsets = new long[numShards]; + this.handlers = new MessageHandler[numShards]; + for (int i=0; i< numShards; i++) + { + this.offsets[i] = 0l; + this.handlers[i] = new NoOpMessageHandler(i); + } + this.chatrooms = new Map[numShards]; + } + + + @Override + public void onPartitionsAssigned(Collection partitions) + { + consumer.endOffsets(partitions).forEach((tp, currentOffset) -> + { + if (!tp.topic().equals(topic)) + { + 
log.warn("Ignoring unwanted TopicPartition", tp); + return; + } + + int partition = tp.partition(); + long unseenOffset = offsets[partition]; + + log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset); + handlers[partition] = new ChatRoomLoadingMessageHandler(partition, currentOffset, unseenOffset); + }); + } + + @Override + public void onPartitionsRevoked(Collection partitions) + { + log.info("Revoked partitions: {}", partitions); + } + + @Override + public void onPartitionsLost(Collection partitions) + { + log.info("Revoked partitions: {}", partitions); + } + + private void foo() + { + Set owned = Arrays + .stream(ownedShards) + .collect( + () -> new HashSet<>(), + (set, i) -> set.add(i), + (a, b) -> a.addAll(b)); + for (int shard = 0; shard < numShards; shard++) + { + chatrooms[shard] = owned.contains(shard) + ? new HashMap<>() + : null; + } + chatroomFlux + .filter(chatRoom -> + { + if (owned.contains(chatRoom.getShard())) + { + return true; + } + else + { + log.info("Ignoring not owned chat-room {}", chatRoom); + return false; + } + }) + .toStream() + .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom)); + } + + @Override + public Mono putChatRoom(ChatRoom chatRoom) + { + chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom); + return Mono.just(chatRoom); + } + + @Override + public Mono getChatRoom(int shard, UUID id) + { + return Mono.justOrEmpty(chatrooms[shard].get(id)); + } + + @Override + public Flux getChatRooms(int shard) + { + return Flux.fromStream(chatrooms[shard].values().stream()); + } + + + interface MessageHandler + { + MessageHandler handleMessage(Message message); + } + + + @RequiredArgsConstructor + class NoOpMessageHandler implements MessageHandler + { + private final TopicPartition tp; + + @Override + public MessageHandler handleMessage(Message message) + { + log.warn("Not handling message {} for partition {}", message, tp); + return this; + } + } + + class 
ChatRoomLoadingMessageHandler implements MessageHandler + { + private final TopicPartition tp; + private final long currentOffset; + private final long unseenOffset; + + ChatRoomLoadingMessageHandler(TopicPartition tp, long currentOffset, long unseenOffset) + { + this.tp = tp; + this.currentOffset = currentOffset; + this.unseenOffset = unseenOffset; + + consumer.seek(tp, unseenOffset); + } + + @Override + public MessageHandler handleMessage(Message message) + { + // todo + return this; + } + } + + @RequiredArgsConstructor + class DefaultMessageHandler implements MessageHandler + { + private final TopicPartition tp; + + @Override + public MessageHandler handleMessage(Message message) + { + chatrooms[tp.partition()].put() + return this; + } + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java new file mode 100644 index 00000000..20d85e80 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java @@ -0,0 +1,17 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ChatRoomFactory; +import reactor.core.publisher.Mono; + +import java.util.UUID; + + +public class KafkaChatRoomFactory implements ChatRoomFactory +{ + @Override + public Mono createChatRoom(UUID id, String name) + { + return null; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java new file mode 100644 index 00000000..981c11f5 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java @@ -0,0 +1,60 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatRoomService; +import 
de.juplo.kafka.chat.backend.domain.Message; +import de.juplo.kafka.chat.backend.domain.MessageMutationException; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.TopicPartition; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.LocalDateTime; +import java.util.LinkedHashMap; + + +@Slf4j +@RequiredArgsConstructor +public class KafkaChatRoomService implements ChatRoomService +{ + private final Producer producer; + private final TopicPartition tp; + + private final LinkedHashMap messages = new LinkedHashMap<>(); + + private long offset = 0l; + + + @Override + public Message persistMessage( + Message.MessageKey key, + LocalDateTime timestamp, + String text) + { + + Mono.error(() -> new MessageMutationException(existing, text))); + Message message = new Message(key, (long)messages.size(), timestamp, text); + messages.put(message.getKey(), message); + return message; + } + + @Override + public Mono getMessage(Message.MessageKey key) + { + return Mono.fromSupplier(() -> messages.get(key)); + } + + @Override + public Flux getMessages(long first, long last) + { + return Flux.fromStream(messages + .values() + .stream() + .filter(message -> + { + long serial = message.getSerialNumber(); + return serial >= first && serial <= last; + })); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageTo.java new file mode 100644 index 00000000..2de8ad58 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageTo.java @@ -0,0 +1,33 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.Message; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; + + +@Data +@NoArgsConstructor 
+@AllArgsConstructor +public class MessageTo +{ + private Long id; + private String user; + private String text; + + public Message toMessage(long offset, LocalDateTime timestamp) + { + return new Message(Message.MessageKey.of(user, id), offset, timestamp, text); + } + + public static MessageTo from(Message message) + { + return + new MessageTo( + message.getId(), + message.getUsername(), + message.getMessageText()); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageToTest.java new file mode 100644 index 00000000..0c4884bf --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageToTest.java @@ -0,0 +1,39 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; + + +public class MessageToTest +{ + final String json = """ + { + "id": 1, + "text": "Hallo, ich heiße Peter!", + "user": "Peter" + }"""; + + ObjectMapper mapper; + + @BeforeEach + public void setUp() + { + mapper = new ObjectMapper(); + mapper.registerModule(new JavaTimeModule()); + mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + } + + @Test + public void testDeserialization() throws Exception + { + MessageTo message = mapper.readValue(json, MessageTo.class); + assertThat(message.getId()).isEqualTo(1l); + assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!"); + assertThat(message.getUser()).isEqualTo("Peter"); + } +} -- 2.20.1 From 0b207332e5f4a0a7a6e2f19d9c5bb3b261e8c863 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 25 Jan 2023 22:56:20 +0100 Subject: [PATCH 15/16] WIP --- 
.../kafka/KafkaChatRoomService.java | 57 +++++++++++++++++-- .../backend/persistence/kafka/MessageTo.java | 6 +- 2 files changed, 54 insertions(+), 9 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java index 981c11f5..1175d55f 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java @@ -6,12 +6,17 @@ import de.juplo.kafka.chat.backend.domain.MessageMutationException; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.util.LinkedHashMap; +import java.util.UUID; +import java.util.concurrent.Future; @Slf4j @@ -20,6 +25,8 @@ public class KafkaChatRoomService implements ChatRoomService { private final Producer producer; private final TopicPartition tp; + private final UUID chatRoomId; + private final ZoneOffset zoneOffset; private final LinkedHashMap messages = new LinkedHashMap<>(); @@ -27,16 +34,54 @@ public class KafkaChatRoomService implements ChatRoomService @Override - public Message persistMessage( + public Mono persistMessage( Message.MessageKey key, LocalDateTime timestamp, String text) { - - Mono.error(() -> new MessageMutationException(existing, text))); - Message message = new Message(key, (long)messages.size(), timestamp, text); - messages.put(message.getKey(), message); - return message; + return Mono.create(sink -> + { + ProducerRecord record = + new ProducerRecord<>( + tp.topic(), + tp.partition(), + 
timestamp.toEpochSecond(zoneOffset), + chatRoomId.toString(), + MessageTo.of(key.getUsername(), key.getMessageId(), text)); + + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) + { + Message message = messages.get(key); + if (message != null) + { + if (message.getMessageText().equals(text)) + { + // Warn and emit existing message + log.warn( + "Keeping existing message with {}@{} for {}", + message.getSerialNumber(), + message.getTimestamp(), key); + } + else + { + // Emit error and abort + sink.error(new MessageMutationException(message, text)); + return; + } + } + else + { + // Emit new message + message = new Message(key, metadata.offset(), timestamp, text); + messages.put(message.getKey(), message); + } + + sink.success(); + } + })); + }); } @Override diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageTo.java index 2de8ad58..0a867f16 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageTo.java @@ -10,11 +10,11 @@ import java.time.LocalDateTime; @Data @NoArgsConstructor -@AllArgsConstructor +@AllArgsConstructor(staticName = "of") public class MessageTo { - private Long id; private String user; + private Long id; private String text; public Message toMessage(long offset, LocalDateTime timestamp) @@ -26,8 +26,8 @@ public class MessageTo { return new MessageTo( - message.getId(), message.getUsername(), + message.getId(), message.getMessageText()); } } -- 2.20.1 From d160bd5cb1ac93bdc8e5db40f5d69f722627af5b Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 25 Jan 2023 22:58:54 +0100 Subject: [PATCH 16/16] WIP --- .../backend/persistence/kafka/KafkaChatRoomService.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java 
b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java index 1175d55f..79f2e63a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java @@ -7,7 +7,6 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -16,7 +15,6 @@ import java.time.LocalDateTime; import java.time.ZoneOffset; import java.util.LinkedHashMap; import java.util.UUID; -import java.util.concurrent.Future; @Slf4j @@ -53,6 +51,7 @@ public class KafkaChatRoomService implements ChatRoomService { if (metadata != null) { + // On successful send Message message = messages.get(key); if (message != null) { @@ -80,6 +79,11 @@ public class KafkaChatRoomService implements ChatRoomService sink.success(); } + else + { + // On send-failure + sink.error(exception); + } })); }); } -- 2.20.1