From: Kai Moritz Date: Sat, 13 Aug 2022 08:45:03 +0000 (+0200) Subject: Verbesserungen aus 'stored-offsets' nach 'stored-state' gemerged X-Git-Tag: sumup-requests---lvm-2-tage~5^2^2^2~1 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=cfbe7dcd3318ee846cb5890eea4328e36c5aa364;hp=2768e0f97c441ade5ce8ff371aa590fdc3cfd6c6;p=demos%2Fkafka%2Ftraining Verbesserungen aus 'stored-offsets' nach 'stored-state' gemerged --- diff --git a/README.sh b/README.sh index b7bce99..39e9300 100755 --- a/README.sh +++ b/README.sh @@ -24,17 +24,65 @@ fi echo "Waiting for the Kafka-Cluster to become ready..." docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1 -docker-compose up setup -docker-compose up -d producer peter beate +docker-compose up -d kafka-ui -sleep 15 +docker-compose exec -T cli bash << 'EOF' +echo "Creating topic with 3 partitions..." +kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test +# tag::createtopic[] +kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 3 +# end::createtopic[] +kafka-topics --bootstrap-server kafka:9092 --describe --topic test +EOF -http -v post :8082/stop +docker-compose up -d consumer + +docker-compose up -d producer sleep 10 -docker-compose kill -s 9 peter -http -v post :8082/start -sleep 60 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen + +docker-compose stop producer +docker-compose exec -T cli bash << 'EOF' +echo "Altering number of partitions from 3 to 7..." 
+# tag::altertopic[] +kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 7 +kafka-topics --bootstrap-server kafka:9092 --describe --topic test +# end::altertopic[] +EOF -docker-compose stop producer peter beate -docker-compose logs beate -docker-compose logs --tail=10 peter +docker-compose start producer +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +sleep 1 +http -v :8081/seen +docker-compose stop producer consumer diff --git a/docker-compose.yml b/docker-compose.yml index 7ab77b2..ee78746 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -43,13 +43,13 @@ services: depends_on: - mongo - setup: - image: juplo/toolbox - command: > - bash -c " - kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test - kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2 - " + kafka-ui: + image: provectuslabs/kafka-ui:0.3.3 + ports: + - 8080:8080 + environment: + KAFKA_CLUSTERS_0_NAME: local + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 cli: image: juplo/toolbox @@ -64,29 +64,17 @@ services: producer.bootstrap-server: kafka:9092 producer.client-id: producer producer.topic: test - producer.throttle-ms: 500 + producer.throttle-ms: 10 - peter: + consumer: image: juplo/endless-consumer:1.0-SNAPSHOT ports: - 8081:8080 environment: server.port: 8080 consumer.bootstrap-server: kafka:9092 - consumer.client-id: peter - consumer.topic: test - spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017 - spring.data.mongodb.database: juplo - - beate: - image: juplo/endless-consumer:1.0-SNAPSHOT - ports: - - 8082:8080 - environment: - server.port: 8080 - consumer.bootstrap-server: 
kafka:9092 - consumer.client-id: beate + consumer.client-id: consumer consumer.topic: test spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017 spring.data.mongodb.database: juplo diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 3925fcb..1ea90a2 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,6 +1,5 @@ package de.juplo.kafka; -import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; @@ -19,26 +18,23 @@ import java.util.concurrent.Executors; public class ApplicationConfiguration { @Bean - public KeyCountingRecordHandler messageCountingRecordHandler() + public KeyCountingRecordHandler keyCountingRecordHandler() { return new KeyCountingRecordHandler(); } @Bean - public KeyCountingRebalanceListener wordcountRebalanceListener( + public KeyCountingRebalanceListener keyCountingRebalanceListener( KeyCountingRecordHandler keyCountingRecordHandler, PartitionStatisticsRepository repository, - Consumer consumer, ApplicationProperties properties) { return new KeyCountingRebalanceListener( keyCountingRecordHandler, repository, properties.getClientId(), - properties.getTopic(), Clock.systemDefaultZone(), - properties.getCommitInterval(), - consumer); + properties.getCommitInterval()); } @Bean @@ -74,7 +70,6 @@ public class ApplicationConfiguration props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"); props.put("group.id", properties.getGroupId()); props.put("client.id", properties.getClientId()); - props.put("enable.auto.commit", false); props.put("auto.offset.reset", properties.getAutoOffsetReset()); props.put("auto.commit.interval.ms", 
(int)properties.getCommitInterval().toMillis()); props.put("metadata.max.age.ms", "1000"); diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 58557f2..047d5cb 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -74,6 +74,7 @@ public class EndlessConsumer implements Runnable catch(WakeupException e) { log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id); + consumer.commitSync(); shutdown(); } catch(RecordDeserializationException e) @@ -87,6 +88,7 @@ public class EndlessConsumer implements Runnable offset, e.getCause().toString()); + consumer.commitSync(); shutdown(e); } catch(Exception e) diff --git a/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java b/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java index 4a2c036..636ff86 100644 --- a/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java @@ -2,7 +2,6 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.TopicPartition; import java.time.Clock; @@ -19,10 +18,8 @@ public class KeyCountingRebalanceListener implements PollIntervalAwareConsumerRe private final KeyCountingRecordHandler handler; private final PartitionStatisticsRepository repository; private final String id; - private final String topic; private final Clock clock; private final Duration commitInterval; - private final Consumer consumer; private Instant lastCommit = Instant.EPOCH; @@ -32,18 +29,11 @@ public class KeyCountingRebalanceListener implements PollIntervalAwareConsumerRe partitions.forEach(tp -> { Integer partition = tp.partition(); - Long offset = consumer.position(tp); - log.info("{} - adding partition: {}, offset={}", id, partition, offset); + 
log.info("{} - adding partition: {}", id, partition); StatisticsDocument document = repository .findById(Integer.toString(partition)) .orElse(new StatisticsDocument(partition)); - if (document.offset >= 0) - { - // Only seek, if a stored offset was found - // Otherwise: Use initial offset, generated by Kafka - consumer.seek(tp, document.offset); - } handler.addPartition(partition, document.statistics); }); } @@ -54,14 +44,18 @@ public class KeyCountingRebalanceListener implements PollIntervalAwareConsumerRe partitions.forEach(tp -> { Integer partition = tp.partition(); - Long newOffset = consumer.position(tp); - log.info( - "{} - removing partition: {}, offset of next message {})", - id, - partition, - newOffset); + log.info("{} - removing partition: {}", id, partition); Map removed = handler.removePartition(partition); - repository.save(new StatisticsDocument(partition, removed, consumer.position(tp))); + for (String key : removed.keySet()) + { + log.info( + "{} - Seen {} messages for partition={}|key={}", + id, + removed.get(key), + partition, + key); + } + repository.save(new StatisticsDocument(partition, removed)); }); } @@ -71,12 +65,11 @@ public class KeyCountingRebalanceListener implements PollIntervalAwareConsumerRe { if (lastCommit.plus(commitInterval).isBefore(clock.instant())) { - log.debug("Storing data and offsets, last commit: {}", lastCommit); + log.debug("Storing data, last commit: {}", lastCommit); handler.getSeen().forEach((partiton, statistics) -> repository.save( new StatisticsDocument( partiton, - statistics, - consumer.position(new TopicPartition(topic, partiton))))); + statistics))); lastCommit = clock.instant(); } } diff --git a/src/main/java/de/juplo/kafka/StatisticsDocument.java b/src/main/java/de/juplo/kafka/StatisticsDocument.java index 1244f45..415ef5c 100644 --- a/src/main/java/de/juplo/kafka/StatisticsDocument.java +++ b/src/main/java/de/juplo/kafka/StatisticsDocument.java @@ -14,7 +14,6 @@ public class StatisticsDocument { @Id public 
String id; - public long offset = -1l; public Map statistics; public StatisticsDocument() @@ -27,10 +26,9 @@ public class StatisticsDocument this.statistics = new HashMap<>(); } - public StatisticsDocument(Integer partition, Map statistics, long offset) + public StatisticsDocument(Integer partition, Map statistics) { this.id = Integer.toString(partition); this.statistics = statistics; - this.offset = offset; } } diff --git a/src/test/java/de/juplo/kafka/ApplicationIT.java b/src/test/java/de/juplo/kafka/ApplicationIT.java index cded0ee..d1d8e50 100644 --- a/src/test/java/de/juplo/kafka/ApplicationIT.java +++ b/src/test/java/de/juplo/kafka/ApplicationIT.java @@ -32,7 +32,7 @@ public class ApplicationIT @Test - public void testApplicationStartup() + public void testApplicationStartup() { restTemplate.getForObject( "http://localhost:" + port + "/actuator/health", diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index fc5d4c9..7f666f6 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -64,14 +64,10 @@ class ApplicationTests @Autowired KafkaConsumer offsetConsumer; @Autowired - PartitionStatisticsRepository partitionStatisticsRepository; - @Autowired ApplicationProperties properties; @Autowired ExecutorService executor; @Autowired - PartitionStatisticsRepository repository; - @Autowired KeyCountingRebalanceListener keyCountingRebalanceListener; @Autowired KeyCountingRecordHandler keyCountingRecordHandler; @@ -191,30 +187,24 @@ class ApplicationTests void seekToEnd() { offsetConsumer.assign(partitions()); + offsetConsumer.seekToEnd(partitions()); partitions().forEach(tp -> { + // seekToEnd() works lazily: it only takes effect on poll()/position() Long offset = offsetConsumer.position(tp); log.info("New position for {}: {}", tp, offset); - Integer partition = tp.partition(); - StatisticsDocument document = - 
partitionStatisticsRepository - .findById(partition.toString()) - .orElse(new StatisticsDocument(partition)); - document.offset = offset; - partitionStatisticsRepository.save(document); }); + // The new positions must be committed! + offsetConsumer.commitSync(); offsetConsumer.unsubscribe(); } void doForCurrentOffsets(BiConsumer consumer) { - partitions().forEach(tp -> - { - String partition = Integer.toString(tp.partition()); - Optional offset = partitionStatisticsRepository.findById(partition).map(document -> document.offset); - consumer.accept(tp, offset.orElse(0l)); - }); - } + offsetConsumer.assign(partitions()); + partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp))); + offsetConsumer.unsubscribe(); + } List partitions() { @@ -348,8 +338,7 @@ class ApplicationTests Properties props = new Properties(); props.put("bootstrap.servers", properties.getBootstrapServer()); props.put("client.id", "OFFSET-CONSUMER"); - props.put("enable.auto.commit", false); - props.put("auto.offset.reset", "latest"); + props.put("group.id", properties.getGroupId()); props.put("key.deserializer", BytesDeserializer.class.getName()); props.put("value.deserializer", BytesDeserializer.class.getName());