X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fimplementation%2Finmemory%2FInMemoryServicesConfiguration.java;h=43973ae0a069906a2d7b49aca75d88d81e0a369a;hb=900422dccb5a92fbceac34caa5e614b0d7f05ad7;hp=5b5785ea54bb2bc79c730cf357aaa5c3fbc886f6;hpb=ec456b2c00027e54a49f3d916e89c831b2589186;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java index 5b5785ea..43973ae0 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java @@ -26,15 +26,16 @@ public class InMemoryServicesConfiguration name = "sharding-strategy", havingValue = "none", matchIfMissing = true) - ChatHomeService noneShardingChatHome( + SimpleChatHomeService noneShardingChatHome( ChatBackendProperties properties, StorageStrategy storageStrategy, Clock clock) { - return new SimpleChatHomeService( - storageStrategy, + SimpleChatHomeService chatHomeService = new SimpleChatHomeService( clock, - properties.getChatroomBufferSize()); + properties.getChatroomHistoryLimit()); + chatHomeService.restore(storageStrategy).block(); + return chatHomeService; } @Bean @@ -42,7 +43,7 @@ public class InMemoryServicesConfiguration prefix = "chat.backend.inmemory", name = "sharding-strategy", havingValue = "kafkalike") - ChatHomeService kafkalikeShardingChatHome( + ShardedChatHomeService kafkalikeShardingChatHome( ChatBackendProperties properties, StorageStrategy storageStrategy, Clock clock) @@ -51,11 +52,14 @@ public class InMemoryServicesConfiguration SimpleChatHomeService[] chatHomes = new SimpleChatHomeService[numShards]; IntStream .of(properties.getInmemory().getOwnedShards()) - .forEach(shard -> chatHomes[shard] = new SimpleChatHomeService( - shard, - storageStrategy, - clock, - properties.getChatroomBufferSize())); + .forEach(shard -> + { + SimpleChatHomeService service = chatHomes[shard] = new SimpleChatHomeService( + shard, + clock, + properties.getChatroomHistoryLimit()); + service.restore(storageStrategy).block(); + }); ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards); return new ShardedChatHomeService( properties.getInstanceId(),