package de.juplo.kafka.chat.backend.domain;
-import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import java.util.HashSet;
+import java.util.Set;
import java.util.UUID;
+import java.util.stream.Collectors;
-@RequiredArgsConstructor
+@Slf4j
public class ShardedChatHome implements ChatHome
{
private final ChatHome[] chatHomes;
- private final ShardingStrategy selectionStrategy;
+ private final Set<Integer> ownedShards;
+ private final ShardingStrategy shardingStrategy;
+
+
+ public ShardedChatHome(
+ ChatHome[] chatHomes,
+ ShardingStrategy shardingStrategy)
+ {
+ this.chatHomes = chatHomes;
+ this.shardingStrategy = shardingStrategy;
+ this.ownedShards = new HashSet<>();
+ for (int shard = 0; shard < chatHomes.length; shard++)
+ if(chatHomes[shard] != null)
+ this.ownedShards.add(shard);
+ log.info(
+ "Created ShardedChatHome for shards: {}",
+ ownedShards
+ .stream()
+ .map(String::valueOf)
+ .collect(Collectors.joining(", ")));
+ }
@Override
public Flux<ChatRoom> getChatRooms()
{
return Flux
- .fromArray(chatHomes)
- .flatMap(chatHome -> chatHome.getChatRooms());
+ .fromIterable(ownedShards)
+ .flatMap(shard -> chatHomes[shard].getChatRooms());
}
private int selectShard(UUID chatroomId)
{
- return selectionStrategy.selectShard(chatroomId);
+ return shardingStrategy.selectShard(chatroomId);
}
}
package de.juplo.kafka.chat.backend.domain;
-import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
-@RequiredArgsConstructor
public class SimpleChatHome implements ChatHome
{
private final ChatHomeService service;
private final int shard;
+ public SimpleChatHome(ChatHomeService service, int shard)
+ {
+ log.info("Created SimpleChatHome for shard {}", shard);
+ this.service = service;
+ this.shard = shard;
+ }
+
public SimpleChatHome(ChatHomeService service)
{
this(service, 0);
.exchange()
.expectStatus().isOk()
.expectBody().jsonPath("$.status").isEqualTo("UP");
+ webTestClient
+ .get()
+ .uri("http://localhost:{port}/list", port)
+ .accept(MediaType.APPLICATION_JSON)
+ .exchange()
+ .expectStatus().isOk()
+ .expectBody()
+ .jsonPath("$.length()").isEqualTo(1)
+ .jsonPath("$[0].name").isEqualTo("FOO");
webTestClient
.get()
.uri("http://localhost:{port}/5c73531c-6fc4-426c-adcb-afc5c140a0f7", port)