From 2d25525ef70a90709edc48bd9542d1b08a2888a2 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 21 Aug 2022 19:33:11 +0200 Subject: [PATCH] =?utf8?q?Implementierung=20vereinfacht:=20Auf=20das=20N?= =?utf8?q?=C3=B6tigste=20zusammengek=C3=BCrzt?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Das regelmäßige Speichern im Poll-Interval wird für die Übung nicht benötigt. * Damit entfällt auch das Interface `PollIntervalAwareConsumerRebalanceListener` * Die Vereinfachung hat eine Anpassung der Tests erfordert: Da in dem Test, der überprüft, ob die Offsets korrekt committed werde, wenn kein Fehler vorliegt, gar kein Rebalance auftritt, musste der Consumer gestoppt werden, damit die Ergebnisse für die Überprüfung sichtbar werden. --- .../juplo/kafka/ApplicationConfiguration.java | 4 +--- .../kafka/ApplicationRebalanceListener.java | 23 ++----------------- .../java/de/juplo/kafka/EndlessConsumer.java | 4 +--- ...ntervalAwareConsumerRebalanceListener.java | 9 -------- .../juplo/kafka/GenericApplicationTests.java | 7 +++--- 5 files changed, 8 insertions(+), 39 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index b58295f..c1bc019 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -39,9 +39,7 @@ public class ApplicationConfiguration recordHandler, adderResults, stateRepository, - properties.getClientId(), - Clock.systemDefaultZone(), - properties.getCommitInterval()); + properties.getClientId()); } @Bean diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index a89c633..6d3850f 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -2,6 +2,7 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.common.TopicPartition; import java.time.Clock; @@ -12,19 +13,15 @@ import java.util.*; @RequiredArgsConstructor @Slf4j -public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRebalanceListener +public class ApplicationRebalanceListener implements ConsumerRebalanceListener { private final ApplicationRecordHandler recordHandler; private final AdderResults adderResults; private final StateRepository stateRepository; private final String id; - private final Clock clock; - private final Duration commitInterval; private final Set partitions = new HashSet<>(); - private Instant lastCommit = Instant.EPOCH; - @Override public void onPartitionsAssigned(Collection partitions) { @@ -73,20 +70,4 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe stateRepository.save(new StateDocument(partition, state, results)); }); } - - - @Override - public void beforeNextPoll() - { - if (lastCommit.plus(commitInterval).isBefore(clock.instant())) - { - log.debug("Storing data, last commit: {}", lastCommit); - partitions.forEach(partition -> stateRepository.save( - new StateDocument( - partition, - recordHandler.getState(partition).getState(), - adderResults.getState(partition)))); - lastCommit = clock.instant(); - } - } } diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 0238521..00678c4 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -25,7 +25,7 @@ public class EndlessConsumer implements Runnable private final String id; private final String topic; private final Consumer consumer; - private final PollIntervalAwareConsumerRebalanceListener rebalanceListener; + private final ConsumerRebalanceListener rebalanceListener; private final RecordHandler recordHandler; private final Lock lock = new ReentrantLock(); @@ -67,8 +67,6 @@ public class EndlessConsumer implements Runnable consumed++; } - - rebalanceListener.beforeNextPoll(); } } catch(WakeupException e) diff --git a/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java b/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java deleted file mode 100644 index 8abec12..0000000 --- a/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java +++ /dev/null @@ -1,9 +0,0 @@ -package de.juplo.kafka; - -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; - - -public interface PollIntervalAwareConsumerRebalanceListener extends ConsumerRebalanceListener -{ - default void beforeNextPoll() {} -} diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 54137b4..595ef89 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -2,6 +2,7 @@ package de.juplo.kafka; import com.mongodb.client.MongoClient; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; @@ -66,7 +67,7 @@ abstract class GenericApplicationTests @Autowired MongoProperties mongoProperties; @Autowired - PollIntervalAwareConsumerRebalanceListener rebalanceListener; + ConsumerRebalanceListener rebalanceListener; @Autowired RecordHandler recordHandler; @@ -91,7 +92,7 @@ abstract class GenericApplicationTests /** Tests methods */ @Test - void commitsCurrentOffsetsOnSuccess() + void commitsCurrentOffsetsOnSuccess() throws Exception { int numberOfGeneratedMessages = recordGenerator.generate(false, false, messageSender); @@ -114,6 +115,7 @@ abstract class GenericApplicationTests .isThrownBy(() -> endlessConsumer.exitStatus()) .describedAs("Consumer should still be running"); + endlessConsumer.stop(); recordGenerator.assertBusinessLogic(); } @@ -385,7 +387,6 @@ abstract class GenericApplicationTests { try { - endlessConsumer.stop(); testRecordProducer.close(); offsetConsumer.close(); } -- 2.20.1