package de.juplo.kafka.wordcount.splitter;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.expression.spel.standard.SpelExpressionParser;
-import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
-import org.springframework.integration.kafka.inbound.KafkaMessageSource;
+import org.springframework.integration.kafka.channel.SubscribableKafkaChannel;
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
-import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.listener.ConsumerProperties;
+import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.messaging.MessageHandler;
+import org.springframework.messaging.SubscribableChannel;
@SpringBootApplication
@EnableIntegration
public class SplitterApplication
{
- @InboundChannelAdapter(channel = "recordings")
@Bean
- KafkaMessageSource<String, String> source(
- ConsumerFactory<String, String> cf,
- SplitterApplicationProperties properties)
+ SubscribableChannel recordings(
+ KafkaTemplate<String, String> kafkaTemplate,
+ KafkaListenerContainerFactory<AbstractMessageListenerContainer<String, String>> containerFactory,
+ SplitterApplicationProperties properties,
+ @Value("${spring.kafka.consumer.group-id}") String groupId)
{
- return new KafkaMessageSource<>(cf, new ConsumerProperties(properties.getInputTopic()));
+ SubscribableKafkaChannel channel =
+ new SubscribableKafkaChannel(kafkaTemplate, containerFactory, properties.getInputTopic());
+ channel.setGroupId(groupId);
+ return channel;
}
@Bean