From de4f94c45fd0678777fecba4dbcb63f89e0ebafa Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 14 Aug 2022 21:45:13 +0200 Subject: [PATCH] =?utf8?q?ROT:=20R=C3=BCckbau=20auf=20automatische=20Commi?= =?utf8?q?ts=20-=20Testf=C3=A4lle=20laufen=20nicht=20mehr?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Rückbau von sumup-adder auf automatische Commits, so wie in dem Branch stored-state - D.h., nur noch der Zustand wird in der Mongo-DB gespeichert. * Durch den Umbau schlägt `ApplicationTests` fehl, obwohl sich eigentlich nichts an der Logik geändert hat. * Dies ist so "gewollt": Es zeigt, dass bei automatischen Commits im Fehlerfall der gespeicherte Zustand und der Stand der verarbeiteten Nachrichten auseinander laufen. * _Unschön:_ Die Tets sind nicht mehr unabhängig voneinander. ** Eigentlich war erwartet, dass der Test, der den Fehler erzeugt beim 2. Anlauf fehlschlägt, weil durch die doppelt gelesenen Nachrichten weitere Fehler auftreten - diese unterscheiden sich aus der Sicht des Test-Codes aber gar nicht von den vorherigen Fehlern. ** Als _ungewollter_ Seiteneffekt bleibt aber der Zustand in der Mongo-DB zurück, der zwischen den Tests nicht zurückgesetzt wird. ** Dadurch scheitert dann der folgende Test, der eigentlich durchlaufen sollte. ** Genauer: Ob und/oder Welche Tests fehlschlagen, hängt von der Ausführungs-Reihenfolge ab! * *Idee:* `AdderBusinessLogic` weniger motzig implementieren, indem anstatt von getrennten START- und STOP-Nachrichten nur noch eine CALC-Nachricht verwendet wird, die die Summe der zuvor aufgelaufenen Zahlen ausgibt. ** Passt besser zu der ursprünglichen Idee, dass an den falchen Summen leicht gezeigt werden kann, dass Nachrichten doppelt verarbeitet wurden ** Die Idee mit den ungültigen Zuständen führt davon ab! Bei doppelt verarbeiteten Nachrichten ist dann nur noch der invalide Zustand sichtbar, zu den mit der Gauß-Summenformel leicht als falsch zu entlarvenden Summen kommt es dann gar nicht mehr... --- .../juplo/kafka/ApplicationConfiguration.java | 7 +-- .../kafka/ApplicationRebalanceListener.java | 58 +++++-------------- .../java/de/juplo/kafka/EndlessConsumer.java | 6 +- ...ntervalAwareConsumerRebalanceListener.java | 3 - .../java/de/juplo/kafka/StateDocument.java | 5 +- .../java/de/juplo/kafka/ApplicationIT.java | 2 +- .../juplo/kafka/GenericApplicationTests.java | 22 +++---- 7 files changed, 26 insertions(+), 77 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 9f54083..4473c69 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,6 +1,5 @@ package de.juplo.kafka; -import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@ -27,17 +26,14 @@ public class ApplicationConfiguration public ApplicationRebalanceListener rebalanceListener( ApplicationRecordHandler recordHandler, StateRepository stateRepository, - Consumer consumer, ApplicationProperties properties) { return new ApplicationRebalanceListener( recordHandler, stateRepository, properties.getClientId(), - properties.getTopic(), Clock.systemDefaultZone(), - properties.getCommitInterval(), - consumer); + properties.getCommitInterval()); } @Bean @@ -73,7 +69,6 @@ public class ApplicationConfiguration props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"); props.put("group.id", properties.getGroupId()); props.put("client.id", properties.getClientId()); - props.put("enable.auto.commit", false); props.put("auto.offset.reset", properties.getAutoOffsetReset()); props.put("metadata.max.age.ms", "1000"); props.put("key.deserializer", StringDeserializer.class.getName()); diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index 542af2d..7256732 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -2,7 +2,6 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.TopicPartition; import java.time.Clock; @@ -19,13 +18,10 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe private final ApplicationRecordHandler recordHandler; private final StateRepository stateRepository; private final String id; - private final String topic; private final Clock clock; private final Duration commitInterval; - private final Consumer consumer; private Instant lastCommit = Instant.EPOCH; - private boolean commitsEnabled = true; @Override public void onPartitionsAssigned(Collection partitions) @@ -33,17 +29,11 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe partitions.forEach(tp -> { Integer partition = tp.partition(); + log.info("{} - adding partition: {}", id, partition); StateDocument document = stateRepository .findById(Integer.toString(partition)) .orElse(new StateDocument(partition)); - log.info("{} - adding partition: {}, offset={}", id, partition, document.offset); - if (document.offset >= 0) - { - // Only seek, if a stored offset was found - // Otherwise: Use initial offset, generated by Kafka - consumer.seek(tp, document.offset); - } recordHandler.addPartition(partition, document.state); }); } @@ -54,21 +44,18 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe partitions.forEach(tp -> { Integer partition = tp.partition(); - Long offset = consumer.position(tp); - log.info( - "{} - removing partition: {}, offset of next message {})", - id, - partition, - offset); - if (commitsEnabled) - { - Map removed = recordHandler.removePartition(partition); - stateRepository.save(new StateDocument(partition, removed, offset)); - } - else + log.info("{} - removing partition: {}", id, partition); + Map removed = recordHandler.removePartition(partition); + for (String key : removed.keySet()) { - log.info("Offset commits are disabled! Last commit: {}", lastCommit); + log.info( + "{} - Seen {} messages for partition={}|key={}", + id, + removed.get(key), + partition, + key); } + stateRepository.save(new StateDocument(partition, removed)); }); } @@ -76,33 +63,14 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe @Override public void beforeNextPoll() { - if (!commitsEnabled) - { - log.info("Offset commits are disabled! Last commit: {}", lastCommit); - return; - } - if (lastCommit.plus(commitInterval).isBefore(clock.instant())) { - log.debug("Storing data and offsets, last commit: {}", lastCommit); + log.debug("Storing data, last commit: {}", lastCommit); recordHandler.getState().forEach((partiton, adder) -> stateRepository.save( new StateDocument( partiton, - adder.getState(), - consumer.position(new TopicPartition(topic, partiton))))); + adder.getState()))); lastCommit = clock.instant(); } } - - @Override - public void enableCommits() - { - commitsEnabled = true; - } - - @Override - public void disableCommits() - { - commitsEnabled = false; - } } diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 58374f4..0238521 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -42,7 +42,6 @@ public class EndlessConsumer implements Runnable try { log.info("{} - Subscribing to topic {}", id, topic); - rebalanceListener.enableCommits(); consumer.subscribe(Arrays.asList(topic), rebalanceListener); while (true) @@ -75,6 +74,7 @@ public class EndlessConsumer implements Runnable catch(WakeupException e) { log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id); + consumer.commitSync(); shutdown(); } catch(RecordDeserializationException e) @@ -88,12 +88,12 @@ public class EndlessConsumer implements Runnable offset, e.getCause().toString()); + consumer.commitSync(); shutdown(e); } catch(Exception e) { - log.error("{} - Unexpected error: {}, disabling commits", id, e.toString(), e); - rebalanceListener.disableCommits(); + log.error("{} - Unexpected error: {}", id, e.toString(), e); shutdown(e); } finally diff --git a/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java b/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java index c59418c..8abec12 100644 --- a/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java @@ -6,7 +6,4 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; public interface PollIntervalAwareConsumerRebalanceListener extends ConsumerRebalanceListener { default void beforeNextPoll() {} - - default void enableCommits() {} - default void disableCommits() {} } diff --git a/src/main/java/de/juplo/kafka/StateDocument.java b/src/main/java/de/juplo/kafka/StateDocument.java index 0540e3f..c85cc38 100644 --- a/src/main/java/de/juplo/kafka/StateDocument.java +++ b/src/main/java/de/juplo/kafka/StateDocument.java @@ -14,7 +14,6 @@ public class StateDocument { @Id public String id; - public long offset = -1l; public Map state; public StateDocument() @@ -29,11 +28,9 @@ public class StateDocument public StateDocument( Integer partition, - Map state, - long offset) + Map state) { this.id = Integer.toString(partition); this.state = state; - this.offset = offset; } } diff --git a/src/test/java/de/juplo/kafka/ApplicationIT.java b/src/test/java/de/juplo/kafka/ApplicationIT.java index cded0ee..d1d8e50 100644 --- a/src/test/java/de/juplo/kafka/ApplicationIT.java +++ b/src/test/java/de/juplo/kafka/ApplicationIT.java @@ -32,7 +32,7 @@ public class ApplicationIT @Test - public void testApplicationStartup() + public void testApplicationStartup() { restTemplate.getForObject( "http://localhost:" + port + "/actuator/health", diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 9a6f812..cff44e2 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -60,8 +60,6 @@ abstract class GenericApplicationTests @Autowired ExecutorService executor; @Autowired - StateRepository stateRepository; - @Autowired PollIntervalAwareConsumerRebalanceListener rebalanceListener; @Autowired RecordHandler recordHandler; @@ -227,29 +225,23 @@ abstract class GenericApplicationTests void seekToEnd() { offsetConsumer.assign(partitions()); + offsetConsumer.seekToEnd(partitions()); partitions().forEach(tp -> { + // seekToEnd() works lazily: it only takes effect on poll()/position() Long offset = offsetConsumer.position(tp); log.info("New position for {}: {}", tp, offset); - Integer partition = tp.partition(); - StateDocument document = - stateRepository - .findById(partition.toString()) - .orElse(new StateDocument(partition)); - document.offset = offset; - stateRepository.save(document); }); + // The new positions must be commited! + offsetConsumer.commitSync(); offsetConsumer.unsubscribe(); } void doForCurrentOffsets(BiConsumer consumer) { - partitions().forEach(tp -> - { - String partition = Integer.toString(tp.partition()); - Optional offset = stateRepository.findById(partition).map(document -> document.offset); - consumer.accept(tp, offset.orElse(0l)); - }); + offsetConsumer.assign(partitions()); + partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp))); + offsetConsumer.unsubscribe(); } List partitions() -- 2.20.1