Implementierung vereinfacht: Auf das Nötigste zusammengekürzt
authorKai Moritz <kai@juplo.de>
Sun, 21 Aug 2022 17:33:11 +0000 (19:33 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 26 Aug 2022 10:48:31 +0000 (12:48 +0200)
* Das regelmäßige Speichern im Poll-Interval wird für die Übung nicht
  benötigt.
* Damit entfällt auch das Interface
  `PollIntervalAwareConsumerRebalanceListener`
* Die Vereinfachung hat eine Anpassung der Tests erfordert: Da in dem
  Test, der überprüft, ob die Offsets korrekt committed werde, wenn kein
  Fehler vorliegt, gar kein Rebalance auftritt, musste der Consumer
  gestoppt werden, damit die Ergebnisse für die Überprüfung sichtbar
  werden.

src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java [deleted file]
src/test/java/de/juplo/kafka/GenericApplicationTests.java

index b58295f..c1bc019 100644 (file)
@@ -39,9 +39,7 @@ public class ApplicationConfiguration
         recordHandler,
         adderResults,
         stateRepository,
-        properties.getClientId(),
-        Clock.systemDefaultZone(),
-        properties.getCommitInterval());
+        properties.getClientId());
   }
 
   @Bean
index a89c633..6d3850f 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.common.TopicPartition;
 
 import java.time.Clock;
@@ -12,19 +13,15 @@ import java.util.*;
 
 @RequiredArgsConstructor
 @Slf4j
-public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
+public class ApplicationRebalanceListener implements ConsumerRebalanceListener
 {
   private final ApplicationRecordHandler recordHandler;
   private final AdderResults adderResults;
   private final StateRepository stateRepository;
   private final String id;
-  private final Clock clock;
-  private final Duration commitInterval;
 
   private final Set<Integer> partitions = new HashSet<>();
 
-  private Instant lastCommit = Instant.EPOCH;
-
   @Override
   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
   {
@@ -73,20 +70,4 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe
       stateRepository.save(new StateDocument(partition, state, results));
     });
   }
-
-
-  @Override
-  public void beforeNextPoll()
-  {
-    if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
-    {
-      log.debug("Storing data, last commit: {}", lastCommit);
-      partitions.forEach(partition -> stateRepository.save(
-          new StateDocument(
-              partition,
-              recordHandler.getState(partition).getState(),
-              adderResults.getState(partition))));
-      lastCommit = clock.instant();
-    }
-  }
 }
index 0238521..00678c4 100644 (file)
@@ -25,7 +25,7 @@ public class EndlessConsumer<K, V> implements Runnable
   private final String id;
   private final String topic;
   private final Consumer<K, V> consumer;
-  private final PollIntervalAwareConsumerRebalanceListener rebalanceListener;
+  private final ConsumerRebalanceListener rebalanceListener;
   private final RecordHandler<K, V> recordHandler;
 
   private final Lock lock = new ReentrantLock();
@@ -67,8 +67,6 @@ public class EndlessConsumer<K, V> implements Runnable
 
           consumed++;
         }
-
-        rebalanceListener.beforeNextPoll();
       }
     }
     catch(WakeupException e)
diff --git a/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java b/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java
deleted file mode 100644 (file)
index 8abec12..0000000
+++ /dev/null
@@ -1,9 +0,0 @@
-package de.juplo.kafka;
-
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-
-
-public interface PollIntervalAwareConsumerRebalanceListener extends ConsumerRebalanceListener
-{
-  default void beforeNextPoll() {}
-}
index 54137b4..595ef89 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka;
 
 import com.mongodb.client.MongoClient;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -66,7 +67,7 @@ abstract class GenericApplicationTests<K, V>
        @Autowired
        MongoProperties mongoProperties;
        @Autowired
-       PollIntervalAwareConsumerRebalanceListener rebalanceListener;
+       ConsumerRebalanceListener rebalanceListener;
        @Autowired
        RecordHandler<K, V> recordHandler;
 
@@ -91,7 +92,7 @@ abstract class GenericApplicationTests<K, V>
        /** Tests methods */
 
        @Test
-       void commitsCurrentOffsetsOnSuccess()
+       void commitsCurrentOffsetsOnSuccess() throws Exception
        {
                int numberOfGeneratedMessages =
                                recordGenerator.generate(false, false, messageSender);
@@ -114,6 +115,7 @@ abstract class GenericApplicationTests<K, V>
                                .isThrownBy(() -> endlessConsumer.exitStatus())
                                .describedAs("Consumer should still be running");
 
+               endlessConsumer.stop();
                recordGenerator.assertBusinessLogic();
        }
 
@@ -385,7 +387,6 @@ abstract class GenericApplicationTests<K, V>
        {
                try
                {
-                       endlessConsumer.stop();
                        testRecordProducer.close();
                        offsetConsumer.close();
                }