--- /dev/null
+package de.juplo.kafka.wordcount.splitter;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.stereotype.Component;
+import org.springframework.util.Assert;
+
+import javax.annotation.PreDestroy;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+
+/**
+ * Reads text records from the input topic, splits each value into words and
+ * writes every word as a separate record (keyed like the input record) to the
+ * output topic.
+ * <p>
+ * Output records and consumed offsets are written inside the same Kafka
+ * transaction, which is committed whenever more than {@code commitInterval}
+ * milliseconds have passed since the transaction was begun, and whenever
+ * partitions are revoked.
+ * <p>
+ * Threading: {@link #run(ApplicationArguments)} does all consumer and producer
+ * work on the calling thread. {@link #stop()} may be called from another
+ * thread; it only sets the stop flag, calls {@code consumer.wakeup()} and then
+ * waits on the {@code running} lock until {@code run()} has finished.
+ */
+@Component
+@Slf4j
+public class SplitterStreamProcessor implements ApplicationRunner
+{
+  /** Separator between words: one or more non-word characters. */
+  final static Pattern PATTERN = Pattern.compile("\\W+");
+
+  private final String inputTopic;
+  private final String outputTopic;
+  private final KafkaConsumer<String, String> consumer;
+  private final KafkaProducer<String, String> producer;
+  private final Clock clock;
+  /** Maximum age of a transaction in milliseconds before it is committed. */
+  private final int commitInterval;
+  /** Held by run() for its whole lifetime, so that stop() can wait for it. */
+  private final Lock running = new ReentrantLock();
+
+  /** Written by stop() on another thread, hence volatile. */
+  private volatile boolean stopped = false;
+  /** True between beginTransaction() and the matching commit/abort. Only touched by the thread executing run(). */
+  private boolean transactionOpen = false;
+  /** Per partition: the offset to commit (position of the next record to read), 0 = nothing processed yet. */
+  private long[] offsets;
+  /** Per partition: the leader epoch of the last processed record. */
+  private Optional<Integer>[] leaderEpochs;
+  /** Clock time (ms) at which the current transaction was begun. */
+  private long lastCommit;
+
+  /**
+   * Creates the consumer and the transactional producer.
+   *
+   * @param properties topic names, bootstrap server, group id and commit interval
+   * @param clock time source used to measure the commit interval
+   * @throws IllegalArgumentException if no bootstrap server is configured
+   */
+  public SplitterStreamProcessor(
+      SplitterApplicationProperties properties,
+      Clock clock)
+  {
+    this.inputTopic = properties.getInputTopic();
+    this.outputTopic = properties.getOutputTopic();
+
+    this.clock = clock;
+    this.commitInterval = properties.getCommitInterval();
+
+    Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");
+
+    Properties props;
+
+    props = new Properties();
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
+    // Offsets are committed through the producer's transaction, never by the consumer itself
+    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+    consumer = new KafkaConsumer<>(props);
+
+    props = new Properties();
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+    // NOTE(review): the transactional.id is hard-coded, so several instances running
+    // at the same time would presumably fence each other -- consider making it configurable.
+    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-ID");
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+    producer = new KafkaProducer<>(props);
+  }
+
+  /**
+   * Runs the poll loop until {@link #stop()} is called or an unexpected
+   * exception occurs, then closes consumer and producer.
+   *
+   * @param args application arguments (unused)
+   */
+  @Override
+  public void run(ApplicationArguments args)
+  {
+    running.lock();
+
+    try
+    {
+      log.info("Initializing transaction");
+      producer.initTransactions();
+
+      log.info("Subscribing to topic {}", inputTopic);
+      consumer.subscribe(
+          Arrays.asList(inputTopic),
+          new TransactionalConsumerRebalanceListener());
+
+      while (!stopped)
+      {
+        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
+
+        for (ConsumerRecord<String, String> inputRecord : records)
+        {
+          processRecord(inputRecord);
+          commitIfIntervalElapsed();
+        }
+
+        // Also check after every poll, so that words already sent do not stay
+        // in an open transaction indefinitely when the input topic falls idle.
+        commitIfIntervalElapsed();
+      }
+    }
+    catch (WakeupException e)
+    {
+      log.info("Waking up from exception!");
+      // No explicit commit needed here: consumer.close() triggers
+      // onPartitionsRevoked(), which commits the open transaction.
+    }
+    catch (Exception e)
+    {
+      log.error("Unexpected exception!", e);
+      abortTransaction();
+    }
+    finally
+    {
+      try
+      {
+        log.info("Closing consumer");
+        consumer.close();
+      }
+      finally
+      {
+        // Nested finally: the producer must be closed even if closing the consumer fails
+        try
+        {
+          log.info("Closing producer");
+          producer.close();
+          log.info("Exiting!");
+        }
+        finally
+        {
+          running.unlock();
+        }
+      }
+    }
+  }
+
+  /**
+   * Remembers the offset to commit for the record's partition and sends one
+   * output record per non-empty word of the record's value.
+   */
+  private void processRecord(ConsumerRecord<String, String> inputRecord)
+  {
+    log.debug(
+        "Received a recording of {}, partition={}, offset={}, epoch={}",
+        inputRecord.key(),
+        inputRecord.partition(),
+        inputRecord.offset(),
+        inputRecord.leaderEpoch());
+
+    // The committed offset has to be the position of the NEXT record to read.
+    // Committing the offset of the record itself would re-deliver it after a restart.
+    offsets[inputRecord.partition()] = inputRecord.offset() + 1;
+    leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch();
+
+    String text = inputRecord.value();
+    if (text == null)
+    {
+      // A record without value contains no words; splitting null would throw
+      log.debug("Ignoring record without value for key {}", inputRecord.key());
+      return;
+    }
+
+    for (String word : PATTERN.split(text))
+    {
+      // split() yields an empty first element, if the text starts with a
+      // separator (or is empty) -- that is not a word
+      if (word.isEmpty())
+      {
+        continue;
+      }
+      sendWord(inputRecord.key(), word);
+    }
+  }
+
+  /** Sends a single word to the output topic and logs the outcome asynchronously. */
+  private void sendWord(String key, String word)
+  {
+    ProducerRecord<String, String> outputRecord =
+        new ProducerRecord<>(outputTopic, key, word);
+
+    producer.send(outputRecord, (metadata, exception) ->
+    {
+      if (exception == null)
+      {
+        log.debug(
+            "Sent {}={}, partition={}, offset={}",
+            outputRecord.key(),
+            outputRecord.value(),
+            metadata.partition(),
+            metadata.offset());
+      }
+      else
+      {
+        log.error(
+            "Could not send {}={}: {}",
+            outputRecord.key(),
+            outputRecord.value(),
+            exception.toString());
+      }
+    });
+  }
+
+  /**
+   * Commits the open transaction and begins a new one, if the open
+   * transaction is older than the commit interval. Does nothing, if no
+   * transaction is open.
+   */
+  private void commitIfIntervalElapsed()
+  {
+    if (!transactionOpen)
+    {
+      return;
+    }
+
+    long delta = clock.millis() - lastCommit;
+    if (delta > commitInterval)
+    {
+      log.info("Elapsed time since last commit: {}ms", delta);
+      commitTransaction();
+      beginTransaction();
+    }
+  }
+
+  /** Begins a new transaction and restarts the commit-interval measurement. */
+  private void beginTransaction()
+  {
+    log.info("Beginning new transaction");
+    lastCommit = clock.millis();
+    producer.beginTransaction();
+    transactionOpen = true;
+  }
+
+  /**
+   * Adds the offsets of all partitions with processed records to the open
+   * transaction and commits it. Does nothing, if no transaction is open.
+   */
+  private void commitTransaction()
+  {
+    if (!transactionOpen)
+    {
+      log.debug("No open transaction: nothing to commit");
+      return;
+    }
+
+    Map<TopicPartition, OffsetAndMetadata> offsetsToSend = new HashMap<>();
+    for (int i = 0; i < offsets.length; i++)
+    {
+      // 0 means: no record of this partition was processed since the assignment
+      if (offsets[i] > 0)
+      {
+        offsetsToSend.put(
+            new TopicPartition(inputTopic, i),
+            new OffsetAndMetadata(offsets[i], leaderEpochs[i], ""));
+      }
+    }
+    if (!offsetsToSend.isEmpty())
+    {
+      producer.sendOffsetsToTransaction(
+          offsetsToSend,
+          consumer.groupMetadata());
+    }
+    log.info("Committing transaction");
+    producer.commitTransaction();
+    // Only reset after success: if the commit throws, the caller can still abort
+    transactionOpen = false;
+  }
+
+  /** Aborts the open transaction. Does nothing, if no transaction is open. */
+  private void abortTransaction()
+  {
+    if (!transactionOpen)
+    {
+      log.debug("No open transaction: nothing to abort");
+      return;
+    }
+
+    // Reset first: whatever happens during the abort, the transaction must
+    // not be committed by onPartitionsRevoked() when the consumer is closed.
+    transactionOpen = false;
+    log.info("Aborting transaction");
+    producer.abortTransaction();
+  }
+
+  /**
+   * Ties the lifecycle of the transactions to the partition assignment:
+   * begin on assignment, commit on revocation, abort on loss.
+   */
+  class TransactionalConsumerRebalanceListener implements ConsumerRebalanceListener
+  {
+    @Override
+    public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+    {
+      log.info("Assigned partitions: {}", toString(partitions));
+
+      // A transaction may still be open, if no revocation preceded this
+      // assignment. Finish it, before the bookkeeping is reset.
+      commitTransaction();
+
+      // Compute the length of an array, that can be used to store the offsets
+      // (We can ignore the topic, since we only read from a single one!)
+      // NOTE(review): this assumes that the callback receives the complete
+      // assignment -- confirm, if a cooperative assignor is configured.
+      int length =
+          partitions
+              .stream()
+              .mapToInt(TopicPartition::partition)
+              .max()
+              .orElse(-1) + 1;
+      offsets = new long[length];
+      leaderEpochs = new Optional[length];
+
+      beginTransaction();
+    }
+
+    @Override
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+    {
+      log.info("Revoked partitions: {}", toString(partitions));
+      commitTransaction();
+    }
+
+    @Override
+    public void onPartitionsLost(Collection<TopicPartition> partitions)
+    {
+      log.info("Lost partitions: {}", toString(partitions));
+      abortTransaction();
+    }
+
+    /** Renders the partitions as a comma separated list of "topic-partition". */
+    String toString(Collection<TopicPartition> partitions)
+    {
+      return
+          partitions
+              .stream()
+              .map(tp -> tp.topic() + "-" + tp.partition())
+              .collect(Collectors.joining(", "));
+    }
+  }
+
+  /**
+   * Requests the poll loop to stop and blocks until {@code run()} has
+   * closed the consumer and the producer.
+   */
+  @PreDestroy
+  public void stop()
+  {
+    log.info("Shutdown requested...");
+    stopped = true;
+    consumer.wakeup();
+    // run() holds the lock while it is active: acquiring it means run() is done
+    running.lock();
+    try
+    {
+      log.info("Shutdown completed!");
+    }
+    finally
+    {
+      running.unlock();
+    }
+  }
+}