From: Kai Moritz
Date: Sat, 13 Aug 2022 08:45:03 +0000 (+0200)
Subject: Merged improvements from 'stored-offsets' into 'stored-state'
X-Git-Tag: sumup-requests---lvm-2-tage~5^2^2^2~1
X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;ds=sidebyside;h=cfbe7dcd3318ee846cb5890eea4328e36c5aa364;hp=-c;p=demos%2Fkafka%2Ftraining

Merged improvements from 'stored-offsets' into 'stored-state'
---

cfbe7dcd3318ee846cb5890eea4328e36c5aa364
diff --combined docker-compose.yml
index 30ae3b4,7ab77b2..ee78746
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@@ -40,21 -40,23 +40,23 @@@ services
        ME_CONFIG_MONGODB_ADMINUSERNAME: juplo
        ME_CONFIG_MONGODB_ADMINPASSWORD: training
        ME_CONFIG_MONGODB_URL: mongodb://juplo:training@mongo:27017/
+     depends_on:
+       - mongo
  
-   setup:
-     image: juplo/toolbox
-     command: >
-       bash -c "
-         kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
-         kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2
-       "
+   kafka-ui:
+     image: provectuslabs/kafka-ui:0.3.3
+     ports:
+       - 8080:8080
+     environment:
+       KAFKA_CLUSTERS_0_NAME: local
+       KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
  
    cli:
      image: juplo/toolbox
      command: sleep infinity
  
    producer:
-     image: juplo/endless-producer:1.0-SNAPSHOT
+     image: juplo/endless-long-producer:1.0-SNAPSHOT
      ports:
        - 8080:8080
      environment:
@@@ -62,17 -64,29 +64,17 @@@
        producer.bootstrap-server: kafka:9092
        producer.client-id: producer
        producer.topic: test
-       producer.throttle-ms: 500
+       producer.throttle-ms: 10
  
-   peter:
+   consumer:
      image: juplo/endless-consumer:1.0-SNAPSHOT
      ports:
        - 8081:8080
      environment:
        server.port: 8080
        consumer.bootstrap-server: kafka:9092
-       consumer.client-id: peter
-       consumer.topic: test
-       spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
-       spring.data.mongodb.database: juplo
- 
-   beate:
-     image: juplo/endless-consumer:1.0-SNAPSHOT
-     ports:
-       - 8082:8080
-     environment:
-       server.port: 8080
-       consumer.bootstrap-server: kafka:9092
-       consumer.client-id: beate
+       consumer.client-id: consumer
        consumer.topic: test
        spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
        spring.data.mongodb.database: juplo
diff --combined src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 1ba9d5b,3925fcb..1ea90a2
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@@ -1,6 -1,6 +1,5 @@@
  package de.juplo.kafka;
  
- import org.apache.kafka.clients.consumer.ConsumerRecord;
 -import org.apache.kafka.clients.consumer.Consumer;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.common.serialization.LongDeserializer;
  import org.apache.kafka.common.serialization.StringDeserializer;
@@@ -8,10 -8,10 +7,10 @@@ import org.springframework.boot.context
  import org.springframework.context.annotation.Bean;
  import org.springframework.context.annotation.Configuration;
  
+ import java.time.Clock;
  import java.util.Properties;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
- import java.util.function.Consumer;
  
  
  @Configuration
@@@ -19,30 -19,44 +18,41 @@@ public class ApplicationConfiguration
  {
    @Bean
-   public Consumer<ConsumerRecord<String, Long>> consumer()
 -  public KeyCountingRecordHandler messageCountingRecordHandler()
++  public KeyCountingRecordHandler keyCountingRecordHandler()
    {
-     return (record) ->
-     {
-       // Handle record
-     };
+     return new KeyCountingRecordHandler();
+   }
+ 
+   @Bean
 -  public KeyCountingRebalanceListener wordcountRebalanceListener(
++  public KeyCountingRebalanceListener keyCountingRebalanceListener(
+       KeyCountingRecordHandler keyCountingRecordHandler,
+       PartitionStatisticsRepository repository,
 -      Consumer<String, Long> consumer,
+       ApplicationProperties properties)
+   {
+     return new KeyCountingRebalanceListener(
+         keyCountingRecordHandler,
+         repository,
+         properties.getClientId(),
 -        properties.getTopic(),
+         Clock.systemDefaultZone(),
 -        properties.getCommitInterval(),
 -        consumer);
++        properties.getCommitInterval());
    }
  
    @Bean
    public EndlessConsumer<String, Long> endlessConsumer(
        KafkaConsumer<String, Long> kafkaConsumer,
        ExecutorService executor,
-       Consumer<ConsumerRecord<String, Long>> handler,
-       PartitionStatisticsRepository repository,
+       KeyCountingRebalanceListener keyCountingRebalanceListener,
+       KeyCountingRecordHandler keyCountingRecordHandler,
        ApplicationProperties properties)
    {
      return
          new EndlessConsumer<>(
              executor,
-             repository,
              properties.getClientId(),
              properties.getTopic(),
              kafkaConsumer,
-             handler);
+             keyCountingRebalanceListener,
+             keyCountingRecordHandler);
    }
  
    @Bean
@@@ -57,9 -71,12 +67,11 @@@
      Properties props = new Properties();
  
      props.put("bootstrap.servers", properties.getBootstrapServer());
+     props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
      props.put("group.id", properties.getGroupId());
      props.put("client.id", properties.getClientId());
-     props.put("enable.auto.commit", false);
      props.put("auto.offset.reset", properties.getAutoOffsetReset());
+     props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis());
      props.put("metadata.max.age.ms", "1000");
      props.put("key.deserializer", StringDeserializer.class.getName());
      props.put("value.deserializer", LongDeserializer.class.getName());
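The crucial change in ApplicationConfiguration is the switch from manually managed offsets back to Kafka's built-in mechanics: `enable.auto.commit` is no longer switched off, the configured commit interval is mapped onto `auto.commit.interval.ms`, and the CooperativeStickyAssignor enables incremental rebalancing. A minimal stand-alone sketch of a consumer configured this way (bootstrap server, group id and interval values are illustrative assumptions, not taken from this commit):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class AutoCommitConsumerSketch
    {
      public static KafkaConsumer<String, Long> create()
      {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative
        props.put("group.id", "my-group");                // illustrative
        // enable.auto.commit defaults to true, so offsets are committed
        // automatically in the background at this interval:
        props.put("auto.commit.interval.ms", 1000);       // illustrative
        // Incremental (cooperative) rebalancing instead of stop-the-world:
        props.put(
            "partition.assignment.strategy",
            "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", LongDeserializer.class.getName());
        return new KafkaConsumer<>(props);
      }
    }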
diff --combined src/main/java/de/juplo/kafka/EndlessConsumer.java
index a21dd86,58557f2..047d5cb
--- a/src/main/java/de/juplo/kafka/EndlessConsumer.java
+++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java
@@@ -19,14 -19,14 +19,14 @@@ import java.util.concurrent.locks.Reent
  
  @Slf4j
  @RequiredArgsConstructor
- public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnable
+ public class EndlessConsumer<K, V> implements Runnable
  {
    private final ExecutorService executor;
-   private final PartitionStatisticsRepository repository;
    private final String id;
    private final String topic;
    private final Consumer<K, V> consumer;
-   private final java.util.function.Consumer<ConsumerRecord<K, V>> handler;
+   private final PollIntervalAwareConsumerRebalanceListener pollIntervalAwareRebalanceListener;
+   private final RecordHandler<K, V> handler;
  
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
@@@ -34,56 -34,6 +34,6 @@@
    private Exception exception;
    private long consumed = 0;
  
-   private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
-   private final Map<Integer, Long> offsets = new HashMap<>();
- 
- 
-   @Override
-   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
-   {
-     partitions.forEach(tp ->
-     {
-       Integer partition = tp.partition();
-       Long newOffset = consumer.position(tp);
-       Long oldOffset = offsets.remove(partition);
-       log.info(
-           "{} - removing partition: {}, consumed {} records (offset {} -> {})",
-           id,
-           partition,
-           newOffset - oldOffset,
-           oldOffset,
-           newOffset);
-       Map<String, Long> removed = seen.remove(partition);
-       for (String key : removed.keySet())
-       {
-         log.info(
-             "{} - Seen {} messages for partition={}|key={}",
-             id,
-             removed.get(key),
-             partition,
-             key);
-       }
-       repository.save(new StatisticsDocument(partition, removed));
-     });
-   }
- 
-   @Override
-   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
-   {
-     partitions.forEach(tp ->
-     {
-       Integer partition = tp.partition();
-       Long offset = consumer.position(tp);
-       log.info("{} - adding partition: {}, offset={}", id, partition, offset);
-       offsets.put(partition, offset);
-       seen.put(
-           partition,
-           repository
-               .findById(Integer.toString(tp.partition()))
-               .map(document -> document.statistics)
-               .orElse(new HashMap<>()));
-     });
-   }
  
  
    @Override
@@@ -92,7 -42,7 +42,7 @@@
      try
      {
        log.info("{} - Subscribing to topic {}", id, topic);
-       consumer.subscribe(Arrays.asList(topic), this);
+       consumer.subscribe(Arrays.asList(topic), pollIntervalAwareRebalanceListener);
  
        while (true)
        {
@@@ -116,24 -66,14 +66,15 @@@
            handler.accept(record);
  
            consumed++;
- 
-           Integer partition = record.partition();
-           String key = record.key() == null ? "NULL" : record.key().toString();
-           Map<String, Long> byKey = seen.get(partition);
- 
-           if (!byKey.containsKey(key))
-             byKey.put(key, 0l);
- 
-           long seenByKey = byKey.get(key);
-           seenByKey++;
-           byKey.put(key, seenByKey);
          }
+ 
+         pollIntervalAwareRebalanceListener.beforeNextPoll();
        }
      }
      catch(WakeupException e)
      {
        log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
+       consumer.commitSync();
        shutdown();
      }
      catch(RecordDeserializationException e)
@@@ -147,7 -87,6 +88,7 @@@
            offset,
            e.getCause().toString());
  
+       consumer.commitSync();
        shutdown(e);
      }
      catch(Exception e)
@@@ -197,11 -136,6 +138,6 @@@
      }
    }
  
-   public Map<Integer, Map<String, Long>> getSeen()
-   {
-     return seen;
-   }
- 
    public void start()
    {
      lock.lock();
@@@ -221,7 -155,7 +157,7 @@@
      }
    }
  
-   public synchronized void stop() throws ExecutionException, InterruptedException
+   public synchronized void stop() throws InterruptedException
    {
      lock.lock();
      try
@@@ -244,22 -178,7 +180,7 @@@
    public void destroy() throws ExecutionException, InterruptedException
    {
      log.info("{} - Destroy!", id);
-     try
-     {
-       stop();
-     }
-     catch (IllegalStateException e)
-     {
-       log.info("{} - Was already stopped", id);
-     }
-     catch (Exception e)
-     {
-       log.error("{} - Unexpected exception while trying to stop the consumer", id, e);
-     }
-     finally
-     {
-       log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
-     }
+     log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
    }
  
    public boolean running()
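With the statistics bookkeeping moved out of EndlessConsumer, its run loop reduces to: poll, delegate each record to the handler, then give the rebalance listener a chance to flush its state before the next poll. Roughly like the following sketch, which mirrors the loop above; PollIntervalAwareConsumerRebalanceListener and RecordHandler are the commit's own types, reproduced here only by their assumed usage:

    import java.time.Duration;
    import java.util.Arrays;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;

    public class PollLoopSketch
    {
      static <K, V> void runLoop(
          Consumer<K, V> consumer,
          String topic,
          PollIntervalAwareConsumerRebalanceListener listener,
          RecordHandler<K, V> handler)
      {
        // The listener is registered with the subscription...
        consumer.subscribe(Arrays.asList(topic), listener);
        while (true)
        {
          ConsumerRecords<K, V> records = consumer.poll(Duration.ofSeconds(1));
          for (ConsumerRecord<K, V> record : records)
            handler.accept(record); // business logic lives in the handler
          // ...and is additionally called between two polls, so that it
          // can store its state periodically:
          listener.beforeNextPoll();
        }
      }
    }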
diff --combined src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java
index 0000000,4a2c036..636ff86
mode 000000,100644..100644
--- a/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java
+++ b/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java
@@@ -1,0 -1,83 +1,76 @@@
+ package de.juplo.kafka;
+ 
+ import lombok.RequiredArgsConstructor;
+ import lombok.extern.slf4j.Slf4j;
 -import org.apache.kafka.clients.consumer.Consumer;
+ import org.apache.kafka.common.TopicPartition;
+ 
+ import java.time.Clock;
+ import java.time.Duration;
+ import java.time.Instant;
+ import java.util.Collection;
+ import java.util.Map;
+ 
+ 
+ @RequiredArgsConstructor
+ @Slf4j
+ public class KeyCountingRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
+ {
+   private final KeyCountingRecordHandler handler;
+   private final PartitionStatisticsRepository repository;
+   private final String id;
 -  private final String topic;
+   private final Clock clock;
+   private final Duration commitInterval;
 -  private final Consumer<String, Long> consumer;
+ 
+   private Instant lastCommit = Instant.EPOCH;
+ 
+   @Override
+   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+   {
+     partitions.forEach(tp ->
+     {
+       Integer partition = tp.partition();
 -      Long offset = consumer.position(tp);
 -      log.info("{} - adding partition: {}, offset={}", id, partition, offset);
++      log.info("{} - adding partition: {}", id, partition);
+       StatisticsDocument document =
+           repository
+               .findById(Integer.toString(partition))
+               .orElse(new StatisticsDocument(partition));
 -      if (document.offset >= 0)
 -      {
 -        // Only seek, if a stored offset was found
 -        // Otherwise: Use initial offset, generated by Kafka
 -        consumer.seek(tp, document.offset);
 -      }
+       handler.addPartition(partition, document.statistics);
+     });
+   }
+ 
+   @Override
+   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+   {
+     partitions.forEach(tp ->
+     {
+       Integer partition = tp.partition();
 -      Long newOffset = consumer.position(tp);
 -      log.info(
 -          "{} - removing partition: {}, offset of next message {})",
 -          id,
 -          partition,
 -          newOffset);
++      log.info("{} - removing partition: {}", id, partition);
+       Map<String, Long> removed = handler.removePartition(partition);
 -      repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
++      for (String key : removed.keySet())
++      {
++        log.info(
++            "{} - Seen {} messages for partition={}|key={}",
++            id,
++            removed.get(key),
++            partition,
++            key);
++      }
++      repository.save(new StatisticsDocument(partition, removed));
+     });
+   }
+ 
+ 
+   @Override
+   public void beforeNextPoll()
+   {
+     if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
+     {
 -      log.debug("Storing data and offsets, last commit: {}", lastCommit);
++      log.debug("Storing data, last commit: {}", lastCommit);
+       handler.getSeen().forEach((partiton, statistics) -> repository.save(
+           new StatisticsDocument(
+               partiton,
 -              statistics,
 -              consumer.position(new TopicPartition(topic, partiton)))));
++              statistics)));
+       lastCommit = clock.instant();
+     }
+   }
+ }
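The PollIntervalAwareConsumerRebalanceListener interface implemented here is not itself part of this commitdiff; judging from its usage in EndlessConsumer, it presumably looks roughly like this:

    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;

    // Assumed shape, reconstructed from usage - not taken from the commit:
    public interface PollIntervalAwareConsumerRebalanceListener
        extends ConsumerRebalanceListener
    {
      // Invoked by EndlessConsumer after each processed batch,
      // right before the next call to poll().
      default void beforeNextPoll() {}
    }

Note that the merged listener no longer touches the consumer at all: it neither seeks on assignment nor records offsets on revocation. Offsets are now owned entirely by Kafka's auto-commit, while MongoDB only stores the counting state.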
partition: {}", id, partition); + StatisticsDocument document = + repository + .findById(Integer.toString(partition)) + .orElse(new StatisticsDocument(partition)); - if (document.offset >= 0) - { - // Only seek, if a stored offset was found - // Otherwise: Use initial offset, generated by Kafka - consumer.seek(tp, document.offset); - } + handler.addPartition(partition, document.statistics); + }); + } + + @Override + public void onPartitionsRevoked(Collection partitions) + { + partitions.forEach(tp -> + { + Integer partition = tp.partition(); - Long newOffset = consumer.position(tp); - log.info( - "{} - removing partition: {}, offset of next message {})", - id, - partition, - newOffset); ++ log.info("{} - removing partition: {}", id, partition); + Map removed = handler.removePartition(partition); - repository.save(new StatisticsDocument(partition, removed, consumer.position(tp))); ++ for (String key : removed.keySet()) ++ { ++ log.info( ++ "{} - Seen {} messages for partition={}|key={}", ++ id, ++ removed.get(key), ++ partition, ++ key); ++ } ++ repository.save(new StatisticsDocument(partition, removed)); + }); + } + + + @Override + public void beforeNextPoll() + { + if (lastCommit.plus(commitInterval).isBefore(clock.instant())) + { - log.debug("Storing data and offsets, last commit: {}", lastCommit); ++ log.debug("Storing data, last commit: {}", lastCommit); + handler.getSeen().forEach((partiton, statistics) -> repository.save( + new StatisticsDocument( + partiton, - statistics, - consumer.position(new TopicPartition(topic, partiton))))); ++ statistics))); + lastCommit = clock.instant(); + } + } + } diff --combined src/main/java/de/juplo/kafka/StatisticsDocument.java index 2416253,1244f45..415ef5c --- a/src/main/java/de/juplo/kafka/StatisticsDocument.java +++ b/src/main/java/de/juplo/kafka/StatisticsDocument.java @@@ -14,15 -14,23 +14,21 @@@ public class StatisticsDocumen { @Id public String id; - public long offset = -1l; public Map statistics; public StatisticsDocument() { } + public StatisticsDocument(Integer partition) + { + this.id = Integer.toString(partition); + this.statistics = new HashMap<>(); + } + - public StatisticsDocument(Integer partition, Map statistics, long offset) + public StatisticsDocument(Integer partition, Map statistics) { this.id = Integer.toString(partition); this.statistics = statistics; - this.offset = offset; } } diff --combined src/test/java/de/juplo/kafka/ApplicationIT.java index 0000000,cded0ee..d1d8e50 mode 000000,100644..100644 --- a/src/test/java/de/juplo/kafka/ApplicationIT.java +++ b/src/test/java/de/juplo/kafka/ApplicationIT.java @@@ -1,0 -1,43 +1,43 @@@ + package de.juplo.kafka; + + import org.junit.jupiter.api.Test; + import org.springframework.beans.factory.annotation.Autowired; + import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo; + import org.springframework.boot.test.context.SpringBootTest; + import org.springframework.boot.test.web.client.TestRestTemplate; + import org.springframework.boot.test.web.server.LocalServerPort; + import org.springframework.kafka.test.context.EmbeddedKafka; + + import static de.juplo.kafka.ApplicationTests.TOPIC; + + + @SpringBootTest( + webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, + properties = { + "consumer.bootstrap-server=${spring.embedded.kafka.brokers}", + "consumer.topic=" + TOPIC, + "spring.mongodb.embedded.version=4.4.13" }) + @EmbeddedKafka(topics = TOPIC) + @AutoConfigureDataMongo + public class ApplicationIT + { + public static final String TOPIC = "FOO"; + 
diff --combined src/test/java/de/juplo/kafka/ApplicationTests.java
index caa25c5,fc5d4c9..7f666f6
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@@ -26,8 -26,6 +26,6 @@@ import java.util.
  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.ExecutorService;
  import java.util.function.BiConsumer;
- import java.util.function.Consumer;
- import java.util.function.Function;
  import java.util.stream.Collectors;
  import java.util.stream.IntStream;
  
@@@ -43,6 -41,7 +41,7 @@@ import static org.awaitility.Awaitility
  		properties = {
  				"consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
  				"consumer.topic=" + TOPIC,
+ 				"consumer.commit-interval=1s",
  				"spring.mongodb.embedded.version=4.4.13" })
  @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
  @EnableAutoConfiguration
@@@ -65,13 -64,18 +64,14 @@@ class ApplicationTest
  	@Autowired
  	KafkaConsumer<Bytes, Bytes> offsetConsumer;
  	@Autowired
- 	PartitionStatisticsRepository partitionStatisticsRepository;
- 	@Autowired
  	ApplicationProperties properties;
  	@Autowired
  	ExecutorService executor;
  	@Autowired
--	PartitionStatisticsRepository repository;
- 	@Autowired
+ 	KeyCountingRebalanceListener keyCountingRebalanceListener;
+ 	@Autowired
+ 	KeyCountingRecordHandler keyCountingRecordHandler;
  
- 	Consumer<ConsumerRecord<String, Long>> testHandler;
  	EndlessConsumer<String, Long> endlessConsumer;
  	Map<TopicPartition, Long> oldOffsets;
  	Map<TopicPartition, Long> newOffsets;
@@@ -81,17 -85,22 +81,22 @@@
  
  	/** Tests methods */
  
  	@Test
- 	@Order(1) // << The poison pill is not skipped. Hence, this test must run first
  	void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException
  	{
- 		send100Messages(i -> new Bytes(valueSerializer.serialize(TOPIC, i)));
+ 		send100Messages((partition, key, counter) ->
+ 		{
+ 			Bytes value = new Bytes(valueSerializer.serialize(TOPIC, counter));
+ 			return new ProducerRecord<>(TOPIC, partition, key, value);
+ 		});
  
  		await("100 records received")
  				.atMost(Duration.ofSeconds(30))
+ 				.pollInterval(Duration.ofSeconds(1))
  				.until(() -> receivedRecords.size() >= 100);
  
  		await("Offsets committed")
  				.atMost(Duration.ofSeconds(10))
+ 				.pollInterval(Duration.ofSeconds(1))
  				.untilAsserted(() ->
  				{
  					checkSeenOffsetsForProgress();
@@@ -104,16 -113,19 +109,19 @@@
  	}
  
  	@Test
- 	@Order(2)
- 	void commitsOffsetOfErrorForReprocessingOnError()
+ 	void commitsOffsetOfErrorForReprocessingOnDeserializationError()
  	{
- 		send100Messages(counter ->
- 				counter == 77
- 						? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
- 						: new Bytes(valueSerializer.serialize(TOPIC, counter)));
+ 		send100Messages((partition, key, counter) ->
+ 		{
+ 			Bytes value = counter == 77
+ 					? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
+ 					: new Bytes(valueSerializer.serialize(TOPIC, counter));
+ 			return new ProducerRecord<>(TOPIC, partition, key, value);
+ 		});
  
  		await("Consumer failed")
  				.atMost(Duration.ofSeconds(30))
+ 				.pollInterval(Duration.ofSeconds(1))
  				.until(() -> !endlessConsumer.running());
  
  		checkSeenOffsetsForProgress();
@@@ -122,6 -134,7 +130,7 @@@
  		endlessConsumer.start();
  		await("Consumer failed")
  				.atMost(Duration.ofSeconds(30))
+ 				.pollInterval(Duration.ofSeconds(1))
  				.until(() -> !endlessConsumer.running());
  
  		checkSeenOffsetsForProgress();
@@@ -159,8 -172,8 +168,8 @@@
  		Set<TopicPartition> withProgress = new HashSet<>();
  		partitions().forEach(tp ->
  		{
- 			Long oldOffset = oldOffsets.get(tp);
- 			Long newOffset = newOffsets.get(tp);
+ 			Long oldOffset = oldOffsets.get(tp) + 1;
+ 			Long newOffset = newOffsets.get(tp) + 1;
  			if (!oldOffset.equals(newOffset))
  			{
  				log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
@@@ -175,12 -188,33 +184,27 @@@
  
  	/** Helper methods for setting up and running the tests */
  
+ 	void seekToEnd()
+ 	{
+ 		offsetConsumer.assign(partitions());
++		offsetConsumer.seekToEnd(partitions());
+ 		partitions().forEach(tp ->
+ 		{
++			// seekToEnd() works lazily: it only takes effect on poll()/position()
+ 			Long offset = offsetConsumer.position(tp);
+ 			log.info("New position for {}: {}", tp, offset);
- 			Integer partition = tp.partition();
- 			StatisticsDocument document =
- 					partitionStatisticsRepository
- 							.findById(partition.toString())
- 							.orElse(new StatisticsDocument(partition));
- 			document.offset = offset;
- 			partitionStatisticsRepository.save(document);
+ 		});
++		// The new positions must be committed!
++		offsetConsumer.commitSync();
+ 		offsetConsumer.unsubscribe();
+ 	}
+ 
  	void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
  	{
- 		partitions().forEach(tp ->
- 		{
- 			String partition = Integer.toString(tp.partition());
- 			Optional<Long> offset = partitionStatisticsRepository.findById(partition).map(document -> document.offset);
- 			consumer.accept(tp, offset.orElse(0l));
- 		});
- 	}
+ 		offsetConsumer.assign(partitions());
+ 		partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
+ 		offsetConsumer.unsubscribe();
+ 	}
  
  	List<TopicPartition> partitions()
  	{
@@@ -192,7 -226,12 +216,12 @@@
  	}
  
  
+ 	public interface RecordGenerator<K, V>
+ 	{
+ 		public ProducerRecord<String, Bytes> generate(int partition, String key, long counter);
+ 	}
+ 
- 	void send100Messages(Function<Long, Bytes> messageGenerator)
+ 	void send100Messages(RecordGenerator recordGenerator)
  	{
  		long i = 0;
  
@@@ -200,14 -239,8 +229,8 @@@
  		{
  			for (int key = 0; key < 10; key++)
  			{
- 				Bytes value = messageGenerator.apply(++i);
  				ProducerRecord<String, Bytes> record =
- 						new ProducerRecord<>(
- 								TOPIC,
- 								partition,
- 								Integer.toString(key%2),
- 								value);
+ 						recordGenerator.generate(partition, Integer.toString(partition*10+key%2), ++i);
  
  				kafkaProducer.send(record, (metadata, e) ->
  				{
@@@ -237,7 -270,7 +260,7 @@@
  	@BeforeEach
  	public void init()
  	{
- 		testHandler = record -> {} ;
+ 		seekToEnd();
  
  		oldOffsets = new HashMap<>();
  		newOffsets = new HashMap<>();
@@@ -249,23 -282,25 +272,25 @@@
  			newOffsets.put(tp, offset - 1);
  		});
  
- 		Consumer<ConsumerRecord<String, Long>> captureOffsetAndExecuteTestHandler =
- 				record ->
- 				{
- 					newOffsets.put(
- 							new TopicPartition(record.topic(), record.partition()),
- 							record.offset());
- 					receivedRecords.add(record);
- 					testHandler.accept(record);
+ 		TestRecordHandler<String, Long> captureOffsetAndExecuteTestHandler =
+ 				new TestRecordHandler<String, Long>(keyCountingRecordHandler) {
+ 					@Override
+ 					public void onNewRecord(ConsumerRecord<String, Long> record)
+ 					{
+ 						newOffsets.put(
+ 								new TopicPartition(record.topic(), record.partition()),
+ 								record.offset());
+ 						receivedRecords.add(record);
+ 					}
  				};
  
  		endlessConsumer =
  				new EndlessConsumer<>(
  						executor,
- 						repository,
  						properties.getClientId(),
  						properties.getTopic(),
  						kafkaConsumer,
+ 						keyCountingRebalanceListener,
  						captureOffsetAndExecuteTestHandler);
  
  		endlessConsumer.start();
@@@ -313,7 -348,8 +338,7 @@@
  		Properties props = new Properties();
  		props.put("bootstrap.servers", properties.getBootstrapServer());
  		props.put("client.id", "OFFSET-CONSUMER");
- 		props.put("enable.auto.commit", false);
- 		props.put("auto.offset.reset", "latest");
+ 		props.put("group.id", properties.getGroupId());
  		props.put("key.deserializer", BytesDeserializer.class.getName());
  		props.put("value.deserializer", BytesDeserializer.class.getName());
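One pattern in the test changes deserves a closing remark: since offsets now live in Kafka again, seekToEnd() isolates each test by fast-forwarding the consumer group's committed offsets past all previously produced records. Condensed into a stand-alone helper (the class and method names are illustrative, not from the commit; the Bytes types match the test's offsetConsumer):

    import java.util.List;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.utils.Bytes;

    public class OffsetFastForward
    {
      // Fast-forward the committed offsets of the consumer group to the log
      // end, so a fresh consumer with the same group.id sees no old records.
      public static void skipToLogEnd(
          KafkaConsumer<Bytes, Bytes> offsetConsumer,
          List<TopicPartition> partitions)
      {
        offsetConsumer.assign(partitions);
        offsetConsumer.seekToEnd(partitions);
        for (TopicPartition tp : partitions)
          offsetConsumer.position(tp); // seekToEnd() is lazy; position() forces it
        offsetConsumer.commitSync();   // publish the new positions to the group
        offsetConsumer.unsubscribe();
      }
    }

This only works because the helper consumer is configured with the same group.id as the application's consumer, which is exactly what the changed kafkaConsumer properties at the end of the diff arrange.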