From: Kai Moritz Date: Mon, 28 Oct 2024 08:39:10 +0000 (+0100) Subject: Der Zustand des Zählers wird in einem compacted Topic abgelegt X-Git-Tag: consumer/spring-consumer--log-compaction--2024-11-13--si~23 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=0a531eceedd7f6cb2eb97ea7cccfa96538aeff8f;p=demos%2Fkafka%2Ftraining Der Zustand des Zählers wird in einem compacted Topic abgelegt * Der Consumer zählt, welche Nachrichten gesendet und welche bestätigt wurden. * Über einen `Phaser` wird sichergestellt, dass alle Nachrichten von den zuständigen Brokern bestätigt wurden, bevor der nächste ``poll()``-Aufruf erfolgt. --- diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 8a9173a..34c2c5b 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -137,11 +137,17 @@ services: echo -n Bereits konfiguriert: cat INITIALIZED kafka-topics --bootstrap-server kafka:9092 --describe --topic test + kafka-topics --bootstrap-server kafka:9092 --describe --topic state else kafka-topics --bootstrap-server kafka:9092 \ --delete \ --if-exists \ --topic test + kafka-topics --bootstrap-server kafka:9092 \ + --delete \ + --if-exists \ + --topic state \ + kafka-topics --bootstrap-server kafka:9092 \ --create \ --topic test \ @@ -150,7 +156,18 @@ services: --config min.insync.replicas=2 \ && echo Das Topic \'test\' wurde erfolgreich angelegt: \ && kafka-topics --bootstrap-server kafka:9092 --describe --topic test \ - && date > INITIALIZED + && \ + kafka-topics --bootstrap-server kafka:9092 \ + --create \ + --topic state \ + --partitions 2 \ + --replication-factor 3 \ + --config min.insync.replicas=2 \ + --config cleanup.policy=compact \ + && echo Das Topic \'state\' wurde erfolgreich angelegt: \ + && kafka-topics --bootstrap-server kafka:9092 --describe --topic state \ + && \ + date > INITIALIZED fi stop_grace_period: 0s depends_on: diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index a4856a6..c1fe03a 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -3,7 +3,11 @@ package de.juplo.kafka; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.StickyAssignor; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; @@ -19,6 +23,7 @@ public class ApplicationConfiguration @Bean public ExampleConsumer exampleConsumer( Consumer kafkaConsumer, + Producer kafkaProducer, ApplicationProperties properties, ConfigurableApplicationContext applicationContext) { @@ -27,6 +32,8 @@ public class ApplicationConfiguration properties.getClientId(), properties.getConsumerProperties().getTopic(), kafkaConsumer, + properties.getProducerProperties().getTopic(), + kafkaProducer, () -> applicationContext.close()); } @@ -52,4 +59,23 @@ public class ApplicationConfiguration return new KafkaConsumer<>(props); } + + @Bean + public KafkaProducer kafkaProducer(ApplicationProperties properties) + { + Properties props = new Properties(); + props.put("bootstrap.servers", properties.getBootstrapServer()); + props.put("client.id", properties.getClientId()); + props.put("acks", properties.getProducerProperties().getAcks()); + props.put("batch.size", properties.getProducerProperties().getBatchSize()); + props.put("metadata.maxage.ms", 5000); // 5 Sekunden + props.put("delivery.timeout.ms", 20000); // 20 Sekunden + props.put("request.timeout.ms", 10000); // 10 Sekunden + props.put("linger.ms", properties.getProducerProperties().getLingerMs()); + props.put("compression.type", properties.getProducerProperties().getCompressionType()); + props.put("key.serializer", StringSerializer.class.getName()); + props.put("value.serializer", LongSerializer.class.getName()); + + return new KafkaProducer<>(props); + } } diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index c8193c9..0b43159 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -25,6 +25,8 @@ public class ApplicationProperties @NotNull private ConsumerProperties consumer; + @NotNull + private ProducerProperties producer; public ConsumerProperties getConsumerProperties() @@ -32,6 +34,11 @@ public class ApplicationProperties return consumer; } + public ProducerProperties getProducerProperties() + { + return producer; + } + @Validated @Getter @@ -49,4 +56,24 @@ public class ApplicationProperties enum OffsetReset { latest, earliest, none } } + + @Validated + @Getter + @Setter + static class ProducerProperties + { + @NotNull + @NotEmpty + private String topic; + @NotNull + @NotEmpty + private String acks; + @NotNull + private Integer batchSize; + @NotNull + private Integer lingerMs; + @NotNull + @NotEmpty + private String compressionType; + } } diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 6767c6b..da845bd 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -2,18 +2,24 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import java.time.Duration; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Phaser; @Slf4j -public class ExampleConsumer implements Runnable +public class ExampleConsumer implements Runnable, ConsumerRebalanceListener { private final String id; private final String topic; @@ -22,8 +28,14 @@ public class ExampleConsumer implements Runnable private final Runnable closeCallback; private final Map counterState = new HashMap<>(); + private final String stateTopic; + private final Producer producer; private volatile boolean running = false; + private final Phaser phaser = new Phaser(1); + private volatile int[] seen; + private volatile int[] acked; + private volatile boolean[] done; private long consumed = 0; @@ -31,11 +43,15 @@ public class ExampleConsumer implements Runnable String clientId, String topic, Consumer consumer, + String stateTopic, + Producer producer, Runnable closeCallback) { this.id = clientId; this.topic = topic; this.consumer = consumer; + this.stateTopic = stateTopic; + this.producer = producer; workerThread = new Thread(this, "ExampleConsumer Worker-Thread"); workerThread.start(); @@ -49,6 +65,13 @@ public class ExampleConsumer implements Runnable { try { + log.info("{} - Fetching PartitionInfo for topic {}", id, topic); + int numPartitions = consumer.partitionsFor(topic).size(); + log.info("{} - Topic {} has {} partitions", id, topic, numPartitions); + seen = new int[numPartitions]; + acked = new int[numPartitions]; + done = new boolean[numPartitions]; + log.info("{} - Subscribing to topic {}", id, topic); consumer.subscribe(Arrays.asList(topic)); running = true; @@ -59,15 +82,29 @@ public class ExampleConsumer implements Runnable consumer.poll(Duration.ofSeconds(1)); log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) - { - handleRecord( - record.topic(), - record.partition(), - record.offset(), - record.key(), - record.value()); - } + records + .partitions() + .forEach(partition -> + { + seen[partition.partition()] = 0; + acked[partition.partition()] = 0; + done[partition.partition()] = false; + + for (ConsumerRecord record : records) + { + handleRecord( + record.topic(), + record.partition(), + record.offset(), + record.key(), + record.value()); + } + + done[partition.partition()] = true; + }); + + int phase = phaser.arriveAndAwaitAdvance(); + log.info("{} - Phase {} is done!", id, phase); } } catch(WakeupException e) @@ -100,6 +137,7 @@ public class ExampleConsumer implements Runnable log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value); Long counter = computeCount(key); log.info("{} - current value for counter {}: {}", id, key, counter); + sendCounterState(partition, key, counter); } private synchronized Long computeCount(String key) @@ -112,6 +150,41 @@ public class ExampleConsumer implements Runnable return counterState; } + void sendCounterState(int partition, String key, Long counter) + { + seen[partition]++; + ProducerRecord record = new ProducerRecord<>(stateTopic, key, counter); + producer.send(record, ((metadata, exception) -> + { + if (exception == null) + { + acked[partition]++; + if (done[partition] && !(acked[partition] < seen[partition])) + { + phaser.arrive(); + } + } + else + { + // Errors are ignored (for now): + // The next occurrence of the key will issue a new update of the counter state + log.error("{} - {}", id, exception.toString()); + } + })); + } + + @Override + public void onPartitionsAssigned(Collection partitions) + { + phaser.bulkRegister(partitions.size()); + } + + @Override + public void onPartitionsRevoked(Collection partitions) + { + partitions.forEach(partition -> phaser.arriveAndDeregister()); + } + public void shutdown() throws InterruptedException { log.info("{} joining the worker-thread...", id); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 7a06731..d9e7066 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -6,6 +6,12 @@ juplo: topic: test auto-offset-reset: earliest auto-commit-interval: 5s + producer: + topic: state + acks: -1 + batch-size: 16384 + linger-ms: 0 + compression-type: gzip management: endpoint: shutdown: @@ -28,6 +34,12 @@ info: topic: ${juplo.consumer.topic} auto-offset-reset: ${juplo.consumer.auto-offset-reset} auto-commit-interval: ${juplo.consumer.auto-commit-interval} + producer: + topic: ${juplo.producer.topic} + acks: ${juplo.producer.acks} + batch-size: ${juplo.producer.batch-size} + linger-ms: ${juplo.producer.linger-ms} + compression-type: ${juplo.producer.compression-type} logging: level: root: INFO diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index e4b97a4..22bb613 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -9,8 +9,7 @@ import org.springframework.test.web.servlet.MockMvc; import java.time.Duration; -import static de.juplo.kafka.ApplicationTests.PARTITIONS; -import static de.juplo.kafka.ApplicationTests.TOPIC; +import static de.juplo.kafka.ApplicationTests.*; import static org.awaitility.Awaitility.await; import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; @@ -21,12 +20,13 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", "spring.kafka.consumer.auto-offset-reset=earliest", - "juplo.consumer.topic=" + TOPIC }) + "juplo.consumer.topic=" + TOPIC_IN}) @AutoConfigureMockMvc -@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) +@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }, partitions = PARTITIONS) public class ApplicationTests { - static final String TOPIC = "FOO"; + static final String TOPIC_IN = "FOO"; + static final String TOPIC_OUT = "BAR"; static final int PARTITIONS = 10; @Autowired