import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
-
-import java.time.Clock;
+import org.springframework.expression.Expression;
+import org.springframework.expression.ExpressionParser;
+import org.springframework.expression.common.LiteralExpression;
+import org.springframework.expression.spel.standard.SpelExpressionParser;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.config.EnableIntegration;
+import org.springframework.integration.core.MessageProducer;
+import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
+import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.listener.ContainerProperties;
+import org.springframework.kafka.listener.KafkaMessageListenerContainer;
+import org.springframework.messaging.MessageHandler;
@SpringBootApplication
@EnableConfigurationProperties(SplitterApplicationProperties.class)
+@EnableIntegration
public class SplitterApplication
{
@Bean
- Clock clock()
+ MessageProducer messageProducer(
+ KafkaMessageListenerContainer<String, String> messageListenerContainer)
+ {
+ KafkaMessageDrivenChannelAdapter<String, String> adapter =
+ new KafkaMessageDrivenChannelAdapter(messageListenerContainer);
+ adapter.setOutputChannelName("recordings");
+ return adapter;
+ }
+
+ @Bean
+ KafkaMessageListenerContainer<String, String> messageListenerContainer(
+ ConsumerFactory<String,String> consumerFactory,
+ SplitterApplicationProperties properties)
{
- return Clock.systemDefaultZone();
+ return
+ new KafkaMessageListenerContainer<>(
+ consumerFactory,
+ new ContainerProperties(properties.getInputTopic()));
}
+ @Bean
+ @ServiceActivator(inputChannel = "words")
+ MessageHandler handler(
+ KafkaTemplate<String, String> kafkaTemplate,
+ SplitterApplicationProperties properties)
+ {
+ KafkaProducerMessageHandler<String, String> handler =
+ new KafkaProducerMessageHandler<>(kafkaTemplate);
+ handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic()));
+ final ExpressionParser parser = new SpelExpressionParser();
+ Expression expression = parser.parseExpression("headers['kafka_receivedMessageKey']");
+ handler.setMessageKeyExpression(expression);
+ return handler;
+ }
public static void main(String[] args)
{