From 2bd789b289084473a7210dad3da02dfd1fdaf084 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 22 Oct 2021 22:56:27 +0200 Subject: [PATCH] WIP --- .../recorder/SplitterStreamProcessor.java | 64 ++++++++++++------- 1 file changed, 41 insertions(+), 23 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/SplitterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/recorder/SplitterStreamProcessor.java index dd00aca..92a544b 100644 --- a/src/main/java/de/juplo/kafka/wordcount/recorder/SplitterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/recorder/SplitterStreamProcessor.java @@ -4,6 +4,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; @@ -17,8 +18,7 @@ import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; import java.time.Clock; import java.time.Duration; -import java.util.Arrays; -import java.util.Collection; +import java.util.*; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -37,6 +37,8 @@ public class SplitterStreamProcessor implements ApplicationRunner private final int commitInterval; private boolean stopped = false; + private long[] offsets; + private Optional[] leaderEpochs; private long lastCommit; public SplitterStreamProcessor( @@ -74,6 +76,9 @@ public class SplitterStreamProcessor implements ApplicationRunner records.forEach(inputRecord -> { + offsets[inputRecord.partition()] = inputRecord.offset(); + leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch(); + String[] words = PATTERN.split(inputRecord.value()); for (int i = 0; i < words.length; i++) { @@ 
-146,10 +151,19 @@ public class SplitterStreamProcessor implements ApplicationRunner private void commitTransaction() { - log.info("Committing transaction"); + Map offsetsToSend = new HashMap<>(); + for (int i = 0; i < offsets.length; i++) + { + if (offsets[i] > 0) + { + offsetsToSend.put( + new TopicPartition(inputTopic, i), + new OffsetAndMetadata(offsets[i], leaderEpochs[i], "")); + } + } producer.sendOffsetsToTransaction( - consumer.po, - consumer.groupMetadata()); + offsetsToSend, + consumer.groupMetadata());log.info("Committing transaction"); producer.commitTransaction(); } @@ -158,40 +172,44 @@ public class SplitterStreamProcessor implements ApplicationRunner @Override public void onPartitionsAssigned(Collection partitions) { - log.info( - "Assigned partitions: {}", + log.info("Assigned partitions: {}", toString(partitions)); + + // Compute the length of an array that can be used to store the offsets + // (We can ignore the topic, since we only read from a single one!) + int length = partitions .stream() - .map(tp -> tp.topic() + "-" + tp.partition()) - .collect(Collectors.joining(", "))); - - commitTransaction(); + .reduce( + 0, + (i, v) -> i < v.partition() ? v.partition() : i, + (l, r) -> l < r ? 
r : l) + 1; + offsets = new long[length]; + leaderEpochs = new Optional[length]; + + beginTransaction(); } @Override public void onPartitionsRevoked(Collection partitions) { - log.info( - "Revoked partitions: {}", - partitions - .stream() - .map(tp -> tp.topic() + "-" + tp.partition()) - .collect(Collectors.joining(", "))); - + log.info("Revoked partitions: {}", toString(partitions)); commitTransaction(); } @Override public void onPartitionsLost(Collection partitions) { - log.info( - "Lost partitions: {}", + log.info("Lost partitions: {}", toString(partitions)); + producer.abortTransaction(); + } + + String toString(Collection partitions) + { + return partitions .stream() .map(tp -> tp.topic() + "-" + tp.partition()) - .collect(Collectors.joining(", "))); - - producer.abortTransaction(); + .collect(Collectors.joining(", ")); } } -- 2.20.1