From b2d13a3c64d4a7a9a56a45a832f06fc0b46bec67 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 2 Nov 2024 19:23:36 +0100 Subject: [PATCH] WIP --- .../juplo/kafka/ApplicationConfiguration.java | 12 ++++ .../java/de/juplo/kafka/ExampleConsumer.java | 55 +++++++++++++++++-- 2 files changed, 62 insertions(+), 5 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 49875a0..8437bb4 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -12,6 +12,7 @@ import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import java.time.Clock; import java.util.Properties; @@ -24,6 +25,7 @@ public class ApplicationConfiguration Consumer kafkaConsumer, Producer kafkaProducer, ApplicationProperties properties, + Clock clock, ConfigurableApplicationContext applicationContext) { return @@ -33,6 +35,8 @@ public class ApplicationConfiguration kafkaConsumer, properties.getProducerProperties().getTopic(), kafkaProducer, + clock, + properties.getConsumerProperties().getAutoCommitInterval(), () -> applicationContext.close()); } @@ -43,6 +47,7 @@ public class ApplicationConfiguration props.put("bootstrap.servers", properties.getBootstrapServer()); props.put("client.id", properties.getClientId()); props.put("group.id", properties.getConsumerProperties().getGroupId()); + props.put("enable.auto.commit", false); if (properties.getConsumerProperties().getAutoOffsetReset() != null) { props.put("auto.offset.reset", properties.getConsumerProperties().getAutoOffsetReset().name()); @@ -65,6 +70,7 @@ public class ApplicationConfiguration Properties props = new Properties(); props.put("bootstrap.servers", properties.getBootstrapServer()); props.put("client.id", properties.getClientId()); + props.put("transactional.id", "my-tx"); props.put("acks", properties.getProducerProperties().getAcks()); props.put("batch.size", properties.getProducerProperties().getBatchSize()); props.put("metadata.maxage.ms", 5000); // 5 Sekunden @@ -77,4 +83,10 @@ public class ApplicationConfiguration return new KafkaProducer<>(props); } + + @Bean + Clock clock() + { + return Clock.systemDefaultZone(); + } } diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 0fbeead..73f56a8 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -1,18 +1,18 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; +import java.time.Clock; import java.time.Duration; +import java.time.Instant; import java.util.*; import java.util.concurrent.Phaser; +import java.util.stream.Collectors; @Slf4j @@ -27,7 +27,11 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener private final String stateTopic; private final Producer producer; + private final Clock clock; + private final Duration commitInterval; + private volatile boolean running = false; + private Instant lastCommit; private final Phaser phaser = new Phaser(1); private final Set assignedPartitions = new HashSet<>(); private volatile State[] partitionStates; @@ -46,6 +50,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener Consumer consumer, String stateTopic, Producer producer, + Clock clock, + Duration commitInterval, Runnable closeCallback) { this.id = clientId; @@ -53,6 +59,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener this.consumer = consumer; this.stateTopic = stateTopic; this.producer = producer; + this.clock = clock; + this.commitInterval = commitInterval; workerThread = new Thread(this, "ExampleConsumer Worker-Thread"); workerThread.start(); @@ -85,6 +93,12 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener consumer.subscribe(Arrays.asList(topic, stateTopic), this); running = true; + log.info("{} - Initializing the transaction", id); + producer.initTransactions(); + + lastCommit = clock.instant(); + producer.beginTransaction(); + while (running) { ConsumerRecords records = @@ -132,6 +146,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener int arrivedPhase = phaser.arriveAndAwaitAdvance(); log.info("{} - Phase {} is done! Next phase: {}", id, phase, arrivedPhase); + + commitIfNecessary(); } } catch(WakeupException e) @@ -141,9 +157,10 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener catch(Exception e) { log.error("{} - Unexpected error, unsubscribing!", id, e.toString()); - consumer.unsubscribe(); + producer.abortTransaction(); log.info("{} - Triggering exit of application!", id); new Thread(closeCallback).start(); + return; } finally { @@ -151,6 +168,34 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener consumer.close(); log.info("{}: Consumed {} messages in total, exiting!", id, consumed); } + + producer.commitTransaction(); + } + + private void commitIfNecessary() + { + Instant now = clock.instant(); + if (!now.isBefore(lastCommit.plus(commitInterval))) + { + producer.sendOffsetsToTransaction(getCurrentOffsets(), consumer.groupMetadata()); + commit(); + lastCommit = now; + } + } + + private Map getCurrentOffsets() + { + return assignedPartitions + .stream() + .collect(Collectors.toMap( + partition -> partition, + partition -> new OffsetAndMetadata(consumer.position(partition)))); + } + + private void commit() + { + producer.commitTransaction(); + producer.beginTransaction(); } private void handleRecord( -- 2.20.1