From: Kai Moritz Date: Wed, 13 Sep 2023 18:45:22 +0000 (+0200) Subject: WIP X-Git-Tag: rebase--2023-09-13--21-01~2 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=9b311d736e11d1e0dd54bbbf8c7f126dc097d412;p=demos%2Fkafka%2Fchat WIP --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java index efcbe490..8f619827 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java @@ -5,7 +5,7 @@ import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessage import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; @@ -51,9 +51,14 @@ public class KafkaServicesApplicationRunner implements ApplicationRunner public void run(ApplicationArguments args) throws Exception { String infoTopic = properties.getKafka().getInfoChannelTopic(); - List< PartitionInfo> partitions = - infoChannelConsumer.partitionsFor(infoTopic); - infoChannelConsumer.assignment(partitions); + List partitions = infoChannelConsumer + .partitionsFor(infoTopic) + .stream() + .map(partitionInfo -> new TopicPartition( + infoTopic, + partitionInfo.partition())) + .toList(); + infoChannelConsumer.assign(partitions); log.info("Starting the consumer for the InfoChannel"); infoChannelConsumerJob = taskExecutor .submitCompletable(infoChannel)