]> juplo.de Git - demos/kafka/training/commitdiff
Implementierung vereinfacht: Auf das Nötigste zusammengekürzt
authorKai Moritz <kai@juplo.de>
Sun, 21 Aug 2022 17:33:11 +0000 (19:33 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 26 Aug 2022 10:48:31 +0000 (12:48 +0200)
* Das regelmäßige Speichern im Poll-Interval wird für die Übung nicht
  benötigt.
* Damit entfällt auch das Interface
  `PollIntervalAwareConsumerRebalanceListener`
* Die Vereinfachung hat eine Anpassung der Tests erfordert: Da in dem
  Test, der überprüft, ob die Offsets korrekt committed werde, wenn kein
  Fehler vorliegt, gar kein Rebalance auftritt, musste der Consumer
  gestoppt werden, damit die Ergebnisse für die Überprüfung sichtbar
  werden.

src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java [deleted file]
src/test/java/de/juplo/kafka/GenericApplicationTests.java

index b58295f29d5a68e586955a73f7a95efaed195fa7..c1bc0192e401788d1b60886095c6546c12d450e3 100644 (file)
@@ -39,9 +39,7 @@ public class ApplicationConfiguration
         recordHandler,
         adderResults,
         stateRepository,
-        properties.getClientId(),
-        Clock.systemDefaultZone(),
-        properties.getCommitInterval());
+        properties.getClientId());
   }
 
   @Bean
index a89c633c760c55e9ab36708b90b483cb2d96e718..6d3850f677b33e8ad71695527ac9386e3e42c1fd 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.common.TopicPartition;
 
 import java.time.Clock;
@@ -12,19 +13,15 @@ import java.util.*;
 
 @RequiredArgsConstructor
 @Slf4j
-public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
+public class ApplicationRebalanceListener implements ConsumerRebalanceListener
 {
   private final ApplicationRecordHandler recordHandler;
   private final AdderResults adderResults;
   private final StateRepository stateRepository;
   private final String id;
-  private final Clock clock;
-  private final Duration commitInterval;
 
   private final Set<Integer> partitions = new HashSet<>();
 
-  private Instant lastCommit = Instant.EPOCH;
-
   @Override
   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
   {
@@ -73,20 +70,4 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe
       stateRepository.save(new StateDocument(partition, state, results));
     });
   }
-
-
-  @Override
-  public void beforeNextPoll()
-  {
-    if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
-    {
-      log.debug("Storing data, last commit: {}", lastCommit);
-      partitions.forEach(partition -> stateRepository.save(
-          new StateDocument(
-              partition,
-              recordHandler.getState(partition).getState(),
-              adderResults.getState(partition))));
-      lastCommit = clock.instant();
-    }
-  }
 }
index 02385211cdf8470df6973f6bb608095dcebe501f..00678c45cf344dd43e5e4d5b5bdcbea74bd1a75d 100644 (file)
@@ -25,7 +25,7 @@ public class EndlessConsumer<K, V> implements Runnable
   private final String id;
   private final String topic;
   private final Consumer<K, V> consumer;
-  private final PollIntervalAwareConsumerRebalanceListener rebalanceListener;
+  private final ConsumerRebalanceListener rebalanceListener;
   private final RecordHandler<K, V> recordHandler;
 
   private final Lock lock = new ReentrantLock();
@@ -67,8 +67,6 @@ public class EndlessConsumer<K, V> implements Runnable
 
           consumed++;
         }
-
-        rebalanceListener.beforeNextPoll();
       }
     }
     catch(WakeupException e)
diff --git a/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java b/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java
deleted file mode 100644 (file)
index 8abec12..0000000
+++ /dev/null
@@ -1,9 +0,0 @@
-package de.juplo.kafka;
-
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-
-
-public interface PollIntervalAwareConsumerRebalanceListener extends ConsumerRebalanceListener
-{
-  default void beforeNextPoll() {}
-}
index 54137b44d5a3a66f9557a7e4d3123b3e9e061d0b..595ef89d29001fbc39a34cf1d757e8ecadc7d69f 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka;
 
 import com.mongodb.client.MongoClient;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -66,7 +67,7 @@ abstract class GenericApplicationTests<K, V>
        @Autowired
        MongoProperties mongoProperties;
        @Autowired
-       PollIntervalAwareConsumerRebalanceListener rebalanceListener;
+       ConsumerRebalanceListener rebalanceListener;
        @Autowired
        RecordHandler<K, V> recordHandler;
 
@@ -91,7 +92,7 @@ abstract class GenericApplicationTests<K, V>
        /** Tests methods */
 
        @Test
-       void commitsCurrentOffsetsOnSuccess()
+       void commitsCurrentOffsetsOnSuccess() throws Exception
        {
                int numberOfGeneratedMessages =
                                recordGenerator.generate(false, false, messageSender);
@@ -114,6 +115,7 @@ abstract class GenericApplicationTests<K, V>
                                .isThrownBy(() -> endlessConsumer.exitStatus())
                                .describedAs("Consumer should still be running");
 
+               endlessConsumer.stop();
                recordGenerator.assertBusinessLogic();
        }
 
@@ -385,7 +387,6 @@ abstract class GenericApplicationTests<K, V>
        {
                try
                {
-                       endlessConsumer.stop();
                        testRecordProducer.close();
                        offsetConsumer.close();
                }