import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
public void run(ApplicationArguments args) throws Exception
{
String infoTopic = properties.getKafka().getInfoChannelTopic();
- List< PartitionInfo> partitions =
- infoChannelConsumer.partitionsFor(infoTopic);
- infoChannelConsumer.assignment(partitions);
+ List<TopicPartition> partitions = infoChannelConsumer
+ .partitionsFor(infoTopic)
+ .stream()
+ .map(partitionInfo -> new TopicPartition(
+ infoTopic,
+ partitionInfo.partition()))
+ .toList();
+ infoChannelConsumer.assign(partitions);
log.info("Starting the consumer for the InfoChannel");
infoChannelConsumerJob = taskExecutor
.submitCompletable(infoChannel)