X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Finmemory%2FInMemoryServicesConfiguration.java;h=43973ae0a069906a2d7b49aca75d88d81e0a369a;hb=900422dccb5a92fbceac34caa5e614b0d7f05ad7;hp=263a2d5f0f9b92599ebb280a54af84114c661e0d;hpb=c0b341d3e1ad8eb2ba374d4d21c127b701a726bd;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java index 263a2d5f..43973ae0 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java @@ -1,9 +1,9 @@ -package de.juplo.kafka.chat.backend.persistence.inmemory; +package de.juplo.kafka.chat.backend.implementation.inmemory; import de.juplo.kafka.chat.backend.ChatBackendProperties; import de.juplo.kafka.chat.backend.domain.ChatHomeService; -import de.juplo.kafka.chat.backend.persistence.ShardingStrategy; -import de.juplo.kafka.chat.backend.persistence.StorageStrategy; +import de.juplo.kafka.chat.backend.implementation.ShardingStrategy; +import de.juplo.kafka.chat.backend.implementation.StorageStrategy; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -26,15 +26,16 @@ public class InMemoryServicesConfiguration name = "sharding-strategy", havingValue = "none", matchIfMissing = true) - ChatHomeService noneShardingChatHome( + SimpleChatHomeService noneShardingChatHome( ChatBackendProperties properties, StorageStrategy storageStrategy, Clock clock) { - return new SimpleChatHomeService( - storageStrategy, + SimpleChatHomeService chatHomeService = new SimpleChatHomeService( clock, - properties.getChatroomBufferSize()); + properties.getChatroomHistoryLimit()); + chatHomeService.restore(storageStrategy).block(); + return chatHomeService; } @Bean @@ -42,7 +43,7 @@ public class InMemoryServicesConfiguration prefix = "chat.backend.inmemory", name = "sharding-strategy", havingValue = "kafkalike") - ChatHomeService kafkalikeShardingChatHome( + ShardedChatHomeService kafkalikeShardingChatHome( ChatBackendProperties properties, StorageStrategy storageStrategy, Clock clock) @@ -51,13 +52,20 @@ public class InMemoryServicesConfiguration SimpleChatHomeService[] chatHomes = new SimpleChatHomeService[numShards]; IntStream .of(properties.getInmemory().getOwnedShards()) - .forEach(shard -> chatHomes[shard] = new SimpleChatHomeService( - shard, - storageStrategy, - clock, - properties.getChatroomBufferSize())); + .forEach(shard -> + { + SimpleChatHomeService service = chatHomes[shard] = new SimpleChatHomeService( + shard, + clock, + properties.getChatroomHistoryLimit()); + service.restore(storageStrategy).block(); + }); ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards); - return new ShardedChatHomeService(chatHomes, strategy); + return new ShardedChatHomeService( + properties.getInstanceId(), + chatHomes, + properties.getInmemory().getShardOwners(), + strategy); } @ConditionalOnProperty(