From 8d997cc65763b3f12fb680da67f471590e6eeeb2 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 16 Sep 2023 21:40:45 +0200 Subject: [PATCH] WIP: shard assigned/revoked events --- .../chat/backend/ChatBackendProperties.java | 5 +- .../backend/api/ChatBackendController.java | 6 + .../chat/backend/domain/ChatHomeService.java | 2 + .../InMemoryServicesConfiguration.java | 5 +- .../inmemory/ShardedChatHomeService.java | 14 +++ .../inmemory/SimpleChatHomeService.java | 6 + .../implementation/kafka/DataChannel.java | 2 + .../implementation/kafka/InfoChannel.java | 105 +++++++++++++++++- .../kafka/KafkaChatHomeService.java | 6 + .../kafka/KafkaServicesConfiguration.java | 3 +- .../kafka/messages/AbstractMessageTo.java | 2 + .../messages/info/EventShardAssigned.java | 35 ++++++ .../messages/info/EventShardRevoked.java | 35 ++++++ .../inmemory/ShardedChatHomeServiceTest.java | 10 +- 14 files changed, 229 insertions(+), 7 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventShardAssigned.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventShardRevoked.java diff --git a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java index ec0b35f8..ec5c7f5f 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java +++ b/src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java @@ -4,6 +4,7 @@ import lombok.Getter; import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; +import java.net.URI; import java.nio.file.Paths; @@ -25,7 +26,8 @@ public class ChatBackendProperties { private ShardingStrategyType shardingStrategy = ShardingStrategyType.none; private int numShards = 1; - private int[] ownedShards = new int[] { 0 }; + private int[] ownedShards = new int[0]; + private URI[] shardOwners = new URI[0]; private StorageStrategyType storageStrategy = StorageStrategyType.none; private String storageDirectory = Paths.get(System.getProperty("java.io.tmpdir"),"chat", "backend").toString(); } @@ -34,6 +36,7 @@ public class ChatBackendProperties @Setter public static class KafkaServicesProperties { + private URI instanceUri = URI.create("http://localhost:8080"); private String clientIdPrefix = "DEV"; private String bootstrapServers = ":9092"; private String infoChannelTopic = "info_channel"; diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java index 529be8df..f3efe791 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java @@ -128,6 +128,12 @@ public class ChatBackendController .build()); } + @GetMapping("/shards") + public Mono getShardOwners() + { + return chatHomeService.getShardOwners(); + } + @PostMapping("/store") public void store() { diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java index 388a20ad..19fa26c4 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java @@ -15,4 +15,6 @@ public interface ChatHomeService Flux getChatRoomInfo(); Mono getChatRoomData(UUID id); + + Mono getShardOwners(); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java index f60b193e..bc6f103b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java @@ -57,7 +57,10 @@ public class InMemoryServicesConfiguration clock, properties.getChatroomBufferSize())); ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards); - return new ShardedChatHomeService(chatHomes, strategy); + return new ShardedChatHomeService( + chatHomes, + properties.getInmemory().getShardOwners(), + strategy); } @ConditionalOnProperty( diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java index 06c197bf..c281d9e1 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java @@ -8,6 +8,8 @@ import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.net.URI; +import java.util.Arrays; import java.util.HashSet; import java.util.Set; import java.util.UUID; @@ -19,14 +21,20 @@ public class ShardedChatHomeService implements ChatHomeService { private final SimpleChatHomeService[] chatHomes; private final Set ownedShards; + private final String[] shardOwners; private final ShardingStrategy shardingStrategy; public ShardedChatHomeService( SimpleChatHomeService[] chatHomes, + URI[] shardOwners, ShardingStrategy shardingStrategy) { this.chatHomes = chatHomes; + this.shardOwners = Arrays + .stream(shardOwners) + .map(uri -> uri.toASCIIString()) + .toArray(size -> new String[size]); this.shardingStrategy = shardingStrategy; this.ownedShards = new HashSet<>(); for (int shard = 0; shard < chatHomes.length; shard++) @@ -90,6 +98,12 @@ public class ShardedChatHomeService implements ChatHomeService : throwable); } + @Override + public Mono getShardOwners() + { + return Mono.just(shardOwners); + } + private int selectShard(UUID chatroomId) { return shardingStrategy.selectShard(chatroomId); diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java index 30a181ee..cf6d20a5 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java @@ -114,4 +114,10 @@ public class SimpleChatHomeService implements ChatHomeService .justOrEmpty(chatRoomData.get(id)) .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id))); } + + @Override + public Mono getShardOwners() + { + return Mono.empty(); + } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java index da906631..4eedeb4a 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java @@ -138,6 +138,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener currentOffset); consumer.seek(topicPartition, nextOffset[partition]); + infoChannel.sendShardAssignedEvent(partition); }); consumer.resume(partitions); @@ -151,6 +152,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener int partition = topicPartition.partition(); isShardOwned[partition] = false; log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]); + infoChannel.sendShardRevokedEvent(partition); }); } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java index 26e86963..4711dbd9 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java @@ -3,16 +3,20 @@ package de.juplo.kafka.chat.backend.implementation.kafka; import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned; +import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.errors.WakeupException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.net.URI; import java.time.*; import java.util.HashMap; import java.util.Map; @@ -27,9 +31,11 @@ public class InfoChannel implements Runnable private final Producer producer; private final Consumer consumer; private final int numShards; + private final String[] shardOwners; private final long[] currentOffset; private final long[] nextOffset; private final Map chatRoomInfo; + private final String instanceUri; private boolean running; @@ -37,7 +43,8 @@ public class InfoChannel implements Runnable public InfoChannel( String topic, Producer producer, - Consumer infoChannelConsumer) + Consumer infoChannelConsumer, + URI instanceUri) { log.debug( "Creating InfoChannel for topic {}", @@ -50,11 +57,14 @@ public class InfoChannel implements Runnable this.numShards = consumer .partitionsFor(topic) .size(); + this.shardOwners = new String[numShards]; this.currentOffset = new long[numShards]; this.nextOffset = new long[numShards]; IntStream .range(0, numShards) .forEach(partition -> this.nextOffset[partition] = -1l); + + this.instanceUri = instanceUri.toASCIIString(); } @@ -83,7 +93,7 @@ public class InfoChannel implements Runnable { if (metadata != null) { - log.info("Successfully sent chreate-request for chat room: {}", to); + log.info("Successfully sent created event for chat chat-room: {}", to); ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, shard); sink.success(chatRoomInfo); } @@ -91,7 +101,7 @@ public class InfoChannel implements Runnable { // On send-failure log.error( - "Could not send create-request for chat room (id={}, name={}): {}", + "Could not send created event for chat-room (id={}, name={}): {}", chatRoomId, name, exception); @@ -101,6 +111,70 @@ public class InfoChannel implements Runnable }); } + Mono sendShardAssignedEvent(int shard) + { + EventShardAssigned to = EventShardAssigned.of(shard, instanceUri); + + return Mono.create(sink -> + { + ProducerRecord record = + new ProducerRecord<>( + topic, + Integer.toString(shard), + to); + + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) + { + log.info("Successfully sent shard assigned event for shard: {}", shard); + sink.success(metadata); + } + else + { + // On send-failure + log.error( + "Could not send shard assigned event for shard {}: {}", + shard, + exception); + sink.error(exception); + } + })); + }); + } + + Mono sendShardRevokedEvent(int shard) + { + EventShardRevoked to = EventShardRevoked.of(shard, instanceUri); + + return Mono.create(sink -> + { + ProducerRecord record = + new ProducerRecord<>( + topic, + Integer.toString(shard), + to); + + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) + { + log.info("Successfully sent shard revoked event for shard: {}", shard); + sink.success(metadata); + } + else + { + // On send-failure + log.error( + "Could not send shard revoked event for shard {}: {}", + shard, + exception); + sink.error(exception); + } + })); + }); + } + @Override public void run() @@ -146,6 +220,26 @@ public class InfoChannel implements Runnable createChatRoom(eventChatRoomCreated.toChatRoomInfo()); break; + case EVENT_SHARD_ASSIGNED: + EventShardAssigned eventShardAssigned = + (EventShardAssigned) record.value(); + log.info( + "Shard {} was assigned to {}", + eventShardAssigned.getShard(), + eventShardAssigned.getUri()); + shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri(); + break; + + case EVENT_SHARD_REVOKED: + EventShardRevoked eventShardRevoked = + (EventShardRevoked) record.value(); + log.info( + "Shard {} was revoked from {}", + eventShardRevoked.getShard(), + eventShardRevoked.getUri()); + shardOwners[eventShardRevoked.getShard()] = null; + break; + default: log.debug( "Ignoring message for key={} with offset={}: {}", @@ -190,4 +284,9 @@ public class InfoChannel implements Runnable { return Mono.fromSupplier(() -> chatRoomInfo.get(id)); } + + Mono getShardOwners() + { + return Mono.just(shardOwners); + } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java index 9832519d..9409716f 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java @@ -61,6 +61,12 @@ public class KafkaChatHomeService implements ChatHomeService dataChannel.getOwnedShards()))); } + @Override + public Mono getShardOwners() + { + return infoChannel.getShardOwners(); + } + int selectShard(UUID chatRoomId) { byte[] serializedKey = chatRoomId.toString().getBytes(); diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index 784ffa54..77955168 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -124,7 +124,8 @@ public class KafkaServicesConfiguration return new InfoChannel( properties.getKafka().getInfoChannelTopic(), producer, - infoChannelConsumer); + infoChannelConsumer, + properties.getKafka().getInstanceUri()); } @Bean diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/AbstractMessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/AbstractMessageTo.java index 1aef3fd1..843a2c45 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/AbstractMessageTo.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/AbstractMessageTo.java @@ -12,6 +12,8 @@ public class AbstractMessageTo COMMAND_CREATE_CHATROOM, EVENT_CHATMESSAGE_RECEIVED, EVENT_CHATROOM_CREATED, + EVENT_SHARD_ASSIGNED, + EVENT_SHARD_REVOKED, } @Getter diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventShardAssigned.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventShardAssigned.java new file mode 100644 index 00000000..8ba23f3c --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventShardAssigned.java @@ -0,0 +1,35 @@ +package de.juplo.kafka.chat.backend.implementation.kafka.messages.info; + +import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + + +@Getter +@Setter +@EqualsAndHashCode +@ToString +public class EventShardAssigned extends AbstractMessageTo +{ + private int shard; + private String uri; + + + public EventShardAssigned() + { + super(ToType.EVENT_SHARD_ASSIGNED); + } + + + public static EventShardAssigned of( + int shard, + String uri) + { + EventShardAssigned event = new EventShardAssigned(); + event.setShard(shard); + event.setUri(uri); + return event; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventShardRevoked.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventShardRevoked.java new file mode 100644 index 00000000..1e37f43f --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventShardRevoked.java @@ -0,0 +1,35 @@ +package de.juplo.kafka.chat.backend.implementation.kafka.messages.info; + +import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + + +@Getter +@Setter +@EqualsAndHashCode +@ToString +public class EventShardRevoked extends AbstractMessageTo +{ + private int shard; + private String uri; + + + public EventShardRevoked() + { + super(ToType.EVENT_SHARD_REVOKED); + } + + + public static EventShardRevoked of( + int shard, + String uri) + { + EventShardRevoked event = new EventShardRevoked(); + event.setShard(shard); + event.setUri(uri); + return event; + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeServiceTest.java b/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeServiceTest.java index 76ed2baa..3ff9e9e1 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeServiceTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeServiceTest.java @@ -8,6 +8,7 @@ import de.juplo.kafka.chat.backend.storage.files.FilesStorageStrategy; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; +import java.net.URI; import java.nio.file.Paths; import java.time.Clock; import java.util.stream.IntStream; @@ -34,7 +35,14 @@ public class ShardedChatHomeServiceTest extends ChatHomeServiceWithShardsTest ShardingStrategy strategy = new KafkaLikeShardingStrategy(NUM_SHARDS); - return new ShardedChatHomeService(chatHomes, strategy); + return new ShardedChatHomeService( + chatHomes, + IntStream + .range(0, NUM_SHARDS) + .mapToObj(shard -> "http://instance-0") + .map(uriString -> URI.create(uriString)) + .toArray(size -> new URI[size]), + strategy); } @Bean -- 2.20.1