From: Kai Moritz <kai@juplo.de>
Date: Sun, 24 Jul 2022 16:39:05 +0000 (+0200)
Subject: Das Speichern der Daten und Offsets erfolgt nicht mehr nach jedem `poll()`
X-Git-Tag: endless-stream-consumer-DEPRECATED^2^2^2~1^2~6
X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=d675a67e01107b52240abbe62820aa1c8519f88d;p=demos%2Fkafka%2Ftraining

Das Speichern der Daten und Offsets erfolgt nicht mehr nach jedem `poll()`

* Stattdessen kann eine `Duration` konfiguriert werden.
* Ähnlich wie in der Client-Library von Kafka, wird ein Zeitstempel für
  den letzten Commit gespeichert und die Daten werden immer dann
  gespeichert, wenn dieser weiter als das eingestellte
  `consumer.commit-interval` in der Vergangenheit liegt.
---

diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 54e9b893..7a24c975 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -8,6 +8,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
+import java.time.Clock;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -41,6 +42,8 @@ public class ApplicationConfiguration
             repository,
             properties.getClientId(),
             properties.getTopic(),
+            Clock.systemDefaultZone(),
+            properties.getCommitInterval(),
             kafkaConsumer,
             handler);
   }
diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java
index fa731c53..14e928f1 100644
--- a/src/main/java/de/juplo/kafka/ApplicationProperties.java
+++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java
@@ -7,6 +7,7 @@ import org.springframework.validation.annotation.Validated;
 
 import javax.validation.constraints.NotEmpty;
 import javax.validation.constraints.NotNull;
+import java.time.Duration;
 
 
 @ConfigurationProperties(prefix = "consumer")
@@ -30,4 +31,6 @@ public class ApplicationProperties
   @NotNull
   @NotEmpty
   private String autoOffsetReset;
+  @NotNull
+  private Duration commitInterval;
 }
diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
index a93ae2cf..ce5dd723 100644
--- a/src/main/java/de/juplo/kafka/EndlessConsumer.java
+++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java
@@ -8,7 +8,9 @@ import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.errors.WakeupException;
 
 import javax.annotation.PreDestroy;
+import java.time.Clock;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -25,6 +27,8 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
   private final PartitionStatisticsRepository repository;
   private final String id;
   private final String topic;
+  private final Clock clock;
+  private final Duration commitInterval;
   private final Consumer<K, V> consumer;
   private final java.util.function.Consumer<ConsumerRecord<K, V>> handler;
 
@@ -94,6 +98,8 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
       log.info("{} - Subscribing to topic {}", id, topic);
       consumer.subscribe(Arrays.asList(topic), this);
 
+      Instant lastCommit = clock.instant();
+
       while (true)
       {
         ConsumerRecords<K, V> records =
@@ -129,11 +135,16 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
           byKey.put(key, seenByKey);
         }
 
-        seen.forEach((partiton, statistics) -> repository.save(
-            new StatisticsDocument(
-                partiton,
-                statistics,
-                consumer.position(new TopicPartition(topic, partiton)))));
+        if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
+        {
+          log.debug("Storing data and offsets, last commit: {}", lastCommit);
+          seen.forEach((partiton, statistics) -> repository.save(
+              new StatisticsDocument(
+                  partiton,
+                  statistics,
+                  consumer.position(new TopicPartition(topic, partiton)))));
+          lastCommit = clock.instant();
+        }
       }
     }
     catch(WakeupException e)
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 93b27c20..24f434ff 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -4,6 +4,7 @@ consumer:
   client-id: DEV
   topic: test
   auto-offset-reset: earliest
+  commit-interval: 30s
 management:
   endpoint:
     shutdown:
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index 4b7ef36f..431431bb 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -21,6 +21,7 @@ import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.test.context.TestPropertySource;
 import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
 
+import java.time.Clock;
 import java.time.Duration;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
@@ -43,6 +44,7 @@ import static org.awaitility.Awaitility.*;
 		properties = {
 				"consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
 				"consumer.topic=" + TOPIC,
+				"consumer.commit-interval=1s",
 				"spring.mongodb.embedded.version=4.4.13" })
 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
 @EnableAutoConfiguration
@@ -268,6 +270,8 @@ class ApplicationTests
 						repository,
 						properties.getClientId(),
 						properties.getTopic(),
+						Clock.systemDefaultZone(),
+						properties.getCommitInterval(),
 						kafkaConsumer,
 						captureOffsetAndExecuteTestHandler);