feat: Introduced interface `ShardingPublisherStrategy`
authorKai Moritz <kai@juplo.de>
Tue, 20 Feb 2024 08:53:10 +0000 (09:53 +0100)
committerKai Moritz <kai@juplo.de>
Tue, 20 Feb 2024 09:35:33 +0000 (10:35 +0100)
* The interface is used by `DataChannel` to publish the changed ownership
  each time, a new partition is assigned to the consumer-group.
* Added a dummy-implementation in `KafkaServicesConfiguration`.

src/main/java/de/juplo/kafka/chat/backend/domain/ShardingPublisherStrategy.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/DataChannel.java
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java

diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/ShardingPublisherStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/domain/ShardingPublisherStrategy.java
new file mode 100644 (file)
index 0000000..9a1c725
--- /dev/null
@@ -0,0 +1,9 @@
+package de.juplo.kafka.chat.backend.domain;
+
+import reactor.core.publisher.Mono;
+
+
+public interface ShardingPublisherStrategy
+{
+  Mono<String> publishOwnership(int shard);
+}
index 9cafbaa..2fa4998 100644 (file)
@@ -38,6 +38,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
   private final long[] nextOffset;
   private final Map<UUID, ChatRoomData>[] chatRoomData;
   private final InfoChannel infoChannel;
+  private final ShardingPublisherStrategy shardingPublisherStrategy;
 
   private boolean running;
   @Getter
@@ -53,7 +54,8 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
     int numShards,
     int bufferSize,
     Clock clock,
-    InfoChannel infoChannel)
+    InfoChannel infoChannel,
+    ShardingPublisherStrategy shardingPublisherStrategy)
   {
     log.debug(
         "{}: Creating DataChannel for topic {} with {} partitions",
@@ -76,6 +78,7 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
         .range(0, numShards)
         .forEach(shard -> this.chatRoomData[shard] = new HashMap<>());
     this.infoChannel = infoChannel;
+    this.shardingPublisherStrategy = shardingPublisherStrategy;
   }
 
 
@@ -142,6 +145,13 @@ public class DataChannel implements Runnable, ConsumerRebalanceListener
 
       consumer.seek(topicPartition, nextOffset[partition]);
       infoChannel.sendShardAssignedEvent(partition);
+      shardingPublisherStrategy
+          .publishOwnership(partition)
+          .doOnNext(instanceId -> log.info(
+              "Instance {} was published as owner of shard {}",
+              instanceId,
+              partition))
+          .subscribe();
     });
 
     consumer.resume(partitions);
index 7bb1ab9..c3027fa 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
 import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
 import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
@@ -20,6 +21,7 @@ import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import reactor.core.publisher.Mono;
 
 import java.time.Clock;
 import java.time.ZoneId;
@@ -137,7 +139,8 @@ public class KafkaServicesConfiguration
       Consumer<String, AbstractMessageTo> dataChannelConsumer,
       ZoneId zoneId,
       Clock clock,
-      InfoChannel infoChannel)
+      InfoChannel infoChannel,
+      ShardingPublisherStrategy shardingPublisherStrategy)
   {
     return new DataChannel(
         properties.getInstanceId(),
@@ -148,7 +151,8 @@ public class KafkaServicesConfiguration
         properties.getKafka().getNumPartitions(),
         properties.getChatroomBufferSize(),
         clock,
-        infoChannel);
+        infoChannel,
+        shardingPublisherStrategy);
   }
 
   @Bean
@@ -280,6 +284,18 @@ public class KafkaServicesConfiguration
     return properties;
   }
 
+  @Bean
+  ShardingPublisherStrategy shardingPublisherStrategy()
+  {
+    return new ShardingPublisherStrategy() {
+      @Override
+      public Mono<String> publishOwnership(int shard)
+      {
+        return Mono.just(Integer.toString(shard));
+      }
+    };
+  }
+
   @Bean
   ZoneId zoneId()
   {
index 956d7ce..52a527d 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka.chat.backend.implementation.kafka;
 
 import de.juplo.kafka.chat.backend.ChatBackendProperties;
+import de.juplo.kafka.chat.backend.domain.ShardingPublisherStrategy;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
@@ -10,6 +11,7 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Import;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.support.SendResult;
+import reactor.core.publisher.Mono;
 
 import java.time.Clock;
 import java.util.List;
@@ -23,6 +25,12 @@ public class KafkaTestUtils
   @Import(KafkaServicesConfiguration.class)
   public static class KafkaTestConfiguration
   {
+    @Bean
+    public ShardingPublisherStrategy shardingPublisherStrategy()
+    {
+      return shard -> Mono.just("MOCKED!");
+    }
+
     @Bean
     public WorkAssignor dataChannelWorkAssignor(
         ChatBackendProperties properties,