From: Kai Moritz Date: Sat, 14 Jan 2023 18:53:16 +0000 (+0100) Subject: feat: Reintroduced `ChatRoom.shard`, becaus it is needed as a routing-hint X-Git-Tag: wip~23 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=a1886d1dc7375620c51d2f4863c9aa7cb056c75a;p=demos%2Fkafka%2Fchat feat: Reintroduced `ChatRoom.shard`, becaus it is needed as a routing-hint - Technically, the attribute is not needed. - But, to implement the gua/sha-pattern, it is needed as routing-hing. - Hence, it is reintroduced here and also added to the `ChatRoomTo` to be send to the client. --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java index 3cc59210..e997e4bb 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java @@ -10,6 +10,7 @@ public class ChatRoomTo { private UUID id; private String name; + private int shard; public static ChatRoomTo from(ChatRoom chatroom) @@ -17,6 +18,7 @@ public class ChatRoomTo ChatRoomTo to = new ChatRoomTo(); to.id = chatroom.getId(); to.name = chatroom.getName(); + to.shard = chatroom.getShard(); return to; } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java index cc5c5a07..4f855b8c 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java @@ -25,6 +25,8 @@ public class ChatRoom private final UUID id; @Getter private final String name; + @Getter + private final int shard; private final Clock clock; private final ChatRoomService service; private final int bufferSize; @@ -34,6 +36,7 @@ public class ChatRoom public ChatRoom( UUID id, String name, + int shard, Clock clock, ChatRoomService service, int bufferSize) @@ -41,6 +44,7 @@ public class ChatRoom log.info("Created ChatRoom {} with buffer-size {}", id, bufferSize); this.id = id; this.name = name; + this.shard = shard; this.clock = clock; this.service = service; this.bufferSize = bufferSize; diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java index a9e58942..0f39fba3 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java @@ -2,7 +2,6 @@ package de.juplo.kafka.chat.backend.persistence.inmemory; import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.ChatHomeService; -import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -13,18 +12,15 @@ import java.util.*; @Slf4j public class InMemoryChatHomeService implements ChatHomeService { - private final ShardingStrategy shardingStrategy; private final Map[] chatrooms; public InMemoryChatHomeService( - ShardingStrategy shardingStrategy, int numShards, int[] ownedShards, Flux chatroomFlux) { log.debug("Creating ChatHomeService"); - this.shardingStrategy = shardingStrategy; this.chatrooms = new Map[numShards]; Set owned = Arrays .stream(ownedShards) @@ -41,8 +37,7 @@ public class InMemoryChatHomeService implements ChatHomeService chatroomFlux .filter(chatRoom -> { - int shard = shardingStrategy.selectShard(chatRoom.getId()); - if (owned.contains(shard)) + if (owned.contains(chatRoom.getShard())) { return true; } @@ -53,16 +48,13 @@ public class InMemoryChatHomeService implements ChatHomeService } }) .toStream() - .forEach(chatRoom -> - { - getChatRoomMapFor(chatRoom).put(chatRoom.getId(), chatRoom); - }); + .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom)); } @Override public Mono putChatRoom(ChatRoom chatRoom) { - getChatRoomMapFor(chatRoom).put(chatRoom.getId(), chatRoom); + chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom); return Mono.just(chatRoom); } @@ -77,11 +69,4 @@ public class InMemoryChatHomeService implements ChatHomeService { return Flux.fromStream(chatrooms[shard].values().stream()); } - - - private Map getChatRoomMapFor(ChatRoom chatRoom) - { - int shard = shardingStrategy.selectShard(chatRoom.getId()); - return chatrooms[shard]; - } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java index 3e1d3603..7fff359c 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java @@ -26,7 +26,8 @@ public class InMemoryChatRoomFactory implements ChatRoomFactory public Mono createChatRoom(UUID id, String name) { log.info("Creating ChatRoom with buffer-size {}", bufferSize); + int shard = shardingStrategy.selectShard(id); ChatRoomService service = new InMemoryChatRoomService(Flux.empty()); - return Mono.just(new ChatRoom(id, name, clock, service, bufferSize)); + return Mono.just(new ChatRoom(id, name, shard, clock, service, bufferSize)); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java index 1dca0402..96ef05c2 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java @@ -45,23 +45,22 @@ public class InMemoryServicesConfiguration StorageStrategy storageStrategy) { int numShards = properties.getInmemory().getNumShards(); - ShardingStrategy shardingStrategy = new KafkaLikeShardingStrategy(numShards); SimpleChatHome[] chatHomes = new SimpleChatHome[numShards]; storageStrategy .read() .subscribe(chatRoom -> { - int shard = shardingStrategy.selectShard(chatRoom.getId()); + int shard = chatRoom.getShard(); if (chatHomes[shard] == null) chatHomes[shard] = new SimpleChatHome(chatHomeService, shard); }); - return new ShardedChatHome(chatHomes, shardingStrategy); + ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards); + return new ShardedChatHome(chatHomes, strategy); } @Bean InMemoryChatHomeService chatHomeService( ChatBackendProperties properties, - ShardingStrategy shardingStrategy, StorageStrategy storageStrategy) { ShardingStrategyType sharding = @@ -73,7 +72,6 @@ public class InMemoryServicesConfiguration ? new int[] { 0 } : properties.getInmemory().getOwnedShards(); return new InMemoryChatHomeService( - shardingStrategy, numShards, ownedShards, storageStrategy.read()); diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java index 0da1b953..df730aaf 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java @@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.persistence.storage.files; import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.chat.backend.ChatBackendProperties; +import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; @@ -31,13 +32,14 @@ public class FilesStorageConfiguration public StorageStrategy storageStrategy( ChatBackendProperties properties, Clock clock, + ShardingStrategy shardingStrategy, ObjectMapper mapper) { return new FilesStorageStrategy( Paths.get(properties.getInmemory().getStorageDirectory()), clock, properties.getChatroomBufferSize(), - + shardingStrategy, messageFlux -> new InMemoryChatRoomService(messageFlux), mapper); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java index 5d3c067a..1e3e5eef 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java @@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.chat.backend.api.ChatRoomTo; import de.juplo.kafka.chat.backend.api.MessageTo; +import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.Message; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; @@ -16,6 +17,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.time.Clock; +import java.util.UUID; import static java.nio.file.StandardOpenOption.CREATE; import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; @@ -31,6 +33,7 @@ public class FilesStorageStrategy implements StorageStrategy private final Path storagePath; private final Clock clock; private final int bufferSize; + private final ShardingStrategy shardingStrategy; private final ChatRoomServiceFactory factory; private final ObjectMapper mapper; @@ -102,12 +105,18 @@ public class FilesStorageStrategy implements StorageStrategy return Flux .from(new JsonFilePublisher(chatroomsPath(), mapper, type)) .log() - .map(chatRoomTo -> new ChatRoom( + .map(chatRoomTo -> + { + UUID chatRoomId = chatRoomTo.getId(); + int shard = shardingStrategy.selectShard(chatRoomId); + return new ChatRoom( chatRoomTo.getId(), chatRoomTo.getName(), + shard, clock, factory.create(readMessages(chatRoomTo)), - bufferSize)); + bufferSize); + }); } public void writeMessages(ChatRoomTo chatroomTo, Flux messageFlux) diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java index 22d7c26b..2b33eed7 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java @@ -1,6 +1,7 @@ package de.juplo.kafka.chat.backend.persistence.storage.mongodb; import de.juplo.kafka.chat.backend.ChatBackendProperties; +import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -21,12 +22,14 @@ public class MongoDbStorageConfiguration public StorageStrategy storageStrategy( ChatRoomRepository chatRoomRepository, ChatBackendProperties properties, - Clock clock) + Clock clock, + ShardingStrategy shardingStrategy) { return new MongoDbStorageStrategy( chatRoomRepository, clock, properties.getChatroomBufferSize(), + shardingStrategy, messageFlux -> new InMemoryChatRoomService(messageFlux)); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java index 23002198..d21fe2ba 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java @@ -1,5 +1,6 @@ package de.juplo.kafka.chat.backend.persistence.storage.mongodb; +import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory; @@ -18,6 +19,7 @@ public class MongoDbStorageStrategy implements StorageStrategy private final ChatRoomRepository repository; private final Clock clock; private final int bufferSize; + private final ShardingStrategy shardingStrategy; private final ChatRoomServiceFactory factory; @@ -37,9 +39,11 @@ public class MongoDbStorageStrategy implements StorageStrategy .map(chatRoomTo -> { UUID chatRoomId = UUID.fromString(chatRoomTo.getId()); + int shard = shardingStrategy.selectShard(chatRoomId); return new ChatRoom( chatRoomId, chatRoomTo.getName(), + shard, clock, factory.create( Flux diff --git a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java index a2870284..1b25a11e 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java @@ -170,6 +170,7 @@ public class ChatBackendControllerTest ChatRoom chatRoom = new ChatRoom( chatroomId, "Test-ChatRoom", + 0, Clock.systemDefaultZone(), chatRoomService, 8); when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.just(chatRoom)); @@ -221,6 +222,7 @@ public class ChatBackendControllerTest ChatRoom chatRoom = new ChatRoom( chatroomId, "Test-ChatRoom", + 0, Clock.systemDefaultZone(), chatRoomService, 8); when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))) diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java index d87155c2..9c418f17 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java @@ -27,6 +27,7 @@ public class ChatRoomTest ChatRoom chatRoom = new ChatRoom( UUID.randomUUID(), "Foo", + 0, Clock.systemDefaultZone(), chatRoomService, 8); @@ -53,6 +54,7 @@ public class ChatRoomTest ChatRoom chatRoom = new ChatRoom( UUID.randomUUID(), "Foo", + 0, Clock.systemDefaultZone(), chatRoomService, 8); @@ -76,6 +78,7 @@ public class ChatRoomTest ChatRoom chatRoom = new ChatRoom( UUID.randomUUID(), "Foo", + 0, Clock.systemDefaultZone(), chatRoomService, 8); @@ -105,6 +108,7 @@ public class ChatRoomTest ChatRoom chatRoom = new ChatRoom( UUID.randomUUID(), "Foo", + 0, Clock.systemDefaultZone(), chatRoomService, 8); @@ -134,6 +138,7 @@ public class ChatRoomTest ChatRoom chatRoom = new ChatRoom( UUID.randomUUID(), "Foo", + 0, Clock.systemDefaultZone(), chatRoomService, 8); diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java index c9d1b633..5b53607f 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java @@ -25,6 +25,7 @@ public class SimpleChatHomeTest ChatRoom chatRoom = new ChatRoom( UUID.randomUUID(), "Foo", + 0, Clock.systemDefaultZone(), mock(ChatRoomService.class), 8); diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java index d6909df4..fe7ecac1 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java @@ -41,6 +41,7 @@ public class InMemoryWithFilesStorageIT extends AbstractStorageStrategyIT path, clock, 8, + chatRoomId -> 0, messageFlux -> new InMemoryChatRoomService(messageFlux), mapper); } @@ -56,7 +57,6 @@ public class InMemoryWithFilesStorageIT extends AbstractStorageStrategyIT protected Supplier getChatHomeServiceSupplier() { return () -> new InMemoryChatHomeService( - chatRoomId -> 0, 1, new int[] { 0 }, getStorageStrategy().read()); diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java index ecc64d5a..9808aa31 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java @@ -55,7 +55,6 @@ public class InMemoryWithMongoDbStorageIT extends AbstractStorageStrategyIT protected Supplier getChatHomeServiceSupplier() { return () -> new InMemoryChatHomeService( - chatRoomId -> 0, 1, new int[] { 0 }, getStorageStrategy().read()); @@ -80,6 +79,7 @@ public class InMemoryWithMongoDbStorageIT extends AbstractStorageStrategyIT chatRoomRepository, clock, 8, + chatRoomId -> 0, messageFlux -> new InMemoryChatRoomService(messageFlux)); }