// Then
assertThat(mono).sendsError(e ->
{
- assertThat(e).isInstanceOf(UnknownChatroomException.class);
- UnknownChatroomException unknownChatroomException = (UnknownChatroomException) e;
- assertThat(unknownChatroomException.getChatroomId()).isEqualTo(chatRoomId);
+ assertThat(e).isInstanceOf(ShardNotOwnedException.class);
+ ShardNotOwnedException shardNotOwnedException = (ShardNotOwnedException) e;
+ assertThat(shardNotOwnedException.getShard()).isEqualTo(0);
});
}
}
{
@Bean
ShardedChatHome chatHome(
- Integer numShards,
- int[] ownedShards,
InMemoryChatHomeService chatHomeService)
{
- SimpleChatHome[] chatHomes = new SimpleChatHome[numShards];
+ SimpleChatHome[] chatHomes = new SimpleChatHome[numShards()];
IntStream
- .of(ownedShards)
+ .of(ownedShards())
.forEach(shard -> chatHomes[shard] = new SimpleChatHome(chatHomeService, shard));
- ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
+ ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards());
return new ShardedChatHome(chatHomes, strategy);
}
@Bean
InMemoryChatHomeService chatHomeService(
- Integer numShards,
- int[] ownedShards,
StorageStrategy storageStrategy)
{
return new InMemoryChatHomeService(
- numShards,
- ownedShards,
+ numShards(),
+ ownedShards(),
storageStrategy.read());
}
@Bean
- public FilesStorageStrategy storageStrategy(Integer numShards)
+ public FilesStorageStrategy storageStrategy()
{
return new FilesStorageStrategy(
Paths.get("target", "test-classes", "data", "files"),
Clock.systemDefaultZone(),
8,
- new KafkaLikeShardingStrategy(numShards),
+ new KafkaLikeShardingStrategy(numShards()),
messageFlux -> new InMemoryChatRoomService(messageFlux),
new ObjectMapper());
}
- @Bean
Integer numShards()
{
return 10;
}
- @Bean
int[] ownedShards()
{
return new int[] { 2 };