From c259051502ea967b1a0afe2db156c75c12240fcb Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 16 Sep 2023 21:56:16 +0200 Subject: [PATCH] WIP --- .../inmemory/InMemoryServicesConfiguration.java | 8 +++++--- .../inmemory/ShardedChatHomeService.java | 12 ++++++++---- .../inmemory/SimpleChatHomeService.java | 14 +++----------- .../implementation/kafka/InfoChannel.java | 17 +++++++++++++++++ .../kafka/KafkaChatHomeService.java | 6 ++++++ 5 files changed, 39 insertions(+), 18 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java index 89a3c111..bc6f103b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java @@ -34,8 +34,7 @@ public class InMemoryServicesConfiguration return new SimpleChatHomeService( storageStrategy, clock, - properties.getChatroomBufferSize(), - properties.getInstanceUri()); + properties.getChatroomBufferSize()); } @Bean @@ -58,7 +57,10 @@ public class InMemoryServicesConfiguration clock, properties.getChatroomBufferSize())); ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards); - return new ShardedChatHomeService(chatHomes, strategy); + return new ShardedChatHomeService( + chatHomes, + properties.getInmemory().getShardOwners(), + strategy); } @ConditionalOnProperty( diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java index b7268a86..c281d9e1 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java @@ -8,6 +8,7 @@ import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.net.URI; import java.util.Arrays; import java.util.HashSet; import java.util.Set; @@ -20,14 +21,20 @@ public class ShardedChatHomeService implements ChatHomeService { private final SimpleChatHomeService[] chatHomes; private final Set ownedShards; + private final String[] shardOwners; private final ShardingStrategy shardingStrategy; public ShardedChatHomeService( SimpleChatHomeService[] chatHomes, + URI[] shardOwners, ShardingStrategy shardingStrategy) { this.chatHomes = chatHomes; + this.shardOwners = Arrays + .stream(shardOwners) + .map(uri -> uri.toASCIIString()) + .toArray(size -> new String[size]); this.shardingStrategy = shardingStrategy; this.ownedShards = new HashSet<>(); for (int shard = 0; shard < chatHomes.length; shard++) @@ -94,10 +101,7 @@ public class ShardedChatHomeService implements ChatHomeService @Override public Mono getShardOwners() { - return Mono.just(Arrays - .stream(chatHomes) - .map(chatHome -> chatHome.getInstanceUri()) - .toArray(size -> new String[size])); + return Mono.just(shardOwners); } private int selectShard(UUID chatroomId) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java index 4cb490d6..cf6d20a5 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java @@ -3,12 +3,10 @@ package de.juplo.kafka.chat.backend.implementation.inmemory; import de.juplo.kafka.chat.backend.domain.*; import de.juplo.kafka.chat.backend.domain.exceptions.UnknownChatroomException; import de.juplo.kafka.chat.backend.implementation.StorageStrategy; -import lombok.Getter; import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.net.URI; import java.time.Clock; import java.util.*; @@ -21,31 +19,26 @@ public class SimpleChatHomeService implements ChatHomeService private final Map chatRoomData; private final Clock clock; private final int bufferSize; - @Getter - private final URI instanceUri; public SimpleChatHomeService( StorageStrategy storageStrategy, Clock clock, - int bufferSize, - URI instanceUri) + int bufferSize) { this( null, storageStrategy, clock, - bufferSize, - instanceUri); + bufferSize); } public SimpleChatHomeService( Integer shard, StorageStrategy storageStrategy, Clock clock, - int bufferSize, - URI instanceUri) + int bufferSize) { log.info("Created SimpleChatHome for shard {}", shard); ; @@ -85,7 +78,6 @@ public class SimpleChatHomeService implements ChatHomeService }); this.clock = clock; this.bufferSize = bufferSize; - this.instanceUri = instanceUri; } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index 8756a062..4711dbd9 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -31,6 +31,7 @@ public class InfoChannel implements Runnable private final Producer producer; private final Consumer consumer; private final int numShards; + private final String[] shardOwners; private final long[] currentOffset; private final long[] nextOffset; private final Map chatRoomInfo; @@ -56,6 +57,7 @@ public class InfoChannel implements Runnable this.numShards = consumer .partitionsFor(topic) .size(); + this.shardOwners = new String[numShards]; this.currentOffset = new long[numShards]; this.nextOffset = new long[numShards]; IntStream @@ -221,11 +223,21 @@ public class InfoChannel implements Runnable case EVENT_SHARD_ASSIGNED: EventShardAssigned eventShardAssigned = (EventShardAssigned) record.value(); + log.info( + "Shard {} was assigned to {}", + eventShardAssigned.getShard(), + eventShardAssigned.getUri()); + shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri(); break; case EVENT_SHARD_REVOKED: EventShardRevoked eventShardRevoked = (EventShardRevoked) record.value(); + log.info( + "Shard {} was revoked from {}", + eventShardRevoked.getShard(), + eventShardRevoked.getUri()); + shardOwners[eventShardRevoked.getShard()] = null; break; default: @@ -272,4 +284,9 @@ public class InfoChannel implements Runnable { return Mono.fromSupplier(() -> chatRoomInfo.get(id)); } + + Mono getShardOwners() + { + return Mono.just(shardOwners); + } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java index 9832519d..2984d8ce 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java @@ -61,6 +61,12 @@ public class KafkaChatHomeService implements ChatHomeService dataChannel.getOwnedShards()))); } + @Override + public Mono getShardOwners() + { + infoChannel.getShardOwners(); + } + int selectShard(UUID chatRoomId) { byte[] serializedKey = chatRoomId.toString().getBytes(); -- 2.20.1