WIP
authorKai Moritz <kai@juplo.de>
Wed, 13 Sep 2023 18:45:22 +0000 (20:45 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 13 Sep 2023 18:45:22 +0000 (20:45 +0200)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java

index efcbe49..8f61982 100644 (file)
@@ -5,7 +5,7 @@ import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessage
 import jakarta.annotation.PreDestroy;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
@@ -51,9 +51,14 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner
   public void run(ApplicationArguments args) throws Exception
   {
     String infoTopic = properties.getKafka().getInfoChannelTopic();
-    List< PartitionInfo> partitions =
-        infoChannelConsumer.partitionsFor(infoTopic);
-    infoChannelConsumer.assignment(partitions);
+    List<TopicPartition> partitions = infoChannelConsumer
+        .partitionsFor(infoTopic)
+        .stream()
+        .map(partitionInfo -> new TopicPartition(
+            infoTopic,
+            partitionInfo.partition()))
+        .toList();
+    infoChannelConsumer.assign(partitions);
     log.info("Starting the consumer for the InfoChannel");
     infoChannelConsumerJob = taskExecutor
         .submitCompletable(infoChannel)