From: Kai Moritz Date: Fri, 13 Jan 2023 23:05:31 +0000 (+0100) Subject: feat: Introduced a kafka-like `ShardingStrategy` for `inmemory` X-Git-Tag: wip-sharding~29 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=ff98b068a91fc9e60e51bd4a95065633bb8ed2db;p=demos%2Fkafka%2Fchat feat: Introduced a kafka-like `ShardingStrategy` for `inmemory` - Introduced the `ShardingStrategy`, that picks a shard for a given `ChatRoom`-ID. - Implemented a `KafkaLikeShardingStrategy`, that reuses the hashing algorithm, that is implementd in `Utils.murmur2()` in the `org.apache.kafka:kafka-clients` library. - The attribute `shard` of the `ChatHome` has to be restored according to the configured `ShardingStrategy` when loading the state - it must not be safed with the stored data, because it might change due to configuration-changes. - The `ChatBackendController` was not configured correctly, because it had consumed the single `ChatHome` from the old configuration as the only entry in its `ChatHome[]`-array. - Refined the application-properties: Introduced an inner subclass `InMemoryServicesProperties` of `ChatBackendProperties`, that encapsulates the properties, that only concern the implementation `inmemory`. - Added the configuration-parameters `numShards` and `ownedShards`, that are needed by `inmemory`, to handle the sharding correctly. - Introduced `ChatHomeFactory`, because the `ChatHome`s are instanciated by `ChatBackendConfiguration`, which is not aware of the configured implementation. - Adjusted the test-cases to the changes. - Added `InMemoryWithFilesAndShardingConfigurationIT`, that asserts, that the application works as expected, if sharding is activated. --- diff --git a/pom.xml b/pom.xml index e10af5e7..b5b0e81a 100644 --- a/pom.xml +++ b/pom.xml @@ -58,6 +58,10 @@ org.springframework.boot spring-boot-starter-data-mongodb + + org.apache.kafka + kafka-clients + org.springframework.boot spring-boot-starter-test diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java index c61f8488..8e1ff9e5 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendApplication.java @@ -16,7 +16,7 @@ public class ChatBackendApplication implements WebFluxConfigurer @Autowired ChatBackendProperties properties; @Autowired - ChatHome chatHome; + ChatHome[] chatHomes; @Autowired StorageStrategy storageStrategy; @@ -32,7 +32,8 @@ public class ChatBackendApplication implements WebFluxConfigurer @PreDestroy public void onExit() { - storageStrategy.write(chatHome.getChatRooms()); + for (int shard = 0; shard < chatHomes.length; shard++) + storageStrategy.write(chatHomes[shard].getChatRooms()); } public static void main(String[] args) diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java index c196b7ee..6e74bb01 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendConfiguration.java @@ -1,7 +1,9 @@ package de.juplo.kafka.chat.backend; import de.juplo.kafka.chat.backend.domain.ChatHome; +import de.juplo.kafka.chat.backend.domain.ChatHomeFactory; import de.juplo.kafka.chat.backend.domain.ChatHomeService; +import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -14,13 +16,25 @@ import java.time.Clock; public class ChatBackendConfiguration { @Bean - public ChatHome chatHome(ChatHomeService chatHomeService) + ChatHome[] chatHomes( + ChatHomeFactory factory, + ChatBackendProperties properties, + StorageStrategy storageStrategy) { - return new ChatHome(chatHomeService, 0); + ChatHome[] chatHomes = new ChatHome[properties.getInmemory().getNumShards()]; + storageStrategy + .read() + .subscribe(chatRoom -> + { + int shard = chatRoom.getShard(); + if (chatHomes[shard] == null) + chatHomes[shard] = factory.createChatHome(shard); + }); + return chatHomes; } @Bean - public Clock clock() + Clock clock() { return Clock.systemDefaultZone(); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java index 48e58165..5cd4535c 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java @@ -12,7 +12,24 @@ import java.nio.file.Paths; @Setter public class ChatBackendProperties { - private String storageDirectory = Paths.get(System.getProperty("java.io.tmpdir"),"chat", "backend").toString(); private String allowedOrigins = "http://localhost:4200"; private int chatroomBufferSize = 8; + private ServiceType services = ServiceType.inmemory; + private InMemoryServicesProperties inmemory = new InMemoryServicesProperties(); + + + @Getter + @Setter + public static class InMemoryServicesProperties + { + private ShardingStrategyType shardingStrategy = ShardingStrategyType.kafkalike; + private int numShards = 10; + private int[] ownedShards = { 2 }; + private StorageStrategyType storageStrategy = StorageStrategyType.files; + private String storageDirectory = Paths.get(System.getProperty("java.io.tmpdir"),"chat", "backend").toString(); + } + + public enum ServiceType { inmemory } + public enum StorageStrategyType { files, mongodb } + public enum ShardingStrategyType { none, kafkalike } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java index e997e4bb..3cc59210 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatRoomTo.java @@ -10,7 +10,6 @@ public class ChatRoomTo { private UUID id; private String name; - private int shard; public static ChatRoomTo from(ChatRoom chatroom) @@ -18,7 +17,6 @@ public class ChatRoomTo ChatRoomTo to = new ChatRoomTo(); to.id = chatroom.getId(); to.name = chatroom.getName(); - to.shard = chatroom.getShard(); return to; } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/KafkaLikeShardingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/api/KafkaLikeShardingStrategy.java new file mode 100644 index 00000000..d06ee58a --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/api/KafkaLikeShardingStrategy.java @@ -0,0 +1,20 @@ +package de.juplo.kafka.chat.backend.api; + +import lombok.RequiredArgsConstructor; +import org.apache.kafka.common.utils.Utils; + +import java.util.UUID; + + +@RequiredArgsConstructor +public class KafkaLikeShardingStrategy implements ShardingStrategy +{ + private final int numPartitions; + + @Override + public int selectShard(UUID chatRoomId) + { + byte[] serializedKey = chatRoomId.toString().getBytes(); + return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeFactory.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeFactory.java new file mode 100644 index 00000000..fb20cc7b --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeFactory.java @@ -0,0 +1,6 @@ +package de.juplo.kafka.chat.backend.domain; + +public interface ChatHomeFactory +{ + ChatHome createChatHome(int shard); +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeFactory.java new file mode 100644 index 00000000..d2c6b635 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeFactory.java @@ -0,0 +1,19 @@ +package de.juplo.kafka.chat.backend.persistence.inmemory; + +import de.juplo.kafka.chat.backend.domain.ChatHome; +import de.juplo.kafka.chat.backend.domain.ChatHomeFactory; +import lombok.RequiredArgsConstructor; + + +@RequiredArgsConstructor +public class InMemoryChatHomeFactory implements ChatHomeFactory +{ + private final InMemoryChatHomeService service; + + + @Override + public ChatHome createChatHome(int shard) + { + return new ChatHome(service, shard); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java index acfd936b..87fa61fa 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryChatHomeService.java @@ -6,9 +6,7 @@ import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; +import java.util.*; @Slf4j @@ -17,13 +15,38 @@ public class InMemoryChatHomeService implements ChatHomeService[] chatrooms; - public InMemoryChatHomeService(int numShards, Flux chatroomFlux) + public InMemoryChatHomeService( + int numShards, + int[] ownedShards, + Flux chatroomFlux) { log.debug("Creating InMemoryChatHomeService"); this.chatrooms = new Map[numShards]; + Set owned = Arrays + .stream(ownedShards) + .collect( + () -> new HashSet<>(), + (set, i) -> set.add(i), + (a, b) -> a.addAll(b)); for (int shard = 0; shard < numShards; shard++) - chatrooms[shard] = new HashMap<>(); + { + chatrooms[shard] = owned.contains(shard) + ? new HashMap<>() + : null; + } chatroomFlux + .filter(chatRoom -> + { + if (owned.contains(chatRoom.getShard())) + { + return true; + } + else + { + log.info("Ignoring not owned chat-room {}", chatRoom); + return false; + } + }) .toStream() .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom)); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java index e21ead5a..94dd5d8a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/inmemory/InMemoryServicesConfiguration.java @@ -1,6 +1,7 @@ package de.juplo.kafka.chat.backend.persistence.inmemory; import de.juplo.kafka.chat.backend.ChatBackendProperties; +import de.juplo.kafka.chat.backend.api.KafkaLikeShardingStrategy; import de.juplo.kafka.chat.backend.api.ShardingStrategy; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -13,15 +14,26 @@ import java.time.Clock; @ConditionalOnProperty( prefix = "chat.backend", name = "services", - havingValue = "in-memory", + havingValue = "inmemory", matchIfMissing = true) @Configuration public class InMemoryServicesConfiguration { @Bean - InMemoryChatHomeService chatHomeService(StorageStrategy storageStrategy) + InMemoryChatHomeService chatHomeService( + ChatBackendProperties properties, + StorageStrategy storageStrategy) { - return new InMemoryChatHomeService(1, storageStrategy.read()); + return new InMemoryChatHomeService( + properties.getInmemory().getNumShards(), + properties.getInmemory().getOwnedShards(), + storageStrategy.read()); + } + + @Bean + InMemoryChatHomeFactory chatHomeFactory(InMemoryChatHomeService service) + { + return new InMemoryChatHomeFactory(service); } @Bean @@ -36,9 +48,25 @@ public class InMemoryServicesConfiguration properties.getChatroomBufferSize()); } + @ConditionalOnProperty( + prefix = "chat.backend.inmemory", + name = "sharding-strategy", + havingValue = "none", + matchIfMissing = true) @Bean - ShardingStrategy shardingStrategy() + ShardingStrategy defaultShardingStrategy() { return chatRoomId -> 0; } + + @ConditionalOnProperty( + prefix = "chat.backend.inmemory", + name = "sharding-strategy", + havingValue = "kafkalike") + @Bean + ShardingStrategy kafkalikeShardingStrategy(ChatBackendProperties properties) + { + return new KafkaLikeShardingStrategy( + properties.getInmemory().getNumShards()); + } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java index 31e285ca..c7e56436 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageConfiguration.java @@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.persistence.storage.files; import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.chat.backend.ChatBackendProperties; +import de.juplo.kafka.chat.backend.api.ShardingStrategy; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; @@ -16,8 +17,8 @@ import java.time.Clock; @ConditionalOnProperty( - prefix = "chat.backend", - name = "storage", + prefix = "chat.backend.inmemory", + name = "storage-strategy", havingValue = "files", matchIfMissing = true) @Configuration @@ -31,12 +32,14 @@ public class FilesStorageConfiguration public StorageStrategy storageStrategy( ChatBackendProperties properties, Clock clock, + ShardingStrategy shardingStrategy, ObjectMapper mapper) { return new FilesStorageStrategy( - Paths.get(properties.getStorageDirectory()), + Paths.get(properties.getInmemory().getStorageDirectory()), clock, properties.getChatroomBufferSize(), + shardingStrategy, messageFlux -> new InMemoryChatRoomService(messageFlux), mapper); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java index d043696c..24c6a01e 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/files/FilesStorageStrategy.java @@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.chat.backend.api.ChatRoomTo; import de.juplo.kafka.chat.backend.api.MessageTo; +import de.juplo.kafka.chat.backend.api.ShardingStrategy; import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.Message; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; @@ -16,6 +17,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.time.Clock; +import java.util.UUID; import static java.nio.file.StandardOpenOption.CREATE; import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; @@ -31,6 +33,7 @@ public class FilesStorageStrategy implements StorageStrategy private final Path storagePath; private final Clock clock; private final int bufferSize; + private final ShardingStrategy shardingStrategy; private final ChatRoomServiceFactory factory; private final ObjectMapper mapper; @@ -102,13 +105,18 @@ public class FilesStorageStrategy implements StorageStrategy return Flux .from(new JsonFilePublisher(chatroomsPath(), mapper, type)) .log() - .map(chatRoomTo -> new ChatRoom( - chatRoomTo.getId(), - chatRoomTo.getName(), - chatRoomTo.getShard(), - clock, - factory.create(readMessages(chatRoomTo)), - bufferSize)); + .map(chatRoomTo -> + { + UUID chatRoomId = chatRoomTo.getId(); + int shard = shardingStrategy.selectShard(chatRoomId); + return new ChatRoom( + chatRoomTo.getId(), + chatRoomTo.getName(), + shard, + clock, + factory.create(readMessages(chatRoomTo)), + bufferSize); + }); } public void writeMessages(ChatRoomTo chatroomTo, Flux messageFlux) diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java index 27e65e41..1ad8d178 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/ChatRoomTo.java @@ -20,7 +20,6 @@ public class ChatRoomTo @Id private String id; private String name; - private int shard; private List messages; public static ChatRoomTo from(ChatRoom chatroom) @@ -28,7 +27,6 @@ public class ChatRoomTo return new ChatRoomTo( chatroom.getId().toString(), chatroom.getName(), - chatroom.getShard(), chatroom .getMessages() .map(MessageTo::from) diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java index 0a3df755..862dac07 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageConfiguration.java @@ -1,6 +1,7 @@ package de.juplo.kafka.chat.backend.persistence.storage.mongodb; import de.juplo.kafka.chat.backend.ChatBackendProperties; +import de.juplo.kafka.chat.backend.api.ShardingStrategy; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatRoomService; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -11,8 +12,8 @@ import java.time.Clock; @ConditionalOnProperty( - prefix = "chat.backend", - name = "storage", + prefix = "chat.backend.inmemory", + name = "storage-strategy", havingValue = "mongodb") @Configuration public class MongoDbStorageConfiguration @@ -21,12 +22,14 @@ public class MongoDbStorageConfiguration public StorageStrategy storageStrategy( ChatRoomRepository chatRoomRepository, ChatBackendProperties properties, - Clock clock) + Clock clock, + ShardingStrategy shardingStrategy) { return new MongoDbStorageStrategy( chatRoomRepository, clock, properties.getChatroomBufferSize(), + shardingStrategy, messageFlux -> new InMemoryChatRoomService(messageFlux)); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java index 8429fe83..ec90e945 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/storage/mongodb/MongoDbStorageStrategy.java @@ -1,5 +1,6 @@ package de.juplo.kafka.chat.backend.persistence.storage.mongodb; +import de.juplo.kafka.chat.backend.api.ShardingStrategy; import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.persistence.StorageStrategy; import de.juplo.kafka.chat.backend.persistence.storage.files.ChatRoomServiceFactory; @@ -18,6 +19,7 @@ public class MongoDbStorageStrategy implements StorageStrategy private final ChatRoomRepository repository; private final Clock clock; private final int bufferSize; + private final ShardingStrategy shardingStrategy; private final ChatRoomServiceFactory factory; @@ -34,15 +36,20 @@ public class MongoDbStorageStrategy implements StorageStrategy { return Flux .fromIterable(repository.findAll()) - .map(chatRoomTo -> new ChatRoom( - UUID.fromString(chatRoomTo.getId()), - chatRoomTo.getName(), - chatRoomTo.getShard(), - clock, - factory.create( - Flux - .fromIterable(chatRoomTo.getMessages()) - .map(messageTo -> messageTo.toMessage())), - bufferSize)); + .map(chatRoomTo -> + { + UUID chatRoomId = UUID.fromString(chatRoomTo.getId()); + int shard = shardingStrategy.selectShard(chatRoomId); + return new ChatRoom( + chatRoomId, + chatRoomTo.getName(), + shard, + clock, + factory.create( + Flux + .fromIterable(chatRoomTo.getMessages()) + .map(messageTo -> messageTo.toMessage())), + bufferSize); + }); } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 8b137891..e69de29b 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1 +0,0 @@ - diff --git a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesAndShardingConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesAndShardingConfigurationIT.java new file mode 100644 index 00000000..5138deed --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesAndShardingConfigurationIT.java @@ -0,0 +1,13 @@ +package de.juplo.kafka.chat.backend; + +import org.springframework.boot.test.context.SpringBootTest; + + +@SpringBootTest( + webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, + properties = { + "chat.backend.inmemory.storage-directory=target/test-classes/data/files", + "chat.backend.inmemory.sharding-strategy=kafkalike" }) +class InMemoryWithFilesAndShardingConfigurationIT extends AbstractConfigurationIT +{ +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesConfigurationIT.java index 0065219f..9027c6f4 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithFilesConfigurationIT.java @@ -5,7 +5,11 @@ import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest( webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, - properties = "chat.backend.storage-directory=target/test-classes/data/files") + properties = { + "chat.backend.inmemory.sharding-strategy=none", + "chat.backend.inmemory.num-shards=1", + "chat.backend.inmemory.owned-shards=0", + "chat.backend.inmemory.storage-directory=target/test-classes/data/files" }) class InMemoryWithFilesConfigurationIT extends AbstractConfigurationIT { } diff --git a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithMongoDbConfigurationIT.java b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithMongoDbConfigurationIT.java index abb0956f..05f1de5f 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithMongoDbConfigurationIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/InMemoryWithMongoDbConfigurationIT.java @@ -15,9 +15,10 @@ import org.testcontainers.junit.jupiter.Testcontainers; @SpringBootTest( webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, properties = { - "chat.backend.storage=mongodb", "spring.data.mongodb.host=localhost", - "spring.data.mongodb.database=test" }) + "spring.data.mongodb.database=test", + "chat.backend.inmemory.sharding-strategy=kafkalike", + "chat.backend.inmemory.storage-strategy=mongodb" }) @Testcontainers @Slf4j class InMemoryWithMongoDbConfigurationIT extends AbstractConfigurationIT diff --git a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java index b3504d2e..12d34be1 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/api/ChatBackendControllerTest.java @@ -1,32 +1,39 @@ package de.juplo.kafka.chat.backend.api; +import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.domain.*; +import de.juplo.kafka.chat.backend.persistence.inmemory.InMemoryChatHomeService; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.context.annotation.Bean; import org.springframework.http.MediaType; import org.springframework.test.web.reactive.server.WebTestClient; import reactor.core.publisher.Mono; import java.time.Clock; import java.time.LocalDateTime; +import java.util.Arrays; import java.util.UUID; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; -@SpringBootTest(properties = "spring.main.allow-bean-definition-overriding=true") +@SpringBootTest(properties = { + "spring.main.allow-bean-definition-overriding=true", + "chat.backend.inmemory.owned-shards=0,1,2,3,4,5,6,7,8,9" }) @AutoConfigureWebTestClient @Slf4j public class ChatBackendControllerTest { @MockBean - ChatHomeService chatHomeService; + InMemoryChatHomeService chatHomeService; @MockBean ChatRoomService chatRoomService; @@ -248,4 +255,20 @@ public class ChatBackendControllerTest .jsonPath("$.username").isEqualTo(user); verify(chatRoomService, never()).persistMessage(eq(key), any(LocalDateTime.class), any(String.class)); } + + @TestConfiguration + static class Config + { + @Bean + ChatHome[] chatHomes( + ChatBackendProperties properties, + ChatHomeFactory factory) + { + ChatHome[] chatHomes = new ChatHome[properties.getInmemory().getNumShards()]; + Arrays + .stream(properties.getInmemory().getOwnedShards()) + .forEach(i -> chatHomes[i] = factory.createChatHome(i)); + return chatHomes; + } + } } diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java index 2c72ea0f..8972b04e 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithFilesStorageIT.java @@ -41,6 +41,7 @@ public class InMemoryWithFilesStorageIT extends AbstractStorageStrategyIT path, clock, 8, + chatRoomId -> 0, messageFlux -> new InMemoryChatRoomService(messageFlux), mapper); } @@ -55,7 +56,10 @@ public class InMemoryWithFilesStorageIT extends AbstractStorageStrategyIT @Override protected Supplier getChatHomeServiceSupplier() { - return () -> new InMemoryChatHomeService(1, getStorageStrategy().read()); + return () -> new InMemoryChatHomeService( + 1, + new int[] { 0 }, + getStorageStrategy().read()); } @Override diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java index 80067117..34e21268 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/InMemoryWithMongoDbStorageIT.java @@ -54,7 +54,10 @@ public class InMemoryWithMongoDbStorageIT extends AbstractStorageStrategyIT @Override protected Supplier getChatHomeServiceSupplier() { - return () -> new InMemoryChatHomeService(1, getStorageStrategy().read()); + return () -> new InMemoryChatHomeService( + 1, + new int[] { 0 }, + getStorageStrategy().read()); } @Override @@ -76,6 +79,7 @@ public class InMemoryWithMongoDbStorageIT extends AbstractStorageStrategyIT chatRoomRepository, clock, 8, + chatRoomId -> 0, messageFlux -> new InMemoryChatRoomService(messageFlux)); } diff --git a/src/test/resources/data/files/chatrooms.json b/src/test/resources/data/files/chatrooms.json index d0f1b92c..a2ce9418 100644 --- a/src/test/resources/data/files/chatrooms.json +++ b/src/test/resources/data/files/chatrooms.json @@ -1,5 +1,4 @@ [ { "id" : "5c73531c-6fc4-426c-adcb-afc5c140a0f7", - "name" : "FOO", - "shard" : 0 -} ] \ No newline at end of file + "name" : "FOO" +} ] diff --git a/src/test/resources/data/mongodb/chatRoomTo.json b/src/test/resources/data/mongodb/chatRoomTo.json index 5d1bee05..ae150343 100644 --- a/src/test/resources/data/mongodb/chatRoomTo.json +++ b/src/test/resources/data/mongodb/chatRoomTo.json @@ -1,7 +1,6 @@ { "_id": "5c73531c-6fc4-426c-adcb-afc5c140a0f7", "name": "FOO", - "shard": 0, "messages": [ { "_id": "peter--1",