From 60a1f94506106759fa53e9957f85c2620e0a024b Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Mon, 28 Oct 2024 09:39:10 +0100
Subject: [PATCH] The state of the counter is stored in a compacted topic
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* The consumer keeps track of which messages have been sent and which of
  them have already been acknowledged.
* A `Phaser` is used to ensure that all messages have been acknowledged by
  the brokers in charge before the next `poll()` call is issued.
---
 docker/docker-compose.yml                     | 19 +++-
 .../juplo/kafka/ApplicationConfiguration.java | 28 +++++-
 .../de/juplo/kafka/ApplicationProperties.java | 27 ++++++
 .../java/de/juplo/kafka/ExampleConsumer.java  | 95 ++++++++++++++++---
 src/main/resources/application.yml            | 12 +++
 .../java/de/juplo/kafka/ApplicationTests.java | 10 +-
 6 files changed, 173 insertions(+), 18 deletions(-)
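
The barrier mentioned in the commit message is a plain java.util.concurrent.Phaser.
The following stand-alone sketch is an illustration only and not part of the
patch (class name, partition count and sleep times are invented); it shows the
mechanism in isolation: one party is registered for the polling thread itself,
one additional party per assigned partition, and arriveAndAwaitAdvance() blocks
the poll()-loop until every asynchronous send()-callback has called arrive().

  import java.util.concurrent.CompletableFuture;
  import java.util.concurrent.Phaser;
  import java.util.concurrent.ThreadLocalRandom;

  public class PhaserBarrierSketch
  {
    public static void main(String[] args)
    {
      int partitions = 2;              // stands in for the assigned partitions
      Phaser phaser = new Phaser(1);   // the polling thread is the first party
      phaser.bulkRegister(partitions); // one additional party per partition

      for (int poll = 0; poll < 3; poll++) // stands in for the poll()-loop
      {
        for (int partition = 0; partition < partitions; partition++)
        {
          int p = partition;
          // Stands in for the send()-callback that calls arrive() once all
          // records of the partition have been acknowledged by the broker
          CompletableFuture.runAsync(() ->
          {
            try
            {
              Thread.sleep(ThreadLocalRandom.current().nextInt(100));
            }
            catch (InterruptedException e)
            {
              Thread.currentThread().interrupt();
            }
            System.out.println("partition " + p + " acked");
            phaser.arrive();
          });
        }

        // Blocks until every registered party has arrived for this phase,
        // i.e. until all outstanding sends have been confirmed
        int phase = phaser.arriveAndAwaitAdvance();
        System.out.println("phase " + phase + " is done - safe to poll() again");
      }
    }
  }

A Phaser rather than a CountDownLatch fits here because the number of parties
changes with every rebalance (bulkRegister() in onPartitionsAssigned(),
arriveAndDeregister() in onPartitionsRevoked()) and because the same barrier is
reused for every poll()-cycle.
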
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index 82c264d..45fd38f 100644
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -137,11 +137,17 @@ services:
           echo -n Bereits konfiguriert:
           cat INITIALIZED
           kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+          kafka-topics --bootstrap-server kafka:9092 --describe --topic state
         else
           kafka-topics --bootstrap-server kafka:9092 \
             --delete \
             --if-exists \
             --topic test
+          kafka-topics --bootstrap-server kafka:9092 \
+            --delete \
+            --if-exists \
+            --topic state \
+
           kafka-topics --bootstrap-server kafka:9092 \
             --create \
             --topic test \
@@ -150,7 +156,18 @@ services:
             --partitions 2 \
             --replication-factor 3 \
             --config min.insync.replicas=2 \
           && echo Das Topic \'test\' wurde erfolgreich angelegt: \
           && kafka-topics --bootstrap-server kafka:9092 --describe --topic test \
-          && date > INITIALIZED
+          && \
+          kafka-topics --bootstrap-server kafka:9092 \
+            --create \
+            --topic state \
+            --partitions 2 \
+            --replication-factor 3 \
+            --config min.insync.replicas=2 \
+            --config cleanup.policy=compact \
+          && echo Das Topic \'state\' wurde erfolgreich angelegt: \
+          && kafka-topics --bootstrap-server kafka:9092 --describe --topic state \
+          && \
+          date > INITIALIZED
         fi
     stop_grace_period: 0s
     depends_on:
diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 116d63d..5e4302c 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -3,7 +3,11 @@ package de.juplo.kafka;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.StickyAssignor;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -18,13 +22,16 @@ public class ApplicationConfiguration
   @Bean
   public ExampleConsumer exampleConsumer(
       Consumer<String, String> kafkaConsumer,
+      Producer<String, Long> kafkaProducer,
       ApplicationProperties properties)
   {
     return new ExampleConsumer(
         properties.getClientId(),
         properties.getConsumerProperties().getTopic(),
-        kafkaConsumer);
+        kafkaConsumer,
+        properties.getProducerProperties().getTopic(),
+        kafkaProducer);
   }
 
 
   @Bean
@@ -49,4 +56,23 @@ public class ApplicationConfiguration
 
     return new KafkaConsumer<>(props);
   }
+
+  @Bean
+  public KafkaProducer<String, Long> kafkaProducer(ApplicationProperties properties)
+  {
+    Properties props = new Properties();
+    props.put("bootstrap.servers", properties.getBootstrapServer());
+    props.put("client.id", properties.getClientId());
+    props.put("acks", properties.getProducerProperties().getAcks());
+    props.put("batch.size", properties.getProducerProperties().getBatchSize());
+    props.put("metadata.maxage.ms", 5000); // 5 Sekunden
+    props.put("delivery.timeout.ms", 20000); // 20 Sekunden
+    props.put("request.timeout.ms", 10000); // 10 Sekunden
+    props.put("linger.ms", properties.getProducerProperties().getLingerMs());
+    props.put("compression.type", properties.getProducerProperties().getCompressionType());
+    props.put("key.serializer", StringSerializer.class.getName());
+    props.put("value.serializer", LongSerializer.class.getName());
+
+    return new KafkaProducer<>(props);
+  }
 }
diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java
index c8193c9..0b43159 100644
--- a/src/main/java/de/juplo/kafka/ApplicationProperties.java
+++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java
@@ -25,6 +25,8 @@ public class ApplicationProperties
 
   @NotNull
   private ConsumerProperties consumer;
+  @NotNull
+  private ProducerProperties producer;
 
 
   public ConsumerProperties getConsumerProperties()
@@ -32,6 +34,11 @@
     return consumer;
   }
 
+  public ProducerProperties getProducerProperties()
+  {
+    return producer;
+  }
+
 
   @Validated
   @Getter
@@ -49,4 +56,24 @@
 
     enum OffsetReset { latest, earliest, none }
   }
+
+  @Validated
+  @Getter
+  @Setter
+  static class ProducerProperties
+  {
+    @NotNull
+    @NotEmpty
+    private String topic;
+    @NotNull
+    @NotEmpty
+    private String acks;
+    @NotNull
+    private Integer batchSize;
+    @NotNull
+    private Integer lingerMs;
+    @NotNull
+    @NotEmpty
+    private String compressionType;
+  }
 }
diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java
index 4cb6b21..77cef8d 100644
--- a/src/main/java/de/juplo/kafka/ExampleConsumer.java
+++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java
@@ -2,18 +2,24 @@ package de.juplo.kafka;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.Phaser;
 
 
 @Slf4j
-public class ExampleConsumer implements Runnable
+public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
 {
   private final String id;
   private final String topic;
@@ -21,19 +27,29 @@ public class ExampleConsumer implements Runnable
   private final Thread workerThread;
 
   private final Map<String, Long> counterState = new HashMap<>();
+  private final String stateTopic;
+  Producer<String, Long> producer;
 
   private volatile boolean running = false;
+  private final Phaser phaser = new Phaser(1);
+  private volatile int[] seen;
+  private volatile int[] acked;
+  private volatile boolean[] done;
   private long consumed = 0;
 
 
   public ExampleConsumer(
     String clientId,
     String topic,
-    Consumer<String, String> consumer)
+    Consumer<String, String> consumer,
+    String stateTopic,
+    Producer<String, Long> producer)
   {
     this.id = clientId;
     this.topic = topic;
     this.consumer = consumer;
+    this.stateTopic = stateTopic;
+    this.producer = producer;
 
     workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
     workerThread.start();
@@ -45,6 +61,13 @@
   {
     try
     {
+      log.info("{} - Fetching PartitionInfo for topic {}", id, topic);
+      int numPartitions = consumer.partitionsFor(topic).size();
+      log.info("{} - Topic {} has {} partitions", id, topic, numPartitions);
+      seen = new int[numPartitions];
+      acked = new int[numPartitions];
+      done = new boolean[numPartitions];
+
       log.info("{} - Subscribing to topic {}", id, topic);
       consumer.subscribe(Arrays.asList(topic));
       running = true;
@@ -55,15 +78,29 @@
           consumer.poll(Duration.ofSeconds(1));
         log.info("{} - Received {} messages", id, records.count());
 
-        for (ConsumerRecord<String, String> record : records)
-        {
-          handleRecord(
-            record.topic(),
-            record.partition(),
-            record.offset(),
-            record.key(),
-            record.value());
-        }
+        records
+          .partitions()
+          .forEach(partition ->
+          {
+            seen[partition.partition()] = 0;
+            acked[partition.partition()] = 0;
+            done[partition.partition()] = false;
+
+            for (ConsumerRecord<String, String> record : records)
+            {
+              handleRecord(
+                record.topic(),
+                record.partition(),
+                record.offset(),
+                record.key(),
+                record.value());
+            }
+
+            done[partition.partition()] = true;
+          });
+
+        int phase = phaser.arriveAndAwaitAdvance();
+        log.info("{} - Phase {} is done!", id, phase);
       }
     }
     catch(WakeupException e)
@@ -92,6 +129,7 @@
     log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value);
     Long counter = computeCount(key);
     log.info("{} - current value for counter {}: {}", id, key, counter);
+    sendCounterState(partition, key, counter);
   }
 
   private synchronized Long computeCount(String key)
@@ -104,6 +142,41 @@
     return counterState;
   }
 
+  void sendCounterState(int partition, String key, Long counter)
+  {
+    seen[partition]++;
+    ProducerRecord<String, Long> record = new ProducerRecord<>(stateTopic, key, counter);
+    producer.send(record, ((metadata, exception) ->
+    {
+      if (exception == null)
+      {
+        acked[partition]++;
+        if (done[partition] && !(acked[partition] < seen[partition]))
+        {
+          phaser.arrive();
+        }
+      }
+      else
+      {
+        // Errors are ignored (for now):
+        // The next occurrence of the key will issue a new update of the counter state
+        log.error("{} - {}", id, exception.toString());
+      }
+    }));
+  }
+
+  @Override
+  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+  {
+    phaser.bulkRegister(partitions.size());
+  }
+
+  @Override
+  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+  {
+    partitions.forEach(partition -> phaser.arriveAndDeregister());
+  }
+
   public void shutdown() throws InterruptedException
   {
     log.info("{} joining the worker-thread...", id);
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 7a06731..d9e7066 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -6,6 +6,12 @@ juplo:
     topic: test
     auto-offset-reset: earliest
     auto-commit-interval: 5s
+  producer:
+    topic: state
+    acks: -1
+    batch-size: 16384
+    linger-ms: 0
+    compression-type: gzip
 management:
   endpoint:
     shutdown:
@@ -28,6 +34,12 @@ info:
       topic: ${juplo.consumer.topic}
       auto-offset-reset: ${juplo.consumer.auto-offset-reset}
      auto-commit-interval: ${juplo.consumer.auto-commit-interval}
+    producer:
+      topic: ${juplo.producer.topic}
+      acks: ${juplo.producer.acks}
+      batch-size: ${juplo.producer.batch-size}
+      linger-ms: ${juplo.producer.linger-ms}
+      compression-type: ${juplo.producer.compression-type}
 logging:
   level:
     root: INFO
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index e4b97a4..22bb613 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -9,8 +9,7 @@ import org.springframework.test.web.servlet.MockMvc;
 
 import java.time.Duration;
 
-import static de.juplo.kafka.ApplicationTests.PARTITIONS;
-import static de.juplo.kafka.ApplicationTests.TOPIC;
+import static de.juplo.kafka.ApplicationTests.*;
 import static org.awaitility.Awaitility.await;
 import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
@@ -21,12 +20,13 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
     properties = {
         "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
         "spring.kafka.consumer.auto-offset-reset=earliest",
-        "juplo.consumer.topic=" + TOPIC })
+        "juplo.consumer.topic=" + TOPIC_IN})
 @AutoConfigureMockMvc
-@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
+@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }, partitions = PARTITIONS)
 public class ApplicationTests
 {
-  static final String TOPIC = "FOO";
+  static final String TOPIC_IN = "FOO";
+  static final String TOPIC_OUT = "BAR";
   static final int PARTITIONS = 10;
 
   @Autowired
-- 
2.20.1
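
Addendum (illustration only, not part of the patch): because the state topic is
created with cleanup.policy=compact, the broker eventually retains only the
latest counter value per key. A hypothetical reader like the sketch below could
be used to inspect the state that ExampleConsumer publishes - the topic name
"state" and the bootstrap server are assumptions taken from application.yml and
docker/docker-compose.yml, the class name is invented:

  import java.time.Duration;
  import java.util.HashMap;
  import java.util.Map;
  import java.util.Properties;

  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
  import org.apache.kafka.common.TopicPartition;
  import org.apache.kafka.common.serialization.LongDeserializer;
  import org.apache.kafka.common.serialization.StringDeserializer;

  // Reads the compacted state topic from the beginning and keeps only the
  // latest value per key, i.e. the current counter state
  public class StateTopicReaderSketch
  {
    public static void main(String[] args)
    {
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092"); // assumption
      props.put("key.deserializer", StringDeserializer.class.getName());
      props.put("value.deserializer", LongDeserializer.class.getName());

      Map<String, Long> state = new HashMap<>();

      try (KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(props))
      {
        var partitions = consumer
            .partitionsFor("state")
            .stream()
            .map(info -> new TopicPartition(info.topic(), info.partition()))
            .toList();

        consumer.assign(partitions);
        consumer.seekToBeginning(partitions);

        Map<TopicPartition, Long> end = consumer.endOffsets(partitions);

        // Poll until the position of every partition has reached its end offset
        while (end
            .entrySet()
            .stream()
            .anyMatch(e -> consumer.position(e.getKey()) < e.getValue()))
        {
          for (ConsumerRecord<String, Long> record : consumer.poll(Duration.ofSeconds(1)))
          {
            state.put(record.key(), record.value()); // later values win
          }
        }
      }

      state.forEach((key, counter) -> System.out.println(key + "=" + counter));
    }
  }

Reading the topic from the beginning and letting later values overwrite earlier
ones yields exactly one value per key, which is what makes a compacted topic
suitable as a simple state store for the counter.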