From: Kai Moritz Date: Wed, 19 Apr 2023 19:24:17 +0000 (+0200) Subject: NEU X-Git-Tag: kafkadata~13 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=df2360875c75f25798f2f55d1e77fc8d46cc31b0;hp=d3e648da9f3e10601d94c80740715a20eda851df;p=demos%2Fkafka%2Fchat NEU --- diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java index fd42d9df..91115ea5 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaServicesConfiguration.java @@ -21,19 +21,28 @@ import java.time.Clock; @ConditionalOnProperty( prefix = "chat.backend", name = "services", - havingValue = "inmemory", - matchIfMissing = true) + havingValue = "kafka") @Configuration public class KafkaServicesConfiguration implements ApplicationRunner { @Bean ChatHome kafkaChatHome( ChatBackendProperties properties, - InMemoryChatHomeService chatHomeService, - StorageStrategy storageStrategy) + KafkaChatHomeService chatHomeService) { int numShards = properties.getInmemory().getNumShards(); SimpleChatHome[] chatHomes = new SimpleChatHome[numShards]; + for (int shard = 0; shard < numShards; shard++) + { + + } + .read() + .subscribe(chatRoom -> + { + int shard = chatRoom.getShard(); + if (chatHomes[shard] == null) + chatHomes[shard] = new SimpleChatHome(chatHomeService, shard); + }); ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards); return new ShardedChatHome(chatHomes, strategy); }