From: Kai Moritz Date: Sat, 14 Jan 2023 16:33:02 +0000 (+0100) Subject: refactor: A `ChatRoom` does not have to remember its shard any more X-Git-Tag: wip-sharding~25 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=7dc64266c5675ead8214edb36173b80363e08b1f;p=demos%2Fkafka%2Fchat refactor: A `ChatRoom` does not have to remember its shard any more - The shard of a `ChatRoom` can be derived from its ID, if the configured `ShardingStrategy` is known. - Because the `ShardingStrategy` is known everywhere, where the shard of a `ChatRoom` is needed, it always can be derived dynamically. --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java index 44965852..1c21fb97 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatRoom.java @@ -25,8 +25,6 @@ public class ChatRoom private final UUID id; @Getter private final String name; - @Getter - private final int shard; private final Clock clock; private final ChatRoomService service; private final int bufferSize; @@ -36,14 +34,12 @@ public class ChatRoom public ChatRoom( UUID id, String name, - int shard, Clock clock, ChatRoomService service, int bufferSize) { this.id = id; this.name = name; - this.shard = shard; this.clock = clock; this.service = service; this.bufferSize = bufferSize; diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java index 87fa61fa..0beb7f36 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java @@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.persistence.inmemory; import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.ChatHomeService; +import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -12,15 +13,18 @@ import java.util.*; @Slf4j public class InMemoryChatHomeService implements ChatHomeService { + private final ShardingStrategy shardingStrategy; private final Map[] chatrooms; public InMemoryChatHomeService( + ShardingStrategy shardingStrategy, int numShards, int[] ownedShards, Flux chatroomFlux) { log.debug("Creating InMemoryChatHomeService"); + this.shardingStrategy = shardingStrategy; this.chatrooms = new Map[numShards]; Set owned = Arrays .stream(ownedShards) @@ -37,7 +41,8 @@ public class InMemoryChatHomeService implements ChatHomeService { - if (owned.contains(chatRoom.getShard())) + int shard = shardingStrategy.selectShard(chatRoom.getId()); + if (owned.contains(shard)) { return true; } @@ -48,13 +53,16 @@ public class InMemoryChatHomeService implements ChatHomeService chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom)); + .forEach(chatRoom -> + { + getChatRoomMapFor(chatRoom).put(chatRoom.getId(), chatRoom); + }); } @Override public Mono putChatRoom(ChatRoom chatRoom) { - chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom); + getChatRoomMapFor(chatRoom).put(chatRoom.getId(), chatRoom); return Mono.just(chatRoom); } @@ -69,4 +77,11 @@ public class InMemoryChatHomeService implements ChatHomeService getChatRoomMapFor(ChatRoom chatRoom) + { + int shard = shardingStrategy.selectShard(chatRoom.getId()); + return chatrooms[shard]; + } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java index 7fff359c..3e1d3603 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatRoomFactory.java @@ -26,8 +26,7 @@ public class InMemoryChatRoomFactory implements ChatRoomFactory public Mono createChatRoom(UUID id, String name) { log.info("Creating ChatRoom with buffer-size {}", bufferSize); - int shard = shardingStrategy.selectShard(id); ChatRoomService service = new InMemoryChatRoomService(Flux.empty()); - return Mono.just(new ChatRoom(id, name, shard, clock, service, bufferSize)); + return Mono.just(new ChatRoom(id, name, clock, service, bufferSize)); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java index 96ef05c2..1dca0402 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java @@ -45,22 +45,23 @@ public class InMemoryServicesConfiguration StorageStrategy storageStrategy) { int numShards = properties.getInmemory().getNumShards(); + ShardingStrategy shardingStrategy = new KafkaLikeShardingStrategy(numShards); SimpleChatHome[] chatHomes = new SimpleChatHome[numShards]; storageStrategy .read() .subscribe(chatRoom -> { - int shard = chatRoom.getShard(); + int shard = shardingStrategy.selectShard(chatRoom.getId()); if (chatHomes[shard] == null) chatHomes[shard] = new SimpleChatHome(chatHomeService, shard); }); - ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards); - return new ShardedChatHome(chatHomes, strategy); + return new ShardedChatHome(chatHomes, shardingStrategy); } @Bean InMemoryChatHomeService chatHomeService( ChatBackendProperties properties, + ShardingStrategy shardingStrategy, StorageStrategy storageStrategy) { ShardingStrategyType sharding = @@ -72,6 +73,7 @@ public class InMemoryServicesConfiguration ? new int[] { 0 } : properties.getInmemory().getOwnedShards(); return new InMemoryChatHomeService( + shardingStrategy, numShards, ownedShards, storageStrategy.read()); diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java index df730aaf..0da1b953 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java @@ -2,7 +2,6 @@ package de.juplo.kafka.chat.backend.persistence.storage.files; import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.chat.backend.ChatBackendProperties; -import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; @@ -32,14 +31,13 @@ public class FilesStorageConfiguration public StorageStrategy storageStrategy( ChatBackendProperties properties, Clock clock, - ShardingStrategy shardingStrategy, ObjectMapper mapper) { return new FilesStorageStrategy( Paths.get(properties.getInmemory().getStorageDirectory()), clock, properties.getChatroomBufferSize(), - shardingStrategy, + messageFlux -> new InMemoryChatRoomService(messageFlux), mapper); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java index 1e3e5eef..5d3c067a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java @@ -5,7 +5,6 @@ import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.chat.backend.api.ChatRoomTo; import de.juplo.kafka.chat.backend.api.MessageTo; -import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.Message; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; @@ -17,7 +16,6 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.time.Clock; -import java.util.UUID; import static java.nio.file.StandardOpenOption.CREATE; import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; @@ -33,7 +31,6 @@ public class FilesStorageStrategy implements StorageStrategy private final Path storagePath; private final Clock clock; private final int bufferSize; - private final ShardingStrategy shardingStrategy; private final ChatRoomServiceFactory factory; private final ObjectMapper mapper; @@ -105,18 +102,12 @@ public class FilesStorageStrategy implements StorageStrategy return Flux .from(new JsonFilePublisher(chatroomsPath(), mapper, type)) .log() - .map(chatRoomTo -> - { - UUID chatRoomId = chatRoomTo.getId(); - int shard = shardingStrategy.selectShard(chatRoomId); - return new ChatRoom( + .map(chatRoomTo -> new ChatRoom( chatRoomTo.getId(), chatRoomTo.getName(), - shard, clock, factory.create(readMessages(chatRoomTo)), - bufferSize); - }); + bufferSize)); } public void writeMessages(ChatRoomTo chatroomTo, Flux messageFlux) diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java index 2b33eed7..22d7c26b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java @@ -1,7 +1,6 @@ package de.juplo.kafka.chat.backend.persistence.storage.mongodb; import de.juplo.kafka.chat.backend.ChatBackendProperties; -import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -22,14 +21,12 @@ public class MongoDbStorageConfiguration public StorageStrategy storageStrategy( ChatRoomRepository chatRoomRepository, ChatBackendProperties properties, - Clock clock, - ShardingStrategy shardingStrategy) + Clock clock) { return new MongoDbStorageStrategy( chatRoomRepository, clock, properties.getChatroomBufferSize(), - shardingStrategy, messageFlux -> new InMemoryChatRoomService(messageFlux)); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java index d21fe2ba..23002198 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java @@ -1,6 +1,5 @@ package de.juplo.kafka.chat.backend.persistence.storage.mongodb; -import de.juplo.kafka.chat.backend.domain.ShardingStrategy; import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory; @@ -19,7 +18,6 @@ public class MongoDbStorageStrategy implements StorageStrategy private final ChatRoomRepository repository; private final Clock clock; private final int bufferSize; - private final ShardingStrategy shardingStrategy; private final ChatRoomServiceFactory factory; @@ -39,11 +37,9 @@ public class MongoDbStorageStrategy implements StorageStrategy .map(chatRoomTo -> { UUID chatRoomId = UUID.fromString(chatRoomTo.getId()); - int shard = shardingStrategy.selectShard(chatRoomId); return new ChatRoom( chatRoomId, chatRoomTo.getName(), - shard, clock, factory.create( Flux diff --git a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java index 1b25a11e..a2870284 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java @@ -170,7 +170,6 @@ public class ChatBackendControllerTest ChatRoom chatRoom = new ChatRoom( chatroomId, "Test-ChatRoom", - 0, Clock.systemDefaultZone(), chatRoomService, 8); when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))).thenReturn(Mono.just(chatRoom)); @@ -222,7 +221,6 @@ public class ChatBackendControllerTest ChatRoom chatRoom = new ChatRoom( chatroomId, "Test-ChatRoom", - 0, Clock.systemDefaultZone(), chatRoomService, 8); when(chatHomeService.getChatRoom(anyInt(), any(UUID.class))) diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java index 9c418f17..d87155c2 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/ChatRoomTest.java @@ -27,7 +27,6 @@ public class ChatRoomTest ChatRoom chatRoom = new ChatRoom( UUID.randomUUID(), "Foo", - 0, Clock.systemDefaultZone(), chatRoomService, 8); @@ -54,7 +53,6 @@ public class ChatRoomTest ChatRoom chatRoom = new ChatRoom( UUID.randomUUID(), "Foo", - 0, Clock.systemDefaultZone(), chatRoomService, 8); @@ -78,7 +76,6 @@ public class ChatRoomTest ChatRoom chatRoom = new ChatRoom( UUID.randomUUID(), "Foo", - 0, Clock.systemDefaultZone(), chatRoomService, 8); @@ -108,7 +105,6 @@ public class ChatRoomTest ChatRoom chatRoom = new ChatRoom( UUID.randomUUID(), "Foo", - 0, Clock.systemDefaultZone(), chatRoomService, 8); @@ -138,7 +134,6 @@ public class ChatRoomTest ChatRoom chatRoom = new ChatRoom( UUID.randomUUID(), "Foo", - 0, Clock.systemDefaultZone(), chatRoomService, 8); diff --git a/src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java index 5b53607f..c9d1b633 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/domain/SimpleChatHomeTest.java @@ -25,7 +25,6 @@ public class SimpleChatHomeTest ChatRoom chatRoom = new ChatRoom( UUID.randomUUID(), "Foo", - 0, Clock.systemDefaultZone(), mock(ChatRoomService.class), 8); diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java index fe7ecac1..d6909df4 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java @@ -41,7 +41,6 @@ public class InMemoryWithFilesStorageIT extends AbstractStorageStrategyIT path, clock, 8, - chatRoomId -> 0, messageFlux -> new InMemoryChatRoomService(messageFlux), mapper); } @@ -57,6 +56,7 @@ public class InMemoryWithFilesStorageIT extends AbstractStorageStrategyIT protected Supplier getChatHomeServiceSupplier() { return () -> new InMemoryChatHomeService( + chatRoomId -> 0, 1, new int[] { 0 }, getStorageStrategy().read()); diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java index 9808aa31..ecc64d5a 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java @@ -55,6 +55,7 @@ public class InMemoryWithMongoDbStorageIT extends AbstractStorageStrategyIT protected Supplier getChatHomeServiceSupplier() { return () -> new InMemoryChatHomeService( + chatRoomId -> 0, 1, new int[] { 0 }, getStorageStrategy().read()); @@ -79,7 +80,6 @@ public class InMemoryWithMongoDbStorageIT extends AbstractStorageStrategyIT chatRoomRepository, clock, 8, - chatRoomId -> 0, messageFlux -> new InMemoryChatRoomService(messageFlux)); }