import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
-
-import java.time.Clock;
+import org.springframework.expression.common.LiteralExpression;
+import org.springframework.integration.annotation.InboundChannelAdapter;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.config.EnableIntegration;
+import org.springframework.integration.kafka.inbound.KafkaMessageSource;
+import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.listener.ConsumerProperties;
+import org.springframework.messaging.MessageHandler;
@SpringBootApplication
@EnableConfigurationProperties(SplitterApplicationProperties.class)
+@EnableIntegration
public class SplitterApplication
{
+ @InboundChannelAdapter(channel = "recordings")
@Bean
- Clock clock()
+ KafkaMessageSource<String, String> source(
+ ConsumerFactory<String, String> cf,
+ SplitterApplicationProperties properties)
{
- return Clock.systemDefaultZone();
+ return new KafkaMessageSource<>(cf, new ConsumerProperties(properties.getInputTopic()));
}
+ @Bean
+ @ServiceActivator(inputChannel = "words")
+ MessageHandler handler(
+ KafkaTemplate<String, String> kafkaTemplate,
+ SplitterApplicationProperties properties)
+ {
+ KafkaProducerMessageHandler<String, String> handler =
+ new KafkaProducerMessageHandler<>(kafkaTemplate);
+ handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic()));
+ return handler;
+ }
public static void main(String[] args)
{
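Not shown in this diff are two pieces the new Spring Integration flow needs to run: a component that consumes from the "recordings" channel and emits the split-up words on the "words" channel, and a default poller, because an @InboundChannelAdapter without an explicit poller attribute looks up the bean named PollerMetadata.DEFAULT_POLLER. The following sketch illustrates both; the class name, the split regex and the polling interval are assumptions for illustration, not part of the actual application:

    import org.springframework.context.annotation.Bean;
    import org.springframework.integration.annotation.Splitter;
    import org.springframework.integration.scheduling.PollerMetadata;
    import org.springframework.scheduling.support.PeriodicTrigger;
    import org.springframework.stereotype.Component;

    @Component
    class SplitterFlowSketch
    {
      // Hypothetical splitter: Spring Integration fans each returned String[]
      // out into individual messages on the "words" channel.
      @Splitter(inputChannel = "recordings", outputChannel = "words")
      public String[] split(String recording)
      {
        return recording.split("\\W+");
      }

      // The @InboundChannelAdapter above polls the KafkaMessageSource and
      // fails on startup if no default poller is defined.
      @Bean(name = PollerMetadata.DEFAULT_POLLER)
      PollerMetadata defaultPoller()
      {
        PollerMetadata pollerMetadata = new PollerMetadata();
        pollerMetadata.setTrigger(new PeriodicTrigger(500));
        return pollerMetadata;
      }
    }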
+++ /dev/null
-package de.juplo.kafka.wordcount.splitter;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.*;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.core.task.TaskExecutor;
-import org.springframework.stereotype.Component;
-import org.springframework.util.Assert;
-
-import javax.annotation.PreDestroy;
-import java.time.Clock;
-import java.time.Duration;
-import java.util.*;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
-
-
-@Component
-@Slf4j
-public class SplitterStreamProcessor implements Runnable
-{
- private final MessageSplitter splitter;
- private final String inputTopic;
- private final String outputTopic;
- private final KafkaConsumer<String, String> consumer;
- private final KafkaProducer<String, String> producer;
- private final Clock clock;
- private final int commitInterval;
- private final Lock running = new ReentrantLock();
-
- private boolean stopped = false;
- private long[] offsets;
- private Optional<Integer>[] leaderEpochs;
- private long lastCommit;
-
- public SplitterStreamProcessor(
- MessageSplitter splitter,
- SplitterApplicationProperties properties,
- Clock clock,
- TaskExecutor executor)
- {
- this.splitter = splitter;
-
- this.inputTopic = properties.getInputTopic();
- this.outputTopic = properties.getOutputTopic();
-
- this.clock = clock;
- this.commitInterval = properties.getCommitInterval();
-
- Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");
-
- Properties props;
-
- props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
- props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
- props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- consumer = new KafkaConsumer<>(props);
-
- props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
- props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
- props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-ID");
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- producer = new KafkaProducer<>(props);
-
- executor.execute(this);
- }
-
- public void run()
- {
- running.lock();
-
- try
- {
- log.info("Initializing transaction");
- producer.initTransactions();
-
- log.info("Subscribing to topic {}", inputTopic);
- consumer.subscribe(
- Arrays.asList(inputTopic),
- new TransactionalConsumerRebalanceListener());
-
- while (!stopped)
- {
- ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
-
- records.forEach(inputRecord ->
- {
- log.debug(
- "Received a recording of {}, partition={}, offset={}, epoch={}",
- inputRecord.key(),
- inputRecord.partition(),
- inputRecord.offset(),
- inputRecord.leaderEpoch());
-
- offsets[inputRecord.partition()] = inputRecord.offset();
- leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch();
-
- String[] words = splitter.split(inputRecord.value());
- for (int i = 0; i < words.length; i++)
- {
- ProducerRecord<String, String> outputRecord =
- new ProducerRecord<>(
- outputTopic,
- inputRecord.key(),
- words[i].trim());
-
- producer.send(outputRecord, (metadata, exception) ->
- {
- if (exception == null)
- {
- // HANDLE SUCCESS
- log.debug(
- "Sent {}={}, partition={}, offset={}",
- outputRecord.key(),
- outputRecord.value(),
- metadata.partition(),
- metadata.offset());
- }
- else
- {
- // HANDLE ERROR
- log.error(
- "Could not send {}={}: {}",
- outputRecord.key(),
- outputRecord.value(),
- exception.toString());
- }
- });
- }
-
- long delta = clock.millis() - lastCommit;
- if (delta > commitInterval)
- {
- log.info("Elapsed time since last commit: {}ms", delta);
- commitTransaction();
- beginTransaction();
- }
- });
- }
- }
- catch (WakeupException e)
- {
- log.info("Waking up from exception!");
- // Not necessary, because consumer.close() triggers onPartitionsRevoked()!
- // commitTransaction();
- }
- catch (Exception e)
- {
- log.error("Unexpected exception!", e);
- producer.abortTransaction();
- }
- finally
- {
- try
- {
- log.info("Closing consumer");
- consumer.close();
- log.info("Closing producer");
- producer.close();
- log.info("Exiting!");
- }
- finally
- {
- running.unlock();
- }
- }
- }
-
- private void beginTransaction()
- {
- log.info("Beginning new transaction");
- lastCommit = clock.millis();
- producer.beginTransaction();
- }
-
- private void commitTransaction()
- {
- Map<TopicPartition, OffsetAndMetadata> offsetsToSend = new HashMap<>();
- for (int i = 0; i < offsets.length; i++)
- {
- if (offsets[i] > 0)
- {
- offsetsToSend.put(
- new TopicPartition(inputTopic, i),
- new OffsetAndMetadata(offsets[i], leaderEpochs[i], ""));
- }
- }
- producer.sendOffsetsToTransaction(
- offsetsToSend,
- consumer.groupMetadata());
- log.info("Committing transaction");
- producer.commitTransaction();
- }
-
- class TransactionalConsumerRebalanceListener implements ConsumerRebalanceListener
- {
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition> partitions)
- {
- log.info("Assigned partitions: {}", toString(partitions));
-
- // Compute the length of an array that can be used to store the offsets
- // (We can ignore the topic, since we only read from a single one!)
- int length =
- partitions
- .stream()
- .reduce(
- 0,
- (i, v) -> i < v.partition() ? v.partition() : i,
- (l, r) -> l < r ? r : l) + 1;
- offsets = new long[length];
- leaderEpochs = new Optional[length];
-
- beginTransaction();
- }
-
- @Override
- public void onPartitionsRevoked(Collection<TopicPartition> partitions)
- {
- log.info("Revoked partitions: {}", toString(partitions));
- commitTransaction();
- }
-
- @Override
- public void onPartitionsLost(Collection<TopicPartition> partitions)
- {
- log.info("Lost partitions: {}", toString(partitions));
- producer.abortTransaction();
- }
-
- String toString(Collection<TopicPartition> partitions)
- {
- return
- partitions
- .stream()
- .map(tp -> tp.topic() + "-" + tp.partition())
- .collect(Collectors.joining(", "));
- }
- }
-
- @PreDestroy
- public void stop()
- {
- log.info("Shutdown requested...");
- if (stopped)
- {
- log.warn("Ignoring request: already stopped!");
- return;
- }
- stopped = true;
- consumer.wakeup();
- running.lock();
- log.info("Shutdown completed!");
- }
-}
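With the hand-rolled KafkaConsumer and KafkaProducer gone, the ConsumerFactory and the KafkaTemplate injected into the new beans come from Spring Boot's Kafka auto-configuration and are configured via spring.kafka.* properties instead of the deleted constructor code. A minimal application.properties sketch; the property names under juplo.wordcount.splitter follow from the deleted assertion message and the getters on SplitterApplicationProperties, but all values here are placeholders:

    juplo.wordcount.splitter.input-topic=recordings
    juplo.wordcount.splitter.output-topic=words
    spring.kafka.bootstrap-servers=localhost:9092
    spring.kafka.consumer.group-id=splitter

No serializer settings are needed: Spring Boot defaults to StringDeserializer and StringSerializer, matching the types the deleted code configured by hand.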