From: Kai Moritz Date: Thu, 14 Sep 2023 23:08:34 +0000 (+0200) Subject: WIP:ALIGN X-Git-Tag: rebase--2023-09-15--10-15~6 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=26ec487269fab8d63898d55f2f266af343c8f6ec;p=demos%2Fkafka%2Fchat WIP:ALIGN --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java index 787aec8b..64cf455e 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java +++ b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java @@ -11,6 +11,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -52,13 +53,30 @@ public class KafkaServicesConfiguration ThreadPoolTaskExecutor taskExecutor, InfoChannel infoChannel, Consumer infoChannelConsumer, - ConsumerTaskExecutor.WorkAssignor workAssignor) + ConsumerTaskExecutor.WorkAssignor infoChannelWorkAssignor) { return new ConsumerTaskExecutor( taskExecutor, infoChannel, infoChannelConsumer, - (consumer) -> { /* No work to assign here */ }); + infoChannelWorkAssignor); + } + + @Bean + ConsumerTaskExecutor.WorkAssignor infoChannelWorkAssignor( + ChatBackendProperties properties, + DataChannel dataChannel) + { + return consumer -> + { + String topic = properties.getKafka().getInfoChannelTopic(); + List partitions = consumer + .partitionsFor(topic) + .stream() + .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition())) + .toList(); + consumer.assign(partitions); + }; } @Bean @@ -66,17 +84,17 @@ public class KafkaServicesConfiguration ThreadPoolTaskExecutor taskExecutor, DataChannel dataChannel, Consumer dataChannelConsumer, - ConsumerTaskExecutor.WorkAssignor workAssignor) + ConsumerTaskExecutor.WorkAssignor dataChannelWorkAssignor) { return new ConsumerTaskExecutor( taskExecutor, dataChannel, dataChannelConsumer, - workAssignor); + dataChannelWorkAssignor); } @Bean - ConsumerTaskExecutor.WorkAssignor workAssignor( + ConsumerTaskExecutor.WorkAssignor dataChannelWorkAssignor( ChatBackendProperties properties, DataChannel dataChannel) {