import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
ThreadPoolTaskExecutor taskExecutor,
InfoChannel infoChannel,
Consumer<String, AbstractMessageTo> infoChannelConsumer,
- ConsumerTaskExecutor.WorkAssignor workAssignor)
+ ConsumerTaskExecutor.WorkAssignor infoChannelWorkAssignor)
{
return new ConsumerTaskExecutor(
taskExecutor,
infoChannel,
infoChannelConsumer,
- (consumer) -> { /* No work to assign here */ });
+ infoChannelWorkAssignor);
+ }
+
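+ // WorkAssignor for the info-channel consumer: assigns all partitions of the
+ // info-channel topic to the consumer explicitly, so the channel is always
+ // read in full instead of being split up by consumer-group rebalancing.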
+ @Bean
+ ConsumerTaskExecutor.WorkAssignor infoChannelWorkAssignor(
+ ChatBackendProperties properties,
+ DataChannel dataChannel)
+ {
+ return consumer ->
+ {
+ String topic = properties.getKafka().getInfoChannelTopic();
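+ // Build a TopicPartition handle for every partition reported for the topic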
+ List<TopicPartition> partitions = consumer
+ .partitionsFor(topic)
+ .stream()
+ .map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition()))
+ .toList();
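+ // Manual partition assignment; no consumer-group rebalancing is involved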
+ consumer.assign(partitions);
+ };
}
@Bean
ThreadPoolTaskExecutor taskExecutor,
DataChannel dataChannel,
Consumer<String, AbstractMessageTo> dataChannelConsumer,
- ConsumerTaskExecutor.WorkAssignor workAssignor)
+ ConsumerTaskExecutor.WorkAssignor dataChannelWorkAssignor)
{
return new ConsumerTaskExecutor(
taskExecutor,
dataChannel,
dataChannelConsumer,
- workAssignor);
+ dataChannelWorkAssignor);
}
@Bean
- ConsumerTaskExecutor.WorkAssignor workAssignor(
+ ConsumerTaskExecutor.WorkAssignor dataChannelWorkAssignor(
ChatBackendProperties properties,
DataChannel dataChannel)
{