From: Kai Moritz
Date: Sat, 13 Aug 2022 08:45:03 +0000 (+0200)
Subject: Merged improvements from 'stored-offsets' into 'stored-state'
X-Git-Tag: endless-stream-consumer-DEPRECATED^2^2^2~1
X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=cfbe7dcd3318ee846cb5890eea4328e36c5aa364;p=demos%2Fkafka%2Ftraining

Merged improvements from 'stored-offsets' into 'stored-state'
---

cfbe7dcd3318ee846cb5890eea4328e36c5aa364
diff --cc docker-compose.yml
index 30ae3b4,7ab77b2..ee78746
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@@ -40,14 -40,16 +40,16 @@@ services
        ME_CONFIG_MONGODB_ADMINUSERNAME: juplo
        ME_CONFIG_MONGODB_ADMINPASSWORD: training
        ME_CONFIG_MONGODB_URL: mongodb://juplo:training@mongo:27017/
+       depends_on:
+         - mongo
  
-   setup:
-     image: juplo/toolbox
-     command: >
-       bash -c "
-         kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
-         kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2
-       "
+   kafka-ui:
+     image: provectuslabs/kafka-ui:0.3.3
+     ports:
+       - 8080:8080
+     environment:
+       KAFKA_CLUSTERS_0_NAME: local
+       KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
  
    cli:
      image: juplo/toolbox
diff --cc src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 1ba9d5b,3925fcb..1ea90a2
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@@ -1,6 -1,6 +1,5 @@@
  package de.juplo.kafka;
  
- import org.apache.kafka.clients.consumer.ConsumerRecord;
 -import org.apache.kafka.clients.consumer.Consumer;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.common.serialization.LongDeserializer;
  import org.apache.kafka.common.serialization.StringDeserializer;
@@@ -19,12 -19,26 +18,23 @@@ import java.util.concurrent.Executors
  public class ApplicationConfiguration
  {
    @Bean
-   public Consumer<ConsumerRecord<String, Long>> consumer()
 -  public KeyCountingRecordHandler messageCountingRecordHandler()
++  public KeyCountingRecordHandler keyCountingRecordHandler()
    {
-     return (record) ->
-     {
-       // Handle record
-     };
+     return new KeyCountingRecordHandler();
+   }
+ 
+   @Bean
 -  public KeyCountingRebalanceListener wordcountRebalanceListener(
++  public KeyCountingRebalanceListener keyCountingRebalanceListener(
+       KeyCountingRecordHandler keyCountingRecordHandler,
+       PartitionStatisticsRepository repository,
 -      Consumer<String, Long> consumer,
+       ApplicationProperties properties)
+   {
+     return new KeyCountingRebalanceListener(
+         keyCountingRecordHandler,
+         repository,
+         properties.getClientId(),
 -        properties.getTopic(),
+         Clock.systemDefaultZone(),
-         properties.getCommitInterval(),
-         consumer);
++        properties.getCommitInterval());
    }
  
    @Bean
@@@ -57,9 -71,12 +67,11 @@@
      Properties props = new Properties();
      props.put("bootstrap.servers", properties.getBootstrapServer());
+     props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
      props.put("group.id", properties.getGroupId());
      props.put("client.id", properties.getClientId());
-     props.put("enable.auto.commit", false);
      props.put("auto.offset.reset", properties.getAutoOffsetReset());
+     props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis());
      props.put("metadata.max.age.ms", "1000");
      props.put("key.deserializer", StringDeserializer.class.getName());
      props.put("value.deserializer", LongDeserializer.class.getName());
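The configuration change above replaces the manual offset management from 'stored-offsets' (enable.auto.commit=false plus explicit seeks) with Kafka's built-in auto-commit, and selects the CooperativeStickyAssignor. A minimal, self-contained sketch of a consumer configured this way; the broker address, group id, interval, and topic below are placeholder values, not taken from this project:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AutoCommitConsumerSketch
{
  public static void main(String[] args)
  {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder
    props.put("partition.assignment.strategy",
        "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
    props.put("group.id", "sketch-group");            // placeholder
    // enable.auto.commit defaults to true, so the consumer itself now
    // commits the offsets, at the configured interval:
    props.put("auto.commit.interval.ms", 5000);
    props.put("auto.offset.reset", "earliest");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", LongDeserializer.class.getName());

    try (KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props))
    {
      consumer.subscribe(List.of("test"));
      // Offsets of the polled records are committed automatically in the background
      consumer.poll(Duration.ofSeconds(1))
          .forEach(r -> System.out.println(r.key() + " = " + r.value()));
    }
  }
}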
diff --cc src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java
index 0000000,4a2c036..636ff86
mode 000000,100644..100644
--- a/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java
+++ b/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java
@@@ -1,0 -1,83 +1,76 @@@
+ package de.juplo.kafka;
+ 
+ import lombok.RequiredArgsConstructor;
+ import lombok.extern.slf4j.Slf4j;
 -import org.apache.kafka.clients.consumer.Consumer;
+ import org.apache.kafka.common.TopicPartition;
+ 
+ import java.time.Clock;
+ import java.time.Duration;
+ import java.time.Instant;
+ import java.util.Collection;
+ import java.util.Map;
+ 
+ 
+ @RequiredArgsConstructor
+ @Slf4j
+ public class KeyCountingRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
+ {
+   private final KeyCountingRecordHandler handler;
+   private final PartitionStatisticsRepository repository;
+   private final String id;
 -  private final String topic;
+   private final Clock clock;
+   private final Duration commitInterval;
 -  private final Consumer<String, Long> consumer;
+ 
+   private Instant lastCommit = Instant.EPOCH;
+ 
+   @Override
+   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+   {
+     partitions.forEach(tp ->
+     {
+       Integer partition = tp.partition();
 -      Long offset = consumer.position(tp);
 -      log.info("{} - adding partition: {}, offset={}", id, partition, offset);
++      log.info("{} - adding partition: {}", id, partition);
+       StatisticsDocument document =
+         repository
+           .findById(Integer.toString(partition))
+           .orElse(new StatisticsDocument(partition));
 -      if (document.offset >= 0)
 -      {
 -        // Only seek, if a stored offset was found
 -        // Otherwise: Use initial offset, generated by Kafka
 -        consumer.seek(tp, document.offset);
 -      }
+       handler.addPartition(partition, document.statistics);
+     });
+   }
+ 
+   @Override
+   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+   {
+     partitions.forEach(tp ->
+     {
+       Integer partition = tp.partition();
 -      Long newOffset = consumer.position(tp);
 -      log.info(
 -        "{} - removing partition: {}, offset of next message {})",
 -        id,
 -        partition,
 -        newOffset);
++      log.info("{} - removing partition: {}", id, partition);
+       Map<String, Long> removed = handler.removePartition(partition);
 -      repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
++      for (String key : removed.keySet())
++      {
++        log.info(
++          "{} - Seen {} messages for partition={}|key={}",
++          id,
++          removed.get(key),
++          partition,
++          key);
++      }
++      repository.save(new StatisticsDocument(partition, removed));
+     });
+   }
+ 
+ 
+   @Override
+   public void beforeNextPoll()
+   {
+     if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
+     {
 -      log.debug("Storing data and offsets, last commit: {}", lastCommit);
++      log.debug("Storing data, last commit: {}", lastCommit);
+       handler.getSeen().forEach((partition, statistics) -> repository.save(
+         new StatisticsDocument(
+           partition,
 -          statistics,
 -          consumer.position(new TopicPartition(topic, partition)))));
++          statistics)));
+       lastCommit = clock.instant();
+     }
+   }
+ }
diff --cc src/main/java/de/juplo/kafka/StatisticsDocument.java
index 2416253,1244f45..415ef5c
--- a/src/main/java/de/juplo/kafka/StatisticsDocument.java
+++ b/src/main/java/de/juplo/kafka/StatisticsDocument.java
@@@ -20,7 -21,13 +20,13 @@@ public class StatisticsDocument
  {
  }
  
+ public StatisticsDocument(Integer partition)
+ {
+   this.id = Integer.toString(partition);
+   this.statistics = new HashMap<>();
+ }
+ 
- public StatisticsDocument(Integer partition, Map<String, Long> statistics, long offset)
+ public StatisticsDocument(Integer partition, Map<String, Long> statistics)
  {
    this.id = Integer.toString(partition);
    this.statistics = statistics;
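With offset handling left to Kafka, KeyCountingRebalanceListener only has to load the per-partition counters on assignment and write them back on revocation, and StatisticsDocument loses its offset field. A sketch of the underlying ConsumerRebalanceListener pattern, with an in-memory map standing in for the MongoDB repository; the class and method names are illustrative:

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class StatefulRebalanceSketch
{
  // Stand-in for PartitionStatisticsRepository: counters per partition
  private final Map<Integer, Map<String, Long>> store = new HashMap<>();

  public void subscribe(KafkaConsumer<String, Long> consumer, String topic)
  {
    consumer.subscribe(List.of(topic), new ConsumerRebalanceListener()
    {
      @Override
      public void onPartitionsAssigned(Collection<TopicPartition> partitions)
      {
        // Load (or initialize) the state for every newly assigned partition
        partitions.forEach(tp ->
            store.computeIfAbsent(tp.partition(), p -> new HashMap<>()));
      }

      @Override
      public void onPartitionsRevoked(Collection<TopicPartition> partitions)
      {
        // Persist the state of every partition that is taken away
        partitions.forEach(tp ->
            System.out.println("saving state for partition " + tp.partition()));
      }
    });
  }
}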
diff --cc src/test/java/de/juplo/kafka/ApplicationIT.java
index 0000000,cded0ee..d1d8e50
mode 000000,100644..100644
--- a/src/test/java/de/juplo/kafka/ApplicationIT.java
+++ b/src/test/java/de/juplo/kafka/ApplicationIT.java
@@@ -1,0 -1,43 +1,43 @@@
+ package de.juplo.kafka;
+ 
+ import org.junit.jupiter.api.Test;
+ import org.springframework.beans.factory.annotation.Autowired;
+ import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
+ import org.springframework.boot.test.context.SpringBootTest;
+ import org.springframework.boot.test.web.client.TestRestTemplate;
+ import org.springframework.boot.test.web.server.LocalServerPort;
+ import org.springframework.kafka.test.context.EmbeddedKafka;
+ 
+ import static de.juplo.kafka.ApplicationTests.TOPIC;
+ 
+ 
+ @SpringBootTest(
+     webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
+     properties = {
+         "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
+         "consumer.topic=" + TOPIC,
+         "spring.mongodb.embedded.version=4.4.13" })
+ @EmbeddedKafka(topics = TOPIC)
+ @AutoConfigureDataMongo
+ public class ApplicationIT
+ {
+   public static final String TOPIC = "FOO";
+ 
+   @LocalServerPort
+   private int port;
+ 
+   @Autowired
+   private TestRestTemplate restTemplate;
+ 
+ 
+ 
+   @Test
 -  public void testApplicationStartup()
++  public void testApplicationStartup()
+   {
+     restTemplate.getForObject(
+         "http://localhost:" + port + "/actuator/health",
+         String.class
+         )
+         .contains("UP");
+   }
+ }
diff --cc src/test/java/de/juplo/kafka/ApplicationTests.java
index caa25c5,fc5d4c9..7f666f6
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@@ -69,9 -70,12 +68,10 @@@ class ApplicationTests
    @Autowired
    ExecutorService executor;
    @Autowired
--  PartitionStatisticsRepository repository;
-   @Autowired
+   KeyCountingRebalanceListener keyCountingRebalanceListener;
+   @Autowired
+   KeyCountingRecordHandler keyCountingRecordHandler;
  
-   Consumer<ConsumerRecord<String, Long>> testHandler;
    EndlessConsumer endlessConsumer;
    Map<TopicPartition, Long> oldOffsets;
    Map<TopicPartition, Long> newOffsets;
@@@ -175,12 -188,33 +184,27 @@@
  
    /** Helper methods for setting up and running the tests */
  
+   void seekToEnd()
+   {
+     offsetConsumer.assign(partitions());
++    offsetConsumer.seekToEnd(partitions());
+     partitions().forEach(tp ->
+     {
++      // seekToEnd() works lazily: it only takes effect on poll()/position()
+       Long offset = offsetConsumer.position(tp);
+       log.info("New position for {}: {}", tp, offset);
-       Integer partition = tp.partition();
-       StatisticsDocument document =
-         partitionStatisticsRepository
-           .findById(partition.toString())
-           .orElse(new StatisticsDocument(partition));
-       document.offset = offset;
-       partitionStatisticsRepository.save(document);
+     });
++    // The new positions must be committed!
++    offsetConsumer.commitSync();
+     offsetConsumer.unsubscribe();
+   }
+ 
    void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
    {
-     partitions().forEach(tp ->
-     {
-       String partition = Integer.toString(tp.partition());
-       Optional<Long> offset = partitionStatisticsRepository.findById(partition).map(document -> document.offset);
-       consumer.accept(tp, offset.orElse(0l));
-     });
-   }
+     offsetConsumer.assign(partitions());
+     partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
+     offsetConsumer.unsubscribe();
+   }
  
    List<TopicPartition> partitions()
    {
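The rewritten seekToEnd() helper relies on two KafkaConsumer details that its inline comments call out: seekToEnd() is lazy and only takes effect on the next poll() or position() call, and the moved position becomes the group's committed offset only after an explicit commitSync(). A condensed sketch of that sequence, assuming an already configured consumer; the class and method names are illustrative:

import java.util.List;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SeekToEndSketch
{
  static void seekToEnd(KafkaConsumer<String, Long> consumer, List<TopicPartition> partitions)
  {
    consumer.assign(partitions);
    consumer.seekToEnd(partitions);  // lazy: only records the intent to seek
    for (TopicPartition tp : partitions)
    {
      // position() forces the pending seek to be evaluated
      System.out.println("New position for " + tp + ": " + consumer.position(tp));
    }
    consumer.commitSync();           // make the new positions the committed offsets
    consumer.unsubscribe();
  }
}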