From d675a67e01107b52240abbe62820aa1c8519f88d Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 24 Jul 2022 18:39:05 +0200 Subject: [PATCH] Das Speichern der Daten und Offsets erfolgt nicht mehr nach jedem `poll()` MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Stattdessen kann eine `Duration` konfiguriert werden. * Ähnlich wie in der Client-Library von Kafka wird ein Zeitstempel für den letzten Commit gespeichert und die Daten werden immer dann gespeichert, wenn dieser weiter als das eingestellte `consumer.commit-interval` in der Vergangenheit liegt. --- .../juplo/kafka/ApplicationConfiguration.java | 3 +++ .../de/juplo/kafka/ApplicationProperties.java | 3 +++ .../java/de/juplo/kafka/EndlessConsumer.java | 21 ++++++++++++++----- src/main/resources/application.yml | 1 + .../java/de/juplo/kafka/ApplicationTests.java | 4 ++++ 5 files changed, 27 insertions(+), 5 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 54e9b89..7a24c97 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -8,6 +8,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import java.time.Clock; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -41,6 +42,8 @@ public class ApplicationConfiguration repository, properties.getClientId(), properties.getTopic(), + Clock.systemDefaultZone(), + properties.getCommitInterval(), kafkaConsumer, handler); } diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index fa731c5..14e928f 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java 
+++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -7,6 +7,7 @@ import org.springframework.validation.annotation.Validated; import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; +import java.time.Duration; @ConfigurationProperties(prefix = "consumer") @@ -30,4 +31,6 @@ public class ApplicationProperties @NotNull @NotEmpty private String autoOffsetReset; + @NotNull + private Duration commitInterval; } diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index a93ae2c..ce5dd72 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -8,7 +8,9 @@ import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.errors.WakeupException; import javax.annotation.PreDestroy; +import java.time.Clock; import java.time.Duration; +import java.time.Instant; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -25,6 +27,8 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl private final PartitionStatisticsRepository repository; private final String id; private final String topic; + private final Clock clock; + private final Duration commitInterval; private final Consumer consumer; private final java.util.function.Consumer> handler; @@ -94,6 +98,8 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl log.info("{} - Subscribing to topic {}", id, topic); consumer.subscribe(Arrays.asList(topic), this); + Instant lastCommit = clock.instant(); + while (true) { ConsumerRecords records = @@ -129,11 +135,16 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl byKey.put(key, seenByKey); } - seen.forEach((partiton, statistics) -> repository.save( - new StatisticsDocument( - partiton, - statistics, - consumer.position(new TopicPartition(topic, 
partiton))))); + if (lastCommit.plus(commitInterval).isBefore(clock.instant())) + { + log.debug("Storing data and offsets, last commit: {}", lastCommit); + seen.forEach((partiton, statistics) -> repository.save( + new StatisticsDocument( + partiton, + statistics, + consumer.position(new TopicPartition(topic, partiton))))); + lastCommit = clock.instant(); + } } } catch(WakeupException e) diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 93b27c2..24f434f 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -4,6 +4,7 @@ consumer: client-id: DEV topic: test auto-offset-reset: earliest + commit-interval: 30s management: endpoint: shutdown: diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 4b7ef36..431431b 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -21,6 +21,7 @@ import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import java.time.Clock; import java.time.Duration; import java.util.*; import java.util.concurrent.ExecutionException; @@ -43,6 +44,7 @@ import static org.awaitility.Awaitility.*; properties = { "consumer.bootstrap-server=${spring.embedded.kafka.brokers}", "consumer.topic=" + TOPIC, + "consumer.commit-interval=1s", "spring.mongodb.embedded.version=4.4.13" }) @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) @EnableAutoConfiguration @@ -268,6 +270,8 @@ class ApplicationTests repository, properties.getClientId(), properties.getTopic(), + Clock.systemDefaultZone(), + properties.getCommitInterval(), kafkaConsumer, captureOffsetAndExecuteTestHandler); -- 2.20.1