return new SimpleChatHomeService(
storageStrategy,
clock,
- properties.getChatroomBufferSize(),
- properties.getInstanceUri());
+ properties.getChatroomBufferSize());
}
@Bean
clock,
properties.getChatroomBufferSize()));
ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
- return new ShardedChatHomeService(chatHomes, strategy);
+ return new ShardedChatHomeService(
+ chatHomes,
+ properties.getInmemory().getShardOwners(),
+ strategy);
}
@ConditionalOnProperty(
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import java.net.URI;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
{
private final SimpleChatHomeService[] chatHomes;
private final Set<Integer> ownedShards;
+ private final String[] shardOwners;
private final ShardingStrategy shardingStrategy;
public ShardedChatHomeService(
SimpleChatHomeService[] chatHomes,
+ URI[] shardOwners,
ShardingStrategy shardingStrategy)
{
this.chatHomes = chatHomes;
+ this.shardOwners = Arrays
+ .stream(shardOwners)
+ .map(uri -> uri.toASCIIString())
+ .toArray(size -> new String[size]);
this.shardingStrategy = shardingStrategy;
this.ownedShards = new HashSet<>();
for (int shard = 0; shard < chatHomes.length; shard++)
@Override
public Mono<String[]> getShardOwners()
{
- return Mono.just(Arrays
- .stream(chatHomes)
- .map(chatHome -> chatHome.getInstanceUri())
- .toArray(size -> new String[size]));
+ return Mono.just(shardOwners);
}
private int selectShard(UUID chatroomId)
import de.juplo.kafka.chat.backend.domain.*;
import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException;
import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
-import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import java.net.URI;
import java.time.Clock;
import java.util.*;
private final Map<UUID, ChatRoomData> chatRoomData;
private final Clock clock;
private final int bufferSize;
- @Getter
- private final URI instanceUri;
public SimpleChatHomeService(
StorageStrategy storageStrategy,
Clock clock,
- int bufferSize,
- URI instanceUri)
+ int bufferSize)
{
this(
null,
storageStrategy,
clock,
- bufferSize,
- instanceUri);
+ bufferSize);
}
public SimpleChatHomeService(
Integer shard,
StorageStrategy storageStrategy,
Clock clock,
- int bufferSize,
- URI instanceUri)
+ int bufferSize)
{
log.info("Created SimpleChatHome for shard {}", shard);
;
});
this.clock = clock;
this.bufferSize = bufferSize;
- this.instanceUri = instanceUri;
}
private final Producer<String, AbstractMessageTo> producer;
private final Consumer<String, AbstractMessageTo> consumer;
private final int numShards;
+ private final String[] shardOwners;
private final long[] currentOffset;
private final long[] nextOffset;
private final Map<UUID, ChatRoomInfo> chatRoomInfo;
this.numShards = consumer
.partitionsFor(topic)
.size();
+ this.shardOwners = new String[numShards];
this.currentOffset = new long[numShards];
this.nextOffset = new long[numShards];
IntStream
case EVENT_SHARD_ASSIGNED:
EventShardAssigned eventShardAssigned =
(EventShardAssigned) record.value();
+ log.info(
+ "Shard {} was assigned to {}",
+ eventShardAssigned.getShard(),
+ eventShardAssigned.getUri());
+ shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri();
break;
case EVENT_SHARD_REVOKED:
EventShardRevoked eventShardRevoked =
(EventShardRevoked) record.value();
+ log.info(
+ "Shard {} was revoked from {}",
+ eventShardRevoked.getShard(),
+ eventShardRevoked.getUri());
+ shardOwners[eventShardRevoked.getShard()] = null;
break;
default:
{
return Mono.fromSupplier(() -> chatRoomInfo.get(id));
}
+
+ Mono<String[]> getShardOwners()
+ {
+ return Mono.just(shardOwners);
+ }
}
dataChannel.getOwnedShards())));
}
+ @Override
+ public Mono<String[]> getShardOwners()
+ {
+ infoChannel.getShardOwners();
+ }
+
int selectShard(UUID chatRoomId)
{
byte[] serializedKey = chatRoomId.toString().getBytes();