WIP: shard assigned/revoked events
authorKai Moritz <kai@juplo.de>
Sat, 16 Sep 2023 19:40:45 +0000 (21:40 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 27 Jan 2024 15:04:43 +0000 (16:04 +0100)
14 files changed:
src/main/java/de/juplo/kafka/chat/backend/ChatBackendProperties.java
src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java
src/main/java/de/juplo/kafka/chat/backend/domain/ChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/InMemoryServicesConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/implementation/inmemory/SimpleChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/InfoChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeService.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/AbstractMessageTo.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventShardAssigned.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventShardRevoked.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/chat/backend/implementation/inmemory/ShardedChatHomeServiceTest.java

index ec0b35f..ec5c7f5 100644 (file)
@@ -4,6 +4,7 @@ import lombok.Getter;
 import lombok.Setter;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 
+import java.net.URI;
 import java.nio.file.Paths;
 
 
@@ -25,7 +26,8 @@ public class ChatBackendProperties
   {
     private ShardingStrategyType shardingStrategy = ShardingStrategyType.none;
     private int numShards = 1;
-    private int[] ownedShards = new int[] { 0 };
+    private int[] ownedShards = new int[0];
+    private URI[] shardOwners = new URI[0];
     private StorageStrategyType storageStrategy = StorageStrategyType.none;
     private String storageDirectory = Paths.get(System.getProperty("java.io.tmpdir"),"chat", "backend").toString();
   }
@@ -34,6 +36,7 @@ public class ChatBackendProperties
   @Setter
   public static class KafkaServicesProperties
   {
+    private URI instanceUri = URI.create("http://localhost:8080");
     private String clientIdPrefix = "DEV";
     private String bootstrapServers = ":9092";
     private String infoChannelTopic = "info_channel";
index 529be8d..f3efe79 100644 (file)
@@ -128,6 +128,12 @@ public class ChatBackendController
                 .build());
   }
 
+  @GetMapping("/shards")
+  public Mono<String[]> getShardOwners()
+  {
+    return chatHomeService.getShardOwners();
+  }
+
   @PostMapping("/store")
   public void store()
   {
index 388a20a..19fa26c 100644 (file)
@@ -15,4 +15,6 @@ public interface ChatHomeService
   Flux<ChatRoomInfo> getChatRoomInfo();
 
   Mono<ChatRoomData> getChatRoomData(UUID id);
+
+  Mono<String[]> getShardOwners();
 }
index f60b193..bc6f103 100644 (file)
@@ -57,7 +57,10 @@ public class InMemoryServicesConfiguration
             clock,
             properties.getChatroomBufferSize()));
     ShardingStrategy strategy = new KafkaLikeShardingStrategy(numShards);
-    return new ShardedChatHomeService(chatHomes, strategy);
+    return new ShardedChatHomeService(
+        chatHomes,
+        properties.getInmemory().getShardOwners(),
+        strategy);
   }
 
   @ConditionalOnProperty(
index 06c197b..c281d9e 100644 (file)
@@ -8,6 +8,8 @@ import lombok.extern.slf4j.Slf4j;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.net.URI;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
@@ -19,14 +21,20 @@ public class ShardedChatHomeService implements ChatHomeService
 {
   private final SimpleChatHomeService[] chatHomes;
   private final Set<Integer> ownedShards;
+  private final String[] shardOwners;
   private final ShardingStrategy shardingStrategy;
 
 
   public ShardedChatHomeService(
       SimpleChatHomeService[] chatHomes,
+      URI[] shardOwners,
       ShardingStrategy shardingStrategy)
   {
     this.chatHomes = chatHomes;
+    this.shardOwners = Arrays
+        .stream(shardOwners)
+        .map(uri -> uri.toASCIIString())
+        .toArray(size -> new String[size]);
     this.shardingStrategy = shardingStrategy;
     this.ownedShards = new HashSet<>();
     for (int shard = 0; shard < chatHomes.length; shard++)
@@ -90,6 +98,12 @@ public class ShardedChatHomeService implements ChatHomeService
                 : throwable);
   }
 
+  @Override
+  public Mono<String[]> getShardOwners()
+  {
+    return Mono.just(shardOwners);
+  }
+
   private int selectShard(UUID chatroomId)
   {
     return shardingStrategy.selectShard(chatroomId);
index 30a181e..cf6d20a 100644 (file)
@@ -114,4 +114,10 @@ public class SimpleChatHomeService implements ChatHomeService
         .justOrEmpty(chatRoomData.get(id))
         .switchIfEmpty(Mono.error(() -> new UnknownChatroomException(id)));
   }
+
+  @Override
+  public Mono<String[]> getShardOwners()
+  {
+    return Mono.empty();
+  }
 }
index da90663..4eedeb4 100644 (file)
@@ -138,6 +138,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
           currentOffset);
 
       consumer.seek(topicPartition, nextOffset[partition]);
+      infoChannel.sendShardAssignedEvent(partition);
     });
 
     consumer.resume(partitions);
@@ -151,6 +152,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
       int partition = topicPartition.partition();
       isShardOwned[partition] = false;
       log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
+      infoChannel.sendShardRevokedEvent(partition);
     });
   }
 
index 26e8696..4711dbd 100644 (file)
@@ -3,16 +3,20 @@ package de.juplo.kafka.chat.backend.implementation.kafka;
 import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardAssigned;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventShardRevoked;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.errors.WakeupException;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.net.URI;
 import java.time.*;
 import java.util.HashMap;
 import java.util.Map;
@@ -27,9 +31,11 @@ public class InfoChannel implements Runnable
   private final Producer<String, AbstractMessageTo> producer;
   private final Consumer<String, AbstractMessageTo> consumer;
   private final int numShards;
+  private final String[] shardOwners;
   private final long[] currentOffset;
   private final long[] nextOffset;
   private final Map<UUID, ChatRoomInfo> chatRoomInfo;
+  private final String instanceUri;
 
   private boolean running;
 
@@ -37,7 +43,8 @@ public class InfoChannel implements Runnable
   public InfoChannel(
     String topic,
     Producer<String, AbstractMessageTo> producer,
-    Consumer<String, AbstractMessageTo> infoChannelConsumer)
+    Consumer<String, AbstractMessageTo> infoChannelConsumer,
+    URI instanceUri)
   {
     log.debug(
         "Creating InfoChannel for topic {}",
@@ -50,11 +57,14 @@ public class InfoChannel implements Runnable
     this.numShards = consumer
         .partitionsFor(topic)
         .size();
+    this.shardOwners = new String[numShards];
     this.currentOffset = new long[numShards];
     this.nextOffset = new long[numShards];
     IntStream
         .range(0, numShards)
         .forEach(partition -> this.nextOffset[partition] = -1l);
+
+    this.instanceUri = instanceUri.toASCIIString();
   }
 
 
@@ -83,7 +93,7 @@ public class InfoChannel implements Runnable
       {
         if (metadata != null)
         {
-          log.info("Successfully sent chreate-request for chat room: {}", to);
+          log.info("Successfully sent created event for chat chat-room: {}", to);
           ChatRoomInfo chatRoomInfo = new ChatRoomInfo(chatRoomId, name, shard);
           sink.success(chatRoomInfo);
         }
@@ -91,7 +101,7 @@ public class InfoChannel implements Runnable
         {
           // On send-failure
           log.error(
-              "Could not send create-request for chat room (id={}, name={}): {}",
+              "Could not send created event for chat-room (id={}, name={}): {}",
               chatRoomId,
               name,
               exception);
@@ -101,6 +111,70 @@ public class InfoChannel implements Runnable
     });
   }
 
+  Mono<RecordMetadata> sendShardAssignedEvent(int shard)
+  {
+    EventShardAssigned to = EventShardAssigned.of(shard, instanceUri);
+
+    return Mono.create(sink ->
+    {
+      ProducerRecord<String, AbstractMessageTo> record =
+          new ProducerRecord<>(
+              topic,
+              Integer.toString(shard),
+              to);
+
+      producer.send(record, ((metadata, exception) ->
+      {
+        if (metadata != null)
+        {
+          log.info("Successfully sent shard assigned event for shard: {}", shard);
+          sink.success(metadata);
+        }
+        else
+        {
+          // On send-failure
+          log.error(
+              "Could not send shard assigned event for shard {}: {}",
+              shard,
+              exception);
+          sink.error(exception);
+        }
+      }));
+    });
+  }
+
+  Mono<RecordMetadata> sendShardRevokedEvent(int shard)
+  {
+    EventShardRevoked to = EventShardRevoked.of(shard, instanceUri);
+
+    return Mono.create(sink ->
+    {
+      ProducerRecord<String, AbstractMessageTo> record =
+          new ProducerRecord<>(
+              topic,
+              Integer.toString(shard),
+              to);
+
+      producer.send(record, ((metadata, exception) ->
+      {
+        if (metadata != null)
+        {
+          log.info("Successfully sent shard revoked event for shard: {}", shard);
+          sink.success(metadata);
+        }
+        else
+        {
+          // On send-failure
+          log.error(
+              "Could not send shard revoked event for shard {}: {}",
+              shard,
+              exception);
+          sink.error(exception);
+        }
+      }));
+    });
+  }
+
 
   @Override
   public void run()
@@ -146,6 +220,26 @@ public class InfoChannel implements Runnable
           createChatRoom(eventChatRoomCreated.toChatRoomInfo());
           break;
 
+        case EVENT_SHARD_ASSIGNED:
+          EventShardAssigned eventShardAssigned =
+              (EventShardAssigned) record.value();
+          log.info(
+              "Shard {} was assigned to {}",
+              eventShardAssigned.getShard(),
+              eventShardAssigned.getUri());
+          shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri();
+          break;
+
+        case EVENT_SHARD_REVOKED:
+          EventShardRevoked eventShardRevoked =
+              (EventShardRevoked) record.value();
+          log.info(
+              "Shard {} was revoked from {}",
+              eventShardRevoked.getShard(),
+              eventShardRevoked.getUri());
+          shardOwners[eventShardRevoked.getShard()] = null;
+          break;
+
         default:
           log.debug(
               "Ignoring message for key={} with offset={}: {}",
@@ -190,4 +284,9 @@ public class InfoChannel implements Runnable
   {
     return Mono.fromSupplier(() -> chatRoomInfo.get(id));
   }
+
+  Mono<String[]> getShardOwners()
+  {
+    return Mono.just(shardOwners);
+  }
 }
index 9832519..9409716 100644 (file)
@@ -61,6 +61,12 @@ public class KafkaChatHomeService implements ChatHomeService
             dataChannel.getOwnedShards())));
   }
 
+  @Override
+  public Mono<String[]> getShardOwners()
+  {
+    return infoChannel.getShardOwners();
+  }
+
   int selectShard(UUID chatRoomId)
   {
     byte[] serializedKey = chatRoomId.toString().getBytes();
index 784ffa5..7795516 100644 (file)
@@ -124,7 +124,8 @@ public class KafkaServicesConfiguration
     return new InfoChannel(
         properties.getKafka().getInfoChannelTopic(),
         producer,
-        infoChannelConsumer);
+        infoChannelConsumer,
+        properties.getKafka().getInstanceUri());
   }
 
   @Bean
index 1aef3fd..843a2c4 100644 (file)
@@ -12,6 +12,8 @@ public class AbstractMessageTo
     COMMAND_CREATE_CHATROOM,
     EVENT_CHATMESSAGE_RECEIVED,
     EVENT_CHATROOM_CREATED,
+    EVENT_SHARD_ASSIGNED,
+    EVENT_SHARD_REVOKED,
   }
 
   @Getter
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventShardAssigned.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventShardAssigned.java
new file mode 100644 (file)
index 0000000..8ba23f3
--- /dev/null
@@ -0,0 +1,35 @@
+package de.juplo.kafka.chat.backend.implementation.kafka.messages.info;
+
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+
+@Getter
+@Setter
+@EqualsAndHashCode
+@ToString
+public class EventShardAssigned extends AbstractMessageTo
+{
+  private int shard;
+  private String uri;
+
+
+  public EventShardAssigned()
+  {
+    super(ToType.EVENT_SHARD_ASSIGNED);
+  }
+
+
+  public static EventShardAssigned of(
+      int shard,
+      String uri)
+  {
+    EventShardAssigned event = new EventShardAssigned();
+    event.setShard(shard);
+    event.setUri(uri);
+    return event;
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventShardRevoked.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/messages/info/EventShardRevoked.java
new file mode 100644 (file)
index 0000000..1e37f43
--- /dev/null
@@ -0,0 +1,35 @@
+package de.juplo.kafka.chat.backend.implementation.kafka.messages.info;
+
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+
+@Getter
+@Setter
+@EqualsAndHashCode
+@ToString
+public class EventShardRevoked extends AbstractMessageTo
+{
+  private int shard;
+  private String uri;
+
+
+  public EventShardRevoked()
+  {
+    super(ToType.EVENT_SHARD_REVOKED);
+  }
+
+
+  public static EventShardRevoked of(
+      int shard,
+      String uri)
+  {
+    EventShardRevoked event = new EventShardRevoked();
+    event.setShard(shard);
+    event.setUri(uri);
+    return event;
+  }
+}
index 76ed2ba..3ff9e9e 100644 (file)
@@ -8,6 +8,7 @@ import de.juplo.kafka.chat.backend.storage.files.FilesStorageStrategy;
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
 
+import java.net.URI;
 import java.nio.file.Paths;
 import java.time.Clock;
 import java.util.stream.IntStream;
@@ -34,7 +35,14 @@ public class ShardedChatHomeServiceTest extends ChatHomeServiceWithShardsTest
 
       ShardingStrategy strategy = new KafkaLikeShardingStrategy(NUM_SHARDS);
 
-      return new ShardedChatHomeService(chatHomes, strategy);
+      return new ShardedChatHomeService(
+          chatHomes,
+          IntStream
+              .range(0, NUM_SHARDS)
+              .mapToObj(shard -> "http://instance-0")
+              .map(uriString -> URI.create(uriString))
+              .toArray(size -> new URI[size]),
+          strategy);
     }
 
     @Bean