Das Speichern der Daten und Offsets erfolgt nicht mehr nach jedem `poll()`
author Kai Moritz <kai@juplo.de>
Sun, 24 Jul 2022 16:39:05 +0000 (18:39 +0200)
committer Kai Moritz <kai@juplo.de>
Fri, 12 Aug 2022 14:44:56 +0000 (16:44 +0200)
* Stattdessen kann eine `Duration` konfiguriert werden.
* Ähnlich wie in der Client-Library von Kafka wird ein Zeitstempel für
  den letzten Commit gespeichert und die Daten werden immer dann
  gespeichert, wenn dieser weiter als das eingestellte
  `consumer.commit-interval` in der Vergangenheit liegt.

src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/resources/application.yml
src/test/java/de/juplo/kafka/ApplicationTests.java

index 54e9b89..7a24c97 100644 (file)
@@ -8,6 +8,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
+import java.time.Clock;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -41,6 +42,8 @@ public class ApplicationConfiguration
             repository,
             properties.getClientId(),
             properties.getTopic(),
+            Clock.systemDefaultZone(),
+            properties.getCommitInterval(),
             kafkaConsumer,
             handler);
   }
index fa731c5..14e928f 100644 (file)
@@ -7,6 +7,7 @@ import org.springframework.validation.annotation.Validated;
 
 import javax.validation.constraints.NotEmpty;
 import javax.validation.constraints.NotNull;
+import java.time.Duration;
 
 
 @ConfigurationProperties(prefix = "consumer")
@@ -30,4 +31,6 @@ public class ApplicationProperties
   @NotNull
   @NotEmpty
   private String autoOffsetReset;
+  @NotNull
+  private Duration commitInterval;
 }
index a93ae2c..ce5dd72 100644 (file)
@@ -8,7 +8,9 @@ import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.errors.WakeupException;
 
 import javax.annotation.PreDestroy;
+import java.time.Clock;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -25,6 +27,8 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
   private final PartitionStatisticsRepository repository;
   private final String id;
   private final String topic;
+  private final Clock clock;
+  private final Duration commitInterval;
   private final Consumer<K, V> consumer;
   private final java.util.function.Consumer<ConsumerRecord<K, V>> handler;
 
@@ -94,6 +98,8 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
       log.info("{} - Subscribing to topic {}", id, topic);
       consumer.subscribe(Arrays.asList(topic), this);
 
+      Instant lastCommit = clock.instant();
+
       while (true)
       {
         ConsumerRecords<K, V> records =
@@ -129,11 +135,16 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
           byKey.put(key, seenByKey);
         }
 
-        seen.forEach((partiton, statistics) -> repository.save(
-            new StatisticsDocument(
-                partiton,
-                statistics,
-                consumer.position(new TopicPartition(topic, partiton)))));
+        if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
+        {
+          log.debug("Storing data and offsets, last commit: {}", lastCommit);
+          seen.forEach((partiton, statistics) -> repository.save(
+              new StatisticsDocument(
+                  partiton,
+                  statistics,
+                  consumer.position(new TopicPartition(topic, partiton)))));
+          lastCommit = clock.instant();
+        }
       }
     }
     catch(WakeupException e)
index 93b27c2..24f434f 100644 (file)
@@ -4,6 +4,7 @@ consumer:
   client-id: DEV
   topic: test
   auto-offset-reset: earliest
+  commit-interval: 30s
 management:
   endpoint:
     shutdown:
index 4b7ef36..431431b 100644 (file)
@@ -21,6 +21,7 @@ import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.test.context.TestPropertySource;
 import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
 
+import java.time.Clock;
 import java.time.Duration;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
@@ -43,6 +44,7 @@ import static org.awaitility.Awaitility.*;
                properties = {
                                "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
                                "consumer.topic=" + TOPIC,
+                               "consumer.commit-interval=1s",
                                "spring.mongodb.embedded.version=4.4.13" })
 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
 @EnableAutoConfiguration
@@ -268,6 +270,8 @@ class ApplicationTests
                                                repository,
                                                properties.getClientId(),
                                                properties.getTopic(),
+                                               Clock.systemDefaultZone(),
+                                               properties.getCommitInterval(),
                                                kafkaConsumer,
                                                captureOffsetAndExecuteTestHandler);