From 26ec487269fab8d63898d55f2f266af343c8f6ec Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 15 Sep 2023 01:08:34 +0200 Subject: [PATCH] WIP:ALIGN --- .../kafka/KafkaServicesConfiguration.java | 28 +++++++++++++++---- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index 787aec8b..64cf455e 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -11,6 +11,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -52,13 +53,30 @@ public class KafkaServicesConfiguration ThreadPoolTaskExecutor taskExecutor, InfoChannel infoChannel, Consumer infoChannelConsumer, - ConsumerTaskExecutor.WorkAssignor workAssignor) + ConsumerTaskExecutor.WorkAssignor infoChannelWorkAssignor) { return new ConsumerTaskExecutor( taskExecutor, infoChannel, infoChannelConsumer, - (consumer) -> { /* No work to assign here */ }); + infoChannelWorkAssignor); + } + + @Bean + ConsumerTaskExecutor.WorkAssignor infoChannelWorkAssignor( + ChatBackendProperties properties, + DataChannel dataChannel) + { + return consumer -> + { + String topic = properties.getKafka().getInfoChannelTopic(); + List partitions = consumer + .partitionsFor(topic) + .stream() + .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition())) + .toList(); + consumer.assign(partitions); + }; } @Bean @@ -66,17 +84,17 @@ public class KafkaServicesConfiguration ThreadPoolTaskExecutor taskExecutor, DataChannel dataChannel, Consumer dataChannelConsumer, - ConsumerTaskExecutor.WorkAssignor workAssignor) + ConsumerTaskExecutor.WorkAssignor dataChannelWorkAssignor) { return new ConsumerTaskExecutor( taskExecutor, dataChannel, dataChannelConsumer, - workAssignor); + dataChannelWorkAssignor); } @Bean - ConsumerTaskExecutor.WorkAssignor workAssignor( + ConsumerTaskExecutor.WorkAssignor dataChannelWorkAssignor( ChatBackendProperties properties, DataChannel dataChannel) { -- 2.20.1