projects
/
demos
/
kafka
/
chat
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
feat: Added counting of restored instances
[demos/kafka/chat]
/
src
/
main
/
java
/
de
/
juplo
/
kafka
/
chat
/
backend
/
implementation
/
inmemory
/
InMemoryServicesConfiguration.java
diff --git
a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java
b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java
index
263a2d5
..
3f3d888
100644
(file)
--- a/
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java
+++ b/
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java
@@
-1,9
+1,9
@@
-package de.juplo.kafka.chat.backend.
persistence
.inmemory;
+package de.juplo.kafka.chat.backend.
implementation
.inmemory;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.domain.ChatHomeService;
import de.juplo.kafka.chat.backend.ChatBackendProperties;
import de.juplo.kafka.chat.backend.domain.ChatHomeService;
-import de.juplo.kafka.chat.backend.
persistence
.ShardingStrategy;
-import de.juplo.kafka.chat.backend.
persistence
.StorageStrategy;
+import de.juplo.kafka.chat.backend.
implementation
.ShardingStrategy;
+import de.juplo.kafka.chat.backend.
implementation
.StorageStrategy;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@
-26,15
+26,16
@@
public class InMemoryServicesConfiguration
name = "sharding-strategy",
havingValue = "none",
matchIfMissing = true)
name = "sharding-strategy",
havingValue = "none",
matchIfMissing = true)
- ChatHomeService noneShardingChatHome(
+
Simple
ChatHomeService noneShardingChatHome(
ChatBackendProperties properties,
StorageStrategy storageStrategy,
Clock clock)
{
ChatBackendProperties properties,
StorageStrategy storageStrategy,
Clock clock)
{
- return new SimpleChatHomeService(
- storageStrategy,
+ SimpleChatHomeService chatHomeService = new SimpleChatHomeService(
clock,
properties.getChatroomBufferSize());
clock,
properties.getChatroomBufferSize());
+ chatHomeService.restore(storageStrategy).block();
+ return chatHomeService;
}
@Bean
}
@Bean
@@
-42,7
+43,7
@@
public class InMemoryServicesConfiguration
prefix = "chat.backend.inmemory",
name = "sharding-strategy",
havingValue = "kafkalike")
prefix = "chat.backend.inmemory",
name = "sharding-strategy",
havingValue = "kafkalike")
- ChatHomeService kafkalikeShardingChatHome(
+
Sharded
ChatHomeService kafkalikeShardingChatHome(
ChatBackendProperties properties,
StorageStrategy storageStrategy,
Clock clock)
ChatBackendProperties properties,
StorageStrategy storageStrategy,
Clock clock)
@@
-51,13
+52,20
@@
public class InMemoryServicesConfiguration
SimpleChatHomeService[] chatHomes = new SimpleChatHomeService[numShards];
IntStream
.of(properties.getInmemory().getOwnedShards())
SimpleChatHomeService[] chatHomes = new SimpleChatHomeService[numShards];
IntStream
.of(properties.getInmemory().getOwnedShards())
- .forEach(shard -> chatHomes[shard] = new SimpleChatHomeService(
- shard,
- storageStrategy,
- clock,
- properties.getChatroomBufferSize()));
+ .forEach(shard ->
+ {
+ SimpleChatHomeService service = chatHomes[shard] = new SimpleChatHomeService(
+ shard,
+ clock,
+ properties.getChatroomBufferSize());
+ service.restore(storageStrategy).block();
+ });
ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
- return new ShardedChatHomeService(chatHomes, strategy);
+ return new ShardedChatHomeService(
+ properties.getInstanceId(),
+ chatHomes,
+ properties.getInmemory().getShardOwners(),
+ strategy);
}
@ConditionalOnProperty(
}
@ConditionalOnProperty(