From: Kai Moritz
Date: Wed, 17 Aug 2022 20:51:10 +0000 (+0200)
Subject: RED: Merged the corrected/improved test and the reworked setup
X-Git-Tag: sumup-adder---lvm-2-tage~10
X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=d2eb370acf1a2195c36421ffc471f67cb4a8e86e;hp=600b0b10f8e98bebef80f75e391a78c459ffb45c;p=demos%2Fkafka%2Ftraining

RED: Merged the corrected/improved test and the reworked setup

* Merge branch 'sumup-adder--ohne--stored-offsets' into sumup-adder.
* In the merged branch it does not matter when exactly the MongoDB is reset
  between the tests, because it only contains the state of the consumer.
* When the offsets are stored in the MongoDB as well, the point in time at
  which it is reset during the test preparation is essential! (See the
  sketch below.)
* RED: The improved/tightened test uncovers errors in the test logic.
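The second and third bullets are the crux of the reworked setup: as soon as the offsets live in the same database as the state, the database reset and the seeding of the start offsets have to happen in exactly this order. A condensed sketch of the required ordering (the body mirrors the GenericApplicationTests diff below; prepareTestRun is a hypothetical name):

// Condensed from the test setup below: seekToEnd() now *writes* the start
// offsets into the very database that drop() wipes, so the wipe has to come
// first. In the merged branch the order was irrelevant, because the database
// held nothing but the consumer's state.
void prepareTestRun()
{
  mongoClient.getDatabase(mongoProperties.getDatabase()).drop(); // 1. reset state *and* offsets
  seekToEnd(); // 2. store each partition's current end as its start offset
}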
---

diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index b58295f..f83661e 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -1,5 +1,6 @@
 package de.juplo.kafka;
 
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -33,6 +34,7 @@ public class ApplicationConfiguration
       ApplicationRecordHandler recordHandler,
       AdderResults adderResults,
       StateRepository stateRepository,
+      Consumer<String, String> consumer,
       ApplicationProperties properties)
   {
     return new ApplicationRebalanceListener(
@@ -40,8 +42,10 @@ public class ApplicationConfiguration
         adderResults,
         stateRepository,
         properties.getClientId(),
+        properties.getTopic(),
         Clock.systemDefaultZone(),
-        properties.getCommitInterval());
+        properties.getCommitInterval(),
+        consumer);
   }
 
   @Bean
@@ -77,8 +81,8 @@ public class ApplicationConfiguration
     props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
     props.put("group.id", properties.getGroupId());
     props.put("client.id", properties.getClientId());
+    props.put("enable.auto.commit", false);
     props.put("auto.offset.reset", properties.getAutoOffsetReset());
-    props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis());
     props.put("metadata.max.age.ms", "1000");
     props.put("key.deserializer", StringDeserializer.class.getName());
     props.put("value.deserializer", StringDeserializer.class.getName());
diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
index 32e14e8..cd9da64 100644
--- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
+++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
@@ -2,6 +2,7 @@ package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
 
 import java.time.Clock;
@@ -18,12 +19,15 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe
   private final AdderResults adderResults;
   private final StateRepository stateRepository;
   private final String id;
+  private final String topic;
   private final Clock clock;
   private final Duration commitInterval;
+  private final Consumer<String, String> consumer;
 
   private final Set<Integer> partitions = new HashSet<>();
 
   private Instant lastCommit = Instant.EPOCH;
+  private boolean commitsEnabled = true;
 
   @Override
   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
@@ -31,12 +35,18 @@
     partitions.forEach(tp ->
     {
       Integer partition = tp.partition();
-      log.info("{} - adding partition: {}", id, partition);
       this.partitions.add(partition);
       StateDocument document =
           stateRepository
               .findById(Integer.toString(partition))
               .orElse(new StateDocument(partition));
+      log.info("{} - adding partition: {}, offset={}", id, partition, document.offset);
+      if (document.offset >= 0)
+      {
+        // Only seek, if a stored offset was found
+        // Otherwise: Use initial offset, generated by Kafka
+        consumer.seek(tp, document.offset);
+      }
       recordHandler.addPartition(partition, document.state);
       adderResults.addPartition(partition, document.results);
     });
@@ -48,20 +58,23 @@
     partitions.forEach(tp ->
     {
       Integer partition = tp.partition();
-      log.info("{} - removing partition: {}", id, partition);
       this.partitions.remove(partition);
-      Map<String, Long> state = recordHandler.removePartition(partition);
-      for (String key : state.keySet())
+      Long offset = consumer.position(tp);
+      log.info(
+          "{} - removing partition: {}, offset of next message {})",
+          id,
+          partition,
+          offset);
+      if (commitsEnabled)
+      {
+        Map<String, Long> state = recordHandler.removePartition(partition);
+        Map<String, List<AdderResult>> results = adderResults.removePartition(partition);
+        stateRepository.save(new StateDocument(partition, state, results, offset));
+      }
+      else
       {
-        log.info(
-            "{} - Seen {} messages for partition={}|key={}",
-            id,
-            state.get(key),
-            partition,
-            key);
+        log.info("Offset commits are disabled! Last commit: {}", lastCommit);
       }
-      Map<String, List<AdderResult>> results = adderResults.removePartition(partition);
-      stateRepository.save(new StateDocument(partition, state, results));
     });
   }
 
@@ -69,15 +82,34 @@
   @Override
   public void beforeNextPoll()
   {
+    if (!commitsEnabled)
+    {
+      log.info("Offset commits are disabled! Last commit: {}", lastCommit);
+      return;
+    }
+
     if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
     {
-      log.debug("Storing data, last commit: {}", lastCommit);
+      log.debug("Storing data and offsets, last commit: {}", lastCommit);
       partitions.forEach(partition -> stateRepository.save(
          new StateDocument(
               partition,
               recordHandler.getState(partition).getState(),
-              adderResults.getState(partition))));
+              adderResults.getState(partition),
+              consumer.position(new TopicPartition(topic, partition)))));
       lastCommit = clock.instant();
     }
   }
+
+  @Override
+  public void enableCommits()
+  {
+    commitsEnabled = true;
+  }
+
+  @Override
+  public void disableCommits()
+  {
+    commitsEnabled = false;
+  }
 }
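The heart of the change is the seek-on-assign logic in the rebalance listener above. Stripped of the application specifics, the pattern looks roughly like this (a sketch, not code from this repository: SeekingRebalanceListener and OffsetStore are hypothetical stand-ins for ApplicationRebalanceListener and StateRepository):

import java.util.Collection;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Restore each partition's position from an external store on assignment,
// write it back on revocation. -1 marks "no offset stored yet".
class SeekingRebalanceListener implements ConsumerRebalanceListener
{
  private final Consumer<?, ?> consumer;
  private final OffsetStore store;

  SeekingRebalanceListener(Consumer<?, ?> consumer, OffsetStore store)
  {
    this.consumer = consumer;
    this.store = store;
  }

  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
  {
    for (TopicPartition tp : partitions)
    {
      long offset = store.offsetFor(tp);
      if (offset >= 0)
        consumer.seek(tp, offset); // resume where the last run left off
      // otherwise: keep the initial position chosen via auto.offset.reset
    }
  }

  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
  {
    for (TopicPartition tp : partitions)
      store.save(tp, consumer.position(tp)); // position() = offset of the next record to read
  }

  interface OffsetStore
  {
    long offsetFor(TopicPartition tp); // -1, if nothing is stored
    void save(TopicPartition tp, long offset);
  }
}

Because position() returns the offset of the next record that would be consumed, the stored value is exactly the offset a restarted consumer has to seek to, the same value Kafka would record on a regular commit.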
diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
index 0238521..58374f4 100644
--- a/src/main/java/de/juplo/kafka/EndlessConsumer.java
+++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java
@@ -42,6 +42,7 @@ public class EndlessConsumer implements Runnable
     try
     {
       log.info("{} - Subscribing to topic {}", id, topic);
+      rebalanceListener.enableCommits();
       consumer.subscribe(Arrays.asList(topic), rebalanceListener);
 
       while (true)
@@ -74,7 +75,6 @@
     catch(WakeupException e)
     {
       log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
-      consumer.commitSync();
       shutdown();
     }
     catch(RecordDeserializationException e)
@@ -88,12 +88,12 @@
           offset,
           e.getCause().toString());
 
-      consumer.commitSync();
       shutdown(e);
     }
     catch(Exception e)
     {
-      log.error("{} - Unexpected error: {}", id, e.toString(), e);
+      log.error("{} - Unexpected error: {}, disabling commits", id, e.toString(), e);
+      rebalanceListener.disableCommits();
       shutdown(e);
     }
     finally
diff --git a/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java b/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java
index 8abec12..c59418c 100644
--- a/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java
+++ b/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java
@@ -6,4 +6,7 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 public interface PollIntervalAwareConsumerRebalanceListener extends ConsumerRebalanceListener
 {
   default void beforeNextPoll() {}
+
+  default void enableCommits() {}
+  default void disableCommits() {}
 }
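The interplay of the two diffs above in a nutshell: after an unexpected error the in-memory state can no longer be trusted, so the rebalance that is triggered when the consumer shuts down must not persist it. Condensed and slightly simplified from the run() method of EndlessConsumer (the other catch clauses are omitted):

try
{
  rebalanceListener.enableCommits();  // a fresh run may store state and offsets again
  consumer.subscribe(Arrays.asList(topic), rebalanceListener);
  while (true)
  {
    // poll() and process records ...
  }
}
catch (Exception e)
{
  // The state is suspect now: blocking the save in onPartitionsRevoked()
  // lets a restart resume from the last snapshot that was written while
  // processing was still healthy.
  rebalanceListener.disableCommits();
  shutdown(e);
}

Since state and offset are now written together, skipping the write keeps them consistent with each other: the consumer re-reads the messages since the last snapshot instead of resuming with a corrupted state.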
diff --git a/src/main/java/de/juplo/kafka/StateDocument.java b/src/main/java/de/juplo/kafka/StateDocument.java
index ae8eb51..c10a50c 100644
--- a/src/main/java/de/juplo/kafka/StateDocument.java
+++ b/src/main/java/de/juplo/kafka/StateDocument.java
@@ -15,6 +15,7 @@ public class StateDocument
 {
   @Id
   public String id;
+  public long offset = -1l;
   public Map<String, Long> state;
   public Map<String, List<AdderResult>> results;
 
@@ -32,10 +33,12 @@ public class StateDocument
   public StateDocument(
       Integer partition,
       Map<String, Long> state,
-      Map<String, List<AdderResult>> results)
+      Map<String, List<AdderResult>> results,
+      long offset)
   {
     this.id = Integer.toString(partition);
     this.state = state;
     this.results = results;
+    this.offset = offset;
   }
 }
diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
index 93daf6b..b019373 100644
--- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
@@ -62,6 +62,8 @@ abstract class GenericApplicationTests
   @Autowired
   ExecutorService executor;
   @Autowired
+  StateRepository stateRepository;
+  @Autowired
   MongoClient mongoClient;
   @Autowired
   MongoProperties mongoProperties;
@@ -228,23 +230,29 @@
   void seekToEnd()
   {
     offsetConsumer.assign(partitions());
-    offsetConsumer.seekToEnd(partitions());
     partitions().forEach(tp ->
     {
-      // seekToEnd() works lazily: it only takes effect on poll()/position()
       Long offset = offsetConsumer.position(tp);
       log.info("New position for {}: {}", tp, offset);
+      Integer partition = tp.partition();
+      StateDocument document =
+          stateRepository
+              .findById(partition.toString())
+              .orElse(new StateDocument(partition));
+      document.offset = offset;
+      stateRepository.save(document);
     });
-    // The new positions must be commited!
-    offsetConsumer.commitSync();
     offsetConsumer.unsubscribe();
   }
 
   void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
   {
-    offsetConsumer.assign(partitions());
-    partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
-    offsetConsumer.unsubscribe();
+    partitions().forEach(tp ->
+    {
+      String partition = Integer.toString(tp.partition());
+      Optional<Long> offset = stateRepository.findById(partition).map(document -> document.offset);
+      consumer.accept(tp, offset.orElse(0l));
+    });
   }
 
   List<TopicPartition> partitions()
@@ -324,6 +332,7 @@
     props.put("value.deserializer", BytesDeserializer.class.getName());
     offsetConsumer = new KafkaConsumer<>(props);
 
+    mongoClient.getDatabase(mongoProperties.getDatabase()).drop();
     seekToEnd();
 
     oldOffsets = new HashMap<>();
@@ -349,8 +358,6 @@
       }
     };
 
-    mongoClient.getDatabase(mongoProperties.getDatabase()).drop();
-
     endlessConsumer =
         new EndlessConsumer<>(
             executor,
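One piece the patch relies on but never shows is StateRepository. Presumably (the interface is not part of this commit, so this is an assumption) it is an empty Spring-Data-MongoDB repository, which already inherits the findById() and save() used by seekToEnd() and doForCurrentOffsets():

import org.springframework.data.mongodb.repository.MongoRepository;

// Assumed declaration (not shown in this commit): an empty Spring-Data
// repository; findById(String) and save(StateDocument) are inherited.
public interface StateRepository extends MongoRepository<StateDocument, String>
{
}

Together with the offset field defaulting to -1l in StateDocument and the orElse(0l) in doForCurrentOffsets(), a partition without a stored document is treated as "nothing consumed yet".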