splitter: 1.0.0-vanilla-kafka - splits up the recorded sentences into words
diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
new file mode 100644 (file)
index 0000000..791e164
--- /dev/null
@@ -0,0 +1,260 @@
+package de.juplo.kafka.wordcount.splitter;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.stereotype.Component;
+import org.springframework.util.Assert;
+
+import javax.annotation.PreDestroy;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+
+@Component
+@Slf4j
+public class SplitterStreamProcessor implements ApplicationRunner
+{
+  final static Pattern PATTERN = Pattern.compile("\\W+");
+
+  private final String inputTopic;
+  private final String outputTopic;
+  private final KafkaConsumer<String, String> consumer;
+  private final KafkaProducer<String, String> producer;
+  private final Clock clock;
+  private final int commitInterval;
+  private final Lock running = new ReentrantLock();
+
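+  // Mutable state, shared between the runner thread and the rebalance listener:
+  // per-partition commit positions and leader epochs for the next transactional commit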
+  private boolean stopped = false;
+  private long[] offsets;
+  private Optional<Integer>[] leaderEpochs;
+  private long lastCommit;
+
+  public SplitterStreamProcessor(
+      SplitterApplicationProperties properties,
+      Clock clock)
+  {
+    this.inputTopic = properties.getInputTopic();
+    this.outputTopic = properties.getOutputTopic();
+
+    this.clock = clock;
+    this.commitInterval = properties.getCommitInterval();
+
+    Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");
+
+    Properties props;
+
+    props = new Properties();
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
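+    // Offsets are committed through the producer as part of the transaction,
+    // so the consumer must not commit them on its own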
+    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+    consumer = new KafkaConsumer<>(props);
+
+    props = new Properties();
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
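+    // Idempotence plus a transactional.id turn this into a transactional producer;
+    // in a real deployment the transactional.id has to be unique per instance,
+    // because Kafka uses it to fence off zombie instances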
+    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-ID");
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+    producer = new KafkaProducer<>(props);
+  }
+
+  @Override
+  public void run(ApplicationArguments args)
+  {
+    running.lock();
+
+    try
+    {
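+      // initTransactions() registers the transactional.id and completes any
+      // transaction left open by a previous incarnation of this producer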
+      log.info("Initializing transaction");
+      producer.initTransactions();
+
+      log.info("Subscribing to topic {}", inputTopic);
+      consumer.subscribe(
+          Arrays.asList(inputTopic),
+          new TransactionalConsumerRebalanceListener());
+
+      while (!stopped)
+      {
+        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
+
+        records.forEach(inputRecord ->
+        {
+          log.debug(
+              "Received a recording of {}, partition={}, offset={}, epoch={}",
+              inputRecord.key(),
+              inputRecord.partition(),
+              inputRecord.offset(),
+              inputRecord.leaderEpoch());
+
+          // Remember the position to commit: committed offsets always denote
+          // the offset of the *next* record to be consumed, hence offset + 1
+          offsets[inputRecord.partition()] = inputRecord.offset() + 1;
+          leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch();
+
+          String[] words = PATTERN.split(inputRecord.value());
+          for (int i = 0; i < words.length; i++)
+          {
+            ProducerRecord<String, String> outputRecord =
+                new ProducerRecord<>(
+                    outputTopic,
+                    inputRecord.key(),
+                    words[i].trim());
+
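+            // send() is asynchronous: the record is added to the producer's current
+            // transaction and only becomes visible to read_committed consumers
+            // after the transaction is committed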
+            producer.send(outputRecord, (metadata, exception) ->
+            {
+              if (exception == null)
+              {
+                // Success: log the partition/offset assigned to the record
+                log.debug(
+                    "Sent {}={}, partition={}, offset={}",
+                    outputRecord.key(),
+                    outputRecord.value(),
+                    metadata.partition(),
+                    metadata.offset());
+              }
+              else
+              {
+                // Error: log the failure; a failed send also prevents
+                // a successful commit of the surrounding transaction
+                log.error(
+                    "Could not send {}={}: {}",
+                    outputRecord.key(),
+                    outputRecord.value(),
+                    exception.toString());
+              }
+            });
+          }
+
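+          // Commit the running transaction and start a new one,
+          // whenever the configured commit interval has elapsed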
+          long delta = clock.millis() - lastCommit;
+          if (delta > commitInterval)
+          {
+            log.info("Elapsed time since last commit: {}ms", delta);
+            commitTransaction();
+            beginTransaction();
+          }
+        });
+      }
+    }
+    catch (WakeupException e)
+    {
+      log.info("Waking up from exception!");
+      // Nicht nötig, da consumer.close() onPartitionsRevoked() auslöst!
+      // commitTransaction();
+    }
+    catch (Exception e)
+    {
+      log.error("Unexpected exception!", e);
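+      // Abort rolls the transaction back, so none of its records become visible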
+      producer.abortTransaction();
+    }
+    finally
+    {
+      try
+      {
+        log.info("Closing consumer");
+        consumer.close();
+        log.info("Closing producer");
+        producer.close();
+        log.info("Exiting!");
+      }
+      finally
+      {
+        running.unlock();
+      }
+    }
+  }
+
+  private void beginTransaction()
+  {
+    log.info("Beginning new transaction");
+    lastCommit = clock.millis();
+    producer.beginTransaction();
+  }
+
+  private void commitTransaction()
+  {
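+    // The consumed offsets are committed as part of the producer's transaction:
+    // they become visible to the group if, and only if, the transaction succeeds.
+    // Passing the consumer group metadata lets the broker fence zombie instances
+    // (KIP-447).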
+    Map<TopicPartition, OffsetAndMetadata> offsetsToSend = new HashMap<>();
+    for (int i = 0; i < offsets.length; i++)
+    {
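+      // A position > 0 means that at least one record was read from this partition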
+      if (offsets[i] > 0)
+      {
+        offsetsToSend.put(
+            new TopicPartition(inputTopic, i),
+            new OffsetAndMetadata(offsets[i], leaderEpochs[i], ""));
+      }
+    }
+    producer.sendOffsetsToTransaction(
+        offsetsToSend,
+        consumer.groupMetadata());
+    log.info("Committing transaction");
+    producer.commitTransaction();
+  }
+
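+  // Couples the transaction lifecycle to the rebalancing of the consumer group:
+  // newly assigned partitions open a transaction, revoked partitions commit it,
+  // lost partitions abort it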
+  class TransactionalConsumerRebalanceListener implements ConsumerRebalanceListener
+  {
+    @Override
+    public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+    {
+      log.info("Assigned partitions: {}", toString(partitions));
+
+      // Compute the length of an array that can be used to store the offsets
+      // (we can ignore the topic, since we only read from a single one!)
+      int length =
+          partitions
+              .stream()
+              .mapToInt(TopicPartition::partition)
+              .max()
+              .orElse(-1) + 1;
+      offsets = new long[length];
+      leaderEpochs = new Optional[length];
+
+      beginTransaction();
+    }
+
+    @Override
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+    {
+      log.info("Revoked partitions: {}", toString(partitions));
+      commitTransaction();
+    }
+
+    @Override
+    public void onPartitionsLost(Collection<TopicPartition> partitions)
+    {
+      log.info("Lost partitions: {}", toString(partitions));
+      producer.abortTransaction();
+    }
+
+    String toString(Collection<TopicPartition> partitions)
+    {
+      return
+          partitions
+              .stream()
+              .map(tp -> tp.topic() + "-" + tp.partition())
+              .collect(Collectors.joining(", "));
+    }
+  }
+
+  @PreDestroy
+  public void stop()
+  {
+    log.info("Shutdown requested...");
+    stopped = true;
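+    // Force a blocked poll() to return immediately by throwing a WakeupException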
+    consumer.wakeup();
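+    // Block until run() has released the lock, i.e. until consumer and producer are closed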
+    running.lock();
+    log.info("Shutdown completed!");
+  }
+}