WIP
author Kai Moritz <kai@juplo.de>
Fri, 22 Oct 2021 20:56:27 +0000 (22:56 +0200)
committer Kai Moritz <kai@juplo.de>
Fri, 22 Oct 2021 20:56:27 +0000 (22:56 +0200)
src/main/java/de/juplo/kafka/wordcount/recorder/SplitterStreamProcessor.java

index dd00aca..92a544b 100644 (file)
@@ -4,6 +4,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
@@ -17,8 +18,7 @@ import org.springframework.stereotype.Component;
 import javax.annotation.PreDestroy;
 import java.time.Clock;
 import java.time.Duration;
-import java.util.Arrays;
-import java.util.Collection;
+import java.util.*;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -37,6 +37,8 @@ public class SplitterStreamProcessor implements ApplicationRunner
   private final int commitInterval;
 
   private boolean stopped = false;
+  private long[] offsets;
+  private Optional<Integer>[] leaderEpochs;
   private long lastCommit;
 
   public SplitterStreamProcessor(
@@ -74,6 +76,9 @@ public class SplitterStreamProcessor implements ApplicationRunner
 
         records.forEach(inputRecord ->
         {
+          offsets[inputRecord.partition()] = inputRecord.offset();
+          leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch();
+
           String[] words = PATTERN.split(inputRecord.value());
           for (int i = 0; i < words.length; i++)
           {
@@ -146,10 +151,19 @@ public class SplitterStreamProcessor implements ApplicationRunner
 
   private void commitTransaction()
   {
-    log.info("Committing transaction");
+    Map<TopicPartition, OffsetAndMetadata> offsetsToSend = new HashMap<>();
+    for (int i = 0; i < offsets.length; i++)
+    {
+      if (offsets[i] > 0)
+      {
+        offsetsToSend.put(
+            new TopicPartition(inputTopic, i),
+            new OffsetAndMetadata(offsets[i], leaderEpochs[i], ""));
+      }
+    }
     producer.sendOffsetsToTransaction(
-        consumer.po,
-        consumer.groupMetadata());
+        offsetsToSend,
+        consumer.groupMetadata());log.info("Committing transaction");
     producer.commitTransaction();
   }
 
@@ -158,40 +172,44 @@ public class SplitterStreamProcessor implements ApplicationRunner
     @Override
     public void onPartitionsAssigned(Collection<TopicPartition> partitions)
     {
-      log.info(
-          "Assigned partitions: {}",
+      log.info("Assigned partitions: {}", toString(partitions));
+
+      // Compute the length of an array, that can be used to store the offsets
+      // (We can ignore the topic, since we only read from a single one!)
+      int length =
           partitions
               .stream()
-              .map(tp -> tp.topic() + "-" + tp.partition())
-              .collect(Collectors.joining(", ")));
-
-      commitTransaction();
+              .reduce(
+                  0,
+                  (i, v) -> i < v.partition() ? v.partition() : i,
+                  (l, r) -> l < r ? r : l) + 1;
+      offsets = new long[length];
+      leaderEpochs = new Optional[length];
+
+      beginTransaction();
     }
 
     @Override
     public void onPartitionsRevoked(Collection<TopicPartition> partitions)
     {
-      log.info(
-          "Revoked partitions: {}",
-          partitions
-              .stream()
-              .map(tp -> tp.topic() + "-" + tp.partition())
-              .collect(Collectors.joining(", ")));
-
+      log.info("Revoked partitions: {}", toString(partitions));
       commitTransaction();
     }
 
     @Override
     public void onPartitionsLost(Collection<TopicPartition> partitions)
     {
-      log.info(
-          "Lost partitions: {}",
+      log.info("Lost partitions: {}", toString(partitions));
+      producer.abortTransaction();
+    }
+
+    String toString(Collection<TopicPartition> partitions)
+    {
+      return
           partitions
               .stream()
               .map(tp -> tp.topic() + "-" + tp.partition())
-              .collect(Collectors.joining(", ")));
-
-      producer.abortTransaction();
+              .collect(Collectors.joining(", "));
     }
   }