From: Kai Moritz Date: Tue, 16 Aug 2022 16:58:10 +0000 (+0200) Subject: Verbesserungen und Fachlogik-Test aus 'sumup-adder' gemerged X-Git-Tag: sumup-adder---lvm-2-tage~10^2~2 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=890fd85c334a078610701bd6e571d133df69473f;hp=-c;p=demos%2Fkafka%2Ftraining Verbesserungen und Fachlogik-Test aus 'sumup-adder' gemerged --- 890fd85c334a078610701bd6e571d133df69473f diff --combined src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 5e1f8fb,f83661e..b58295f --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@@ -1,5 -1,6 +1,5 @@@ package de.juplo.kafka; -import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; @@@ -17,23 -18,34 +17,31 @@@ import java.util.concurrent.Executors public class ApplicationConfiguration { @Bean - public ApplicationRecordHandler recordHandler() + public ApplicationRecordHandler recordHandler(AdderResults adderResults) { - return new ApplicationRecordHandler(); + return new ApplicationRecordHandler(adderResults); + } + + @Bean + public AdderResults adderResults() + { + return new AdderResults(); } @Bean public ApplicationRebalanceListener rebalanceListener( ApplicationRecordHandler recordHandler, + AdderResults adderResults, StateRepository stateRepository, - Consumer consumer, ApplicationProperties properties) { return new ApplicationRebalanceListener( recordHandler, + adderResults, stateRepository, properties.getClientId(), - properties.getTopic(), Clock.systemDefaultZone(), - properties.getCommitInterval(), - consumer); + properties.getCommitInterval()); } @Bean @@@ -69,8 -81,8 +77,8 @@@ props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"); props.put("group.id", properties.getGroupId()); props.put("client.id", properties.getClientId()); - props.put("enable.auto.commit", false); props.put("auto.offset.reset", properties.getAutoOffsetReset()); + props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis()); props.put("metadata.max.age.ms", "1000"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); diff --combined src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index 7256732,cd9da64..32e14e8 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@@ -2,13 -2,13 +2,12 @@@ package de.juplo.kafka import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.TopicPartition; import java.time.Clock; import java.time.Duration; import java.time.Instant; - import java.util.Collection; - import java.util.Map; + import java.util.*; @RequiredArgsConstructor @@@ -16,12 -16,18 +15,15 @@@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRebalanceListener { private final ApplicationRecordHandler recordHandler; + private final AdderResults adderResults; private final StateRepository stateRepository; private final String id; - private final String topic; private final Clock clock; private final Duration commitInterval; - private final Consumer consumer; + private final Set partitions = new HashSet<>(); + private Instant lastCommit = Instant.EPOCH; - private boolean commitsEnabled = true; @Override public void onPartitionsAssigned(Collection partitions) @@@ -29,12 -35,20 +31,14 @@@ partitions.forEach(tp -> { Integer partition = tp.partition(); + log.info("{} - adding partition: {}", id, partition); + this.partitions.add(partition); StateDocument document = stateRepository .findById(Integer.toString(partition)) .orElse(new StateDocument(partition)); - log.info("{} - adding partition: {}, offset={}", id, partition, document.offset); - if (document.offset >= 0) - { - // Only seek, if a stored offset was found - // Otherwise: Use initial offset, generated by Kafka - consumer.seek(tp, document.offset); - } recordHandler.addPartition(partition, document.state); + adderResults.addPartition(partition, document.results); }); } @@@ -44,18 -58,23 +48,20 @@@ partitions.forEach(tp -> { Integer partition = tp.partition(); + log.info("{} - removing partition: {}", id, partition); - Map removed = recordHandler.removePartition(partition); - for (String key : removed.keySet()) + this.partitions.remove(partition); - Long offset = consumer.position(tp); - log.info( - "{} - removing partition: {}, offset of next message {})", - id, - partition, - offset); - if (commitsEnabled) - { - Map state = recordHandler.removePartition(partition); - Map> results = adderResults.removePartition(partition); - stateRepository.save(new StateDocument(partition, state, results, offset)); - } - else ++ Map state = recordHandler.removePartition(partition); ++ for (String key : state.keySet()) { - log.info("Offset commits are disabled! Last commit: {}", lastCommit); + log.info( + "{} - Seen {} messages for partition={}|key={}", + id, - removed.get(key), ++ state.get(key), + partition, + key); } - stateRepository.save(new StateDocument(partition, removed)); ++ Map> results = adderResults.removePartition(partition); ++ stateRepository.save(new StateDocument(partition, state, results)); }); } @@@ -63,14 -82,34 +69,15 @@@ @Override public void beforeNextPoll() { - if (!commitsEnabled) - { - log.info("Offset commits are disabled! Last commit: {}", lastCommit); - return; - } - if (lastCommit.plus(commitInterval).isBefore(clock.instant())) { - log.debug("Storing data and offsets, last commit: {}", lastCommit); + log.debug("Storing data, last commit: {}", lastCommit); - recordHandler.getState().forEach((partiton, adder) -> stateRepository.save( + partitions.forEach(partition -> stateRepository.save( new StateDocument( - partiton, - adder.getState()))); + partition, + recordHandler.getState(partition).getState(), - adderResults.getState(partition), - consumer.position(new TopicPartition(topic, partition))))); ++ adderResults.getState(partition)))); lastCommit = clock.instant(); } } - - @Override - public void enableCommits() - { - commitsEnabled = true; - } - - @Override - public void disableCommits() - { - commitsEnabled = false; - } } diff --combined src/main/java/de/juplo/kafka/StateDocument.java index c85cc38,c10a50c..ae8eb51 --- a/src/main/java/de/juplo/kafka/StateDocument.java +++ b/src/main/java/de/juplo/kafka/StateDocument.java @@@ -5,6 -5,7 +5,7 @@@ import org.springframework.data.annotat import org.springframework.data.mongodb.core.mapping.Document; import java.util.HashMap; + import java.util.List; import java.util.Map; @@@ -14,7 -15,9 +15,8 @@@ public class StateDocumen { @Id public String id; - public Map state; - public long offset = -1l; + public Map state; + public Map> results; public StateDocument() { @@@ -24,13 -27,18 +26,16 @@@ { this.id = Integer.toString(partition); this.state = new HashMap<>(); + this.results = new HashMap<>(); } public StateDocument( Integer partition, - Map state) + Map state, - Map> results, - long offset) ++ Map> results) { this.id = Integer.toString(partition); this.state = state; + this.results = results; - this.offset = offset; } }