import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import javax.annotation.PreDestroy;
import java.time.Clock;
import java.time.Duration;
-import java.util.Arrays;
-import java.util.Collection;
+import java.util.*;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
private final int commitInterval;
private boolean stopped = false;
+ // Per-partition bookkeeping: the next offset to consume and its leader epoch,
+ // collected while processing and committed with the next transaction
+ private long[] offsets;
+ private Optional<Integer>[] leaderEpochs;
private long lastCommit;
public SplitterStreamProcessor(
records.forEach(inputRecord ->
{
+ // Remember the offset of the _next_ record to consume: sendOffsetsToTransaction()
+ // expects lastProcessedOffset + 1, and a value > 0 marks the partition as touched
+ offsets[inputRecord.partition()] = inputRecord.offset() + 1;
+ leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch();
+
String[] words = PATTERN.split(inputRecord.value());
for (int i = 0; i < words.length; i++)
{
private void commitTransaction()
{
- log.info("Committing transaction");
+ Map<TopicPartition, OffsetAndMetadata> offsetsToSend = new HashMap<>();
+ for (int i = 0; i < offsets.length; i++)
+ {
+ if (offsets[i] > 0) // only commit partitions we actually consumed from
+ {
+ offsetsToSend.put(
+ new TopicPartition(inputTopic, i),
+ new OffsetAndMetadata(offsets[i], leaderEpochs[i], ""));
+ }
+ }
producer.sendOffsetsToTransaction(
- consumer.po,
- consumer.groupMetadata());
+ offsetsToSend,
+ consumer.groupMetadata());
+ log.info("Committing transaction");
producer.commitTransaction();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions)
{
- log.info(
- "Assigned partitions: {}",
+ log.info("Assigned partitions: {}", toString(partitions));
+
+ // Compute the length of an array that can be used to store the offsets
+ // (We can ignore the topic, since we only read from a single one!)
+ int length =
partitions
.stream()
- .map(tp -> tp.topic() + "-" + tp.partition())
- .collect(Collectors.joining(", ")));
-
- commitTransaction();
+ .reduce(
+ 0,
+ (i, v) -> i < v.partition() ? v.partition() : i,
+ (l, r) -> l < r ? r : l) + 1;
+ offsets = new long[length];
+ leaderEpochs = new Optional[length];
+
+ beginTransaction();
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions)
{
- log.info(
- "Revoked partitions: {}",
- partitions
- .stream()
- .map(tp -> tp.topic() + "-" + tp.partition())
- .collect(Collectors.joining(", ")));
-
+ log.info("Revoked partitions: {}", toString(partitions));
commitTransaction();
}
@Override
public void onPartitionsLost(Collection<TopicPartition> partitions)
{
- log.info(
- "Lost partitions: {}",
+ log.info("Lost partitions: {}", toString(partitions));
+ producer.abortTransaction();
+ }
+
+ String toString(Collection<TopicPartition> partitions)
+ {
+ return
partitions
.stream()
.map(tp -> tp.topic() + "-" + tp.partition())
- .collect(Collectors.joining(", ")));
-
- producer.abortTransaction();
+ .collect(Collectors.joining(", "));
}
}
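For reference, the three callbacks patched in above only fire if the processor registers itself as the ConsumerRebalanceListener when subscribing to the input topic. The sketch below shows how that wiring and the surrounding poll/commit loop might look; the run() method, the clock field and the assumption that commitInterval is given in milliseconds are illustrations only, not part of the actual patch.

// Sketch only: a possible poll loop inside SplitterStreamProcessor.
public void run()
{
    // Passing `this` as the listener is what makes onPartitionsAssigned(),
    // onPartitionsRevoked() and onPartitionsLost() fire during rebalances.
    // The first transaction is opened in onPartitionsAssigned().
    consumer.subscribe(List.of(inputTopic), this);

    while (!stopped)
    {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        records.forEach(inputRecord ->
        {
            // ...record offset / leader epoch and split the value, as shown in the diff...
        });

        // Periodically close the running transaction (including the consumed offsets,
        // via sendOffsetsToTransaction()) and open a new one.
        if (clock.millis() - lastCommit >= commitInterval)
        {
            commitTransaction();
            beginTransaction();
            lastCommit = clock.millis();
        }
    }
}

Committing the offsets through producer.sendOffsetsToTransaction() instead of consumer.commitSync() is what ties them to the transaction: the produced words and the consumed offsets become visible to read_committed consumers together, or not at all.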