From 9b311d736e11d1e0dd54bbbf8c7f126dc097d412 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 13 Sep 2023 20:45:22 +0200 Subject: [PATCH] WIP --- .../kafka/KafkaServicesApplicationRunner.java | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java index efcbe490..8f619827 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java @@ -5,7 +5,7 @@ import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessage import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; @@ -51,9 +51,14 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner public void run(ApplicationArguments args) throws Exception { String infoTopic = properties.getKafka().getInfoChannelTopic(); - List< PartitionInfo> partitions = - infoChannelConsumer.partitionsFor(infoTopic); - infoChannelConsumer.assignment(partitions); + List partitions = infoChannelConsumer + .partitionsFor(infoTopic) + .stream() + .map(partitionInfo -> new TopicPartition( + infoTopic, + partitionInfo.partition())) + .toList(); + infoChannelConsumer.assign(partitions); log.info("Starting the consumer for the InfoChannel"); infoChannelConsumerJob = taskExecutor .submitCompletable(infoChannel) -- 2.20.1