WIP:ALIGN
authorKai Moritz <kai@juplo.de>
Thu, 14 Sep 2023 23:08:34 +0000 (01:08 +0200)
committerKai Moritz <kai@juplo.de>
Thu, 14 Sep 2023 23:08:34 +0000 (01:08 +0200)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java

index 787aec8..64cf455 100644 (file)
@@ -11,6 +11,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -52,13 +53,30 @@ public class KafkaServicesConfiguration
       ThreadPoolTaskExecutor taskExecutor,
       InfoChannel infoChannel,
       Consumer<String, AbstractMessageTo> infoChannelConsumer,
-      ConsumerTaskExecutor.WorkAssignor workAssignor)
+      ConsumerTaskExecutor.WorkAssignor infoChannelWorkAssignor)
   {
     return new ConsumerTaskExecutor(
         taskExecutor,
         infoChannel,
         infoChannelConsumer,
-        (consumer) -> { /* No work to assign here */ });
+        infoChannelWorkAssignor);
+  }
+
+  @Bean
+  ConsumerTaskExecutor.WorkAssignor infoChannelWorkAssignor(
+      ChatBackendProperties properties,
+      DataChannel dataChannel)
+  {
+    return consumer ->
+    {
+      String topic = properties.getKafka().getInfoChannelTopic();
+      List<TopicPartition> partitions = consumer
+          .partitionsFor(topic)
+          .stream()
+          .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
+          .toList();
+      consumer.assign(partitions);
+    };
   }
 
   @Bean
@@ -66,17 +84,17 @@ public class KafkaServicesConfiguration
       ThreadPoolTaskExecutor taskExecutor,
       DataChannel dataChannel,
       Consumer<String, AbstractMessageTo> dataChannelConsumer,
-      ConsumerTaskExecutor.WorkAssignor workAssignor)
+      ConsumerTaskExecutor.WorkAssignor dataChannelWorkAssignor)
   {
     return new ConsumerTaskExecutor(
         taskExecutor,
         dataChannel,
         dataChannelConsumer,
-        workAssignor);
+        dataChannelWorkAssignor);
   }
 
   @Bean
-  ConsumerTaskExecutor.WorkAssignor workAssignor(
+  ConsumerTaskExecutor.WorkAssignor dataChannelWorkAssignor(
       ChatBackendProperties properties,
       DataChannel dataChannel)
   {