splitter: 1.0.0-spring-integration - Initial implementation (incomplete)
diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
deleted file mode 100644
index 0eafbda..0000000
--- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
+++ /dev/null
@@ -1,267 +0,0 @@
-package de.juplo.kafka.wordcount.splitter;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.*;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.core.task.TaskExecutor;
-import org.springframework.stereotype.Component;
-import org.springframework.util.Assert;
-
-import javax.annotation.PreDestroy;
-import java.time.Clock;
-import java.time.Duration;
-import java.util.*;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
-
-
-@Component
-@Slf4j
-public class SplitterStreamProcessor implements Runnable
-{
-  private final MessageSplitter splitter;
-  private final String inputTopic;
-  private final String outputTopic;
-  private final KafkaConsumer<String, String> consumer;
-  private final KafkaProducer<String, String> producer;
-  private final Clock clock;
-  private final int commitInterval;
-  private final Lock running = new ReentrantLock();
-
-  // Written by stop() and read by the processing thread, hence volatile
-  private volatile boolean stopped = false;
-  private long[] offsets;
-  private Optional<Integer>[] leaderEpochs;
-  private long lastCommit;
-
-  public SplitterStreamProcessor(
-      MessageSplitter splitter,
-      SplitterApplicationProperties properties,
-      Clock clock,
-      TaskExecutor executor)
-  {
-    this.splitter = splitter;
-
-    this.inputTopic = properties.getInputTopic();
-    this.outputTopic = properties.getOutputTopic();
-
-    this.clock = clock;
-    this.commitInterval = properties.getCommitInterval();
-
-    Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");
-
-    Properties props;
-
-    props = new Properties();
-    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
-    props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
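-    // Offsets are committed through the transactional producer, not by the consumer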
-    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-    consumer = new KafkaConsumer<>(props);
-
-    props = new Properties();
-    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
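-    // Idempotence and a transactional id enable the transactional API;
-    // the id must be unique for each running producer instance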
-    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
-    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-ID");
-    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-    producer = new KafkaProducer<>(props);
-
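-    // Start the processing loop on a separate thread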
-    executor.execute(this);
-  }
-
-  public void run()
-  {
-    running.lock();
-
-    try
-    {
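-      // initTransactions() also fences off zombie producers that use the same transactional id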
-      log.info("Initializing transaction");
-      producer.initTransactions();
-
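-      // The rebalance listener ties the transaction lifecycle to the partition assignment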
-      log.info("Subscribing to topic {}", inputTopic);
-      consumer.subscribe(
-          Arrays.asList(inputTopic),
-          new TransactionalConsumerRebalanceListener());
-
-      while (!stopped)
-      {
-        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
-
-        records.forEach(inputRecord ->
-        {
-          log.debug(
-              "Received a recording of {}, partition={}, offset={}, epoch={}",
-              inputRecord.key(),
-              inputRecord.partition(),
-              inputRecord.offset(),
-              inputRecord.leaderEpoch());
-
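-          // Remember offset and leader epoch of the last record seen for each partition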
-          offsets[inputRecord.partition()] = inputRecord.offset();
-          leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch();
-
-          String[] words = splitter.split(inputRecord.value());
-          for (int i = 0; i < words.length; i++)
-          {
-            ProducerRecord<String, String> outputRecord =
-                new ProducerRecord<>(
-                    outputTopic,
-                    inputRecord.key(),
-                    words[i].trim());
-
-            producer.send(outputRecord, (metadata, exception) ->
-            {
-              if (exception == null)
-              {
-                // Success: log the acknowledged partition and offset
-                log.debug(
-                    "Sent {}={}, partition={}, offset={}",
-                    outputRecord.key(),
-                    outputRecord.value(),
-                    metadata.partition(),
-                    metadata.offset());
-              }
-              else
-              {
-                // Error: log it; a failed send also makes the transaction commit fail
-                log.error(
-                    "Could not send {}={}: {}",
-                    outputRecord.key(),
-                    outputRecord.value(),
-                    exception.toString());
-              }
-            });
-          }
-
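-          // Commit the running transaction and start a new one, if the
-          // configured commit interval has elapsed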
-          long delta = clock.millis() - lastCommit;
-          if (delta > commitInterval)
-          {
-            log.info("Elapsed time since last commit: {}ms", delta);
-            commitTransaction();
-            beginTransaction();
-          }
-        });
-      }
-    }
-    catch (WakeupException e)
-    {
-      log.info("Waking up from exception!");
-      // Nicht nötig, da consumer.close() onPartitionsRevoked() auslöst!
-      // commitTransaction();
-    }
-    catch (Exception e)
-    {
-      log.error("Unexpected exception!", e);
-      producer.abortTransaction();
-    }
-    finally
-    {
-      try
-      {
-        log.info("Closing consumer");
-        consumer.close();
-        log.info("Closing producer");
-        producer.close();
-        log.info("Exiting!");
-      }
-      finally
-      {
-        running.unlock();
-      }
-    }
-  }
-
-  private void beginTransaction()
-  {
-    log.info("Beginning new transaction");
-    lastCommit = clock.millis();
-    producer.beginTransaction();
-  }
-
-  private void commitTransaction()
-  {
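-    // Collect, for each partition that received data, the next offset to consume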
-    Map<TopicPartition, OffsetAndMetadata> offsetsToSend = new HashMap<>();
-    for (int i = 0; i < offsets.length; i++)
-    {
-      // -1 marks partitions for which no record has been processed yet
-      if (offsets[i] >= 0)
-      {
-        offsetsToSend.put(
-            new TopicPartition(inputTopic, i),
-            // Commit the position to resume from: last processed offset + 1
-            new OffsetAndMetadata(offsets[i] + 1, leaderEpochs[i], ""));
-      }
-    }
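-    // Sending the offsets through the producer makes them part of the transaction:
-    // they are committed atomically with the produced records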
-    producer.sendOffsetsToTransaction(
-        offsetsToSend,
-        consumer.groupMetadata());
-    log.info("Committing transaction");
-    producer.commitTransaction();
-  }
-
-  class TransactionalConsumerRebalanceListener implements ConsumerRebalanceListener
-  {
-    @Override
-    public void onPartitionsAssigned(Collection<TopicPartition> partitions)
-    {
-      log.info("Assigned partitions: {}", toString(partitions));
-
-      // Compute the length of an array that can be used to store the offsets
-      // (We can ignore the topic, since we only read from a single one!)
-      int length =
-          partitions
-              .stream()
-              .mapToInt(TopicPartition::partition)
-              .max()
-              .orElse(-1) + 1;
-      offsets = new long[length];
-      // -1 distinguishes "no record processed yet" from a record at offset 0
-      Arrays.fill(offsets, -1);
-      leaderEpochs = new Optional[length];
-
-      beginTransaction();
-    }
-
-    @Override
-    public void onPartitionsRevoked(Collection<TopicPartition> partitions)
-    {
-      log.info("Revoked partitions: {}", toString(partitions));
-      commitTransaction();
-    }
-
-    @Override
-    public void onPartitionsLost(Collection<TopicPartition> partitions)
-    {
-      log.info("Lost partitions: {}", toString(partitions));
-      producer.abortTransaction();
-    }
-
-    String toString(Collection<TopicPartition> partitions)
-    {
-      return
-          partitions
-              .stream()
-              .map(tp -> tp.topic() + "-" + tp.partition())
-              .collect(Collectors.joining(", "));
-    }
-  }
-
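-  // Called by Spring on shutdown: wakeup() aborts the blocking poll() and
-  // acquiring the lock waits until run() has finished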
-  @PreDestroy
-  public void stop()
-  {
-    log.info("Shutdown requested...");
-    if (stopped)
-    {
-      log.warn("Ignoring request: already stopped!");
-      return;
-    }
-    stopped = true;
-    consumer.wakeup();
-    running.lock();
-    log.info("Shutdown completed!");
-  }
-}