Der Zustand des Zählers wird in einem compacted Topic abgelegt
authorKai Moritz <kai@juplo.de>
Mon, 28 Oct 2024 08:39:10 +0000 (09:39 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 10 Nov 2024 13:26:07 +0000 (14:26 +0100)
* Der Consumer zählt, welche Nachrichten gesendet und welche bestätigt
  wurden.
* Über einen `Phaser` wird sichergestellt, dass alle Nachrichten von den
  zuständigen Brokern bestätigt wurden, bevor der nächste ``poll()``-Aufruf
  erfolgt.

docker/docker-compose.yml
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/ExampleConsumer.java
src/main/resources/application.yml
src/test/java/de/juplo/kafka/ApplicationTests.java

index 8a9173a..34c2c5b 100644 (file)
@@ -137,11 +137,17 @@ services:
           echo -n Bereits konfiguriert: 
           cat INITIALIZED
           kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+          kafka-topics --bootstrap-server kafka:9092 --describe --topic state
         else
           kafka-topics --bootstrap-server kafka:9092 \
                        --delete \
                        --if-exists \
                        --topic test
+          kafka-topics --bootstrap-server kafka:9092 \
+                       --delete \
+                       --if-exists \
+                       --topic state \
+
           kafka-topics --bootstrap-server kafka:9092 \
                        --create \
                        --topic test \
@@ -150,7 +156,18 @@ services:
                        --config min.insync.replicas=2 \
           && echo Das Topic \'test\' wurde erfolgreich angelegt: \
           && kafka-topics --bootstrap-server kafka:9092 --describe --topic test \
-          && date > INITIALIZED
+          && \
+          kafka-topics --bootstrap-server kafka:9092 \
+                       --create \
+                       --topic state \
+                       --partitions 2 \
+                       --replication-factor 3 \
+                       --config min.insync.replicas=2 \
+                       --config cleanup.policy=compact \
+          && echo Das Topic \'state\' wurde erfolgreich angelegt: \
+          && kafka-topics --bootstrap-server kafka:9092 --describe --topic state \
+          && \
+          date > INITIALIZED
         fi
     stop_grace_period: 0s
     depends_on:
index a4856a6..c1fe03a 100644 (file)
@@ -3,7 +3,11 @@ package de.juplo.kafka;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.StickyAssignor;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
@@ -19,6 +23,7 @@ public class ApplicationConfiguration
   @Bean
   public ExampleConsumer exampleConsumer(
       Consumer<String, String> kafkaConsumer,
+      Producer<String, Long> kafkaProducer,
       ApplicationProperties properties,
       ConfigurableApplicationContext applicationContext)
   {
@@ -27,6 +32,8 @@ public class ApplicationConfiguration
             properties.getClientId(),
             properties.getConsumerProperties().getTopic(),
             kafkaConsumer,
+            properties.getProducerProperties().getTopic(),
+            kafkaProducer,
             () -> applicationContext.close());
   }
 
@@ -52,4 +59,23 @@ public class ApplicationConfiguration
 
     return new KafkaConsumer<>(props);
   }
+
+  @Bean
+  public KafkaProducer<String, Long> kafkaProducer(ApplicationProperties properties)
+  {
+    Properties props = new Properties();
+    props.put("bootstrap.servers", properties.getBootstrapServer());
+    props.put("client.id", properties.getClientId());
+    props.put("acks", properties.getProducerProperties().getAcks());
+    props.put("batch.size", properties.getProducerProperties().getBatchSize());
+    props.put("metadata.maxage.ms",   5000); //  5 Sekunden
+    props.put("delivery.timeout.ms", 20000); // 20 Sekunden
+    props.put("request.timeout.ms",  10000); // 10 Sekunden
+    props.put("linger.ms", properties.getProducerProperties().getLingerMs());
+    props.put("compression.type", properties.getProducerProperties().getCompressionType());
+    props.put("key.serializer", StringSerializer.class.getName());
+    props.put("value.serializer", LongSerializer.class.getName());
+
+    return new KafkaProducer<>(props);
+  }
 }
index c8193c9..0b43159 100644 (file)
@@ -25,6 +25,8 @@ public class ApplicationProperties
 
   @NotNull
   private ConsumerProperties consumer;
+  @NotNull
+  private ProducerProperties producer;
 
 
   public ConsumerProperties getConsumerProperties()
@@ -32,6 +34,11 @@ public class ApplicationProperties
     return consumer;
   }
 
+  public ProducerProperties getProducerProperties()
+  {
+    return producer;
+  }
+
 
   @Validated
   @Getter
@@ -49,4 +56,24 @@ public class ApplicationProperties
 
     enum OffsetReset { latest, earliest, none }
   }
+
+  @Validated
+  @Getter
+  @Setter
+  static class ProducerProperties
+  {
+    @NotNull
+    @NotEmpty
+    private String topic;
+    @NotNull
+    @NotEmpty
+    private String acks;
+    @NotNull
+    private Integer batchSize;
+    @NotNull
+    private Integer lingerMs;
+    @NotNull
+    @NotEmpty
+    private String compressionType;
+  }
 }
index 6767c6b..da845bd 100644 (file)
@@ -2,18 +2,24 @@ package de.juplo.kafka;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.Phaser;
 
 
 @Slf4j
-public class ExampleConsumer implements Runnable
+public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
 {
   private final String id;
   private final String topic;
@@ -22,8 +28,14 @@ public class ExampleConsumer implements Runnable
   private final Runnable closeCallback;
 
   private final Map<String, Long> counterState = new HashMap<>();
+  private final String stateTopic;
+  private final Producer<String, Long> producer;
 
   private volatile boolean running = false;
+  private final Phaser phaser = new Phaser(1);
+  private volatile int[] seen;
+  private volatile int[] acked;
+  private volatile boolean[] done;
   private long consumed = 0;
 
 
@@ -31,11 +43,15 @@ public class ExampleConsumer implements Runnable
     String clientId,
     String topic,
     Consumer<String, String> consumer,
+    String stateTopic,
+    Producer<String, Long> producer,
     Runnable closeCallback)
   {
     this.id = clientId;
     this.topic = topic;
     this.consumer = consumer;
+    this.stateTopic = stateTopic;
+    this.producer = producer;
 
     workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
     workerThread.start();
@@ -49,6 +65,13 @@ public class ExampleConsumer implements Runnable
   {
     try
     {
+      log.info("{} - Fetching PartitionInfo for topic {}", id, topic);
+      int numPartitions = consumer.partitionsFor(topic).size();
+      log.info("{} - Topic {} has {} partitions", id, topic, numPartitions);
+      seen = new int[numPartitions];
+      acked = new int[numPartitions];
+      done = new boolean[numPartitions];
+
       log.info("{} - Subscribing to topic {}", id, topic);
       consumer.subscribe(Arrays.asList(topic));
       running = true;
@@ -59,15 +82,29 @@ public class ExampleConsumer implements Runnable
             consumer.poll(Duration.ofSeconds(1));
 
         log.info("{} - Received {} messages", id, records.count());
-        for (ConsumerRecord<String, String> record : records)
-        {
-          handleRecord(
-            record.topic(),
-            record.partition(),
-            record.offset(),
-            record.key(),
-            record.value());
-        }
+        records
+          .partitions()
+          .forEach(partition ->
+          {
+            seen[partition.partition()] = 0;
+            acked[partition.partition()] = 0;
+            done[partition.partition()] = false;
+
+            for (ConsumerRecord<String, String> record : records)
+            {
+              handleRecord(
+                record.topic(),
+                record.partition(),
+                record.offset(),
+                record.key(),
+                record.value());
+            }
+
+            done[partition.partition()] = true;
+          });
+
+        int phase = phaser.arriveAndAwaitAdvance();
+        log.info("{} - Phase {} is done!", id, phase);
       }
     }
     catch(WakeupException e)
@@ -100,6 +137,7 @@ public class ExampleConsumer implements Runnable
     log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value);
     Long counter = computeCount(key);
     log.info("{} - current value for counter {}: {}", id, key, counter);
+    sendCounterState(partition, key, counter);
   }
 
   private synchronized Long computeCount(String key)
@@ -112,6 +150,41 @@ public class ExampleConsumer implements Runnable
     return counterState;
   }
 
+  void sendCounterState(int partition, String key, Long counter)
+  {
+    seen[partition]++;
+    ProducerRecord<String, Long> record = new ProducerRecord<>(stateTopic, key, counter);
+    producer.send(record, ((metadata, exception) ->
+    {
+      if (exception == null)
+      {
+        acked[partition]++;
+        if (done[partition] && !(acked[partition] < seen[partition]))
+        {
+          phaser.arrive();
+        }
+      }
+      else
+      {
+        // Errors are ignored (for now):
+        // The next occurrence of the key will issue a new update of the counter state
+        log.error("{} - {}", id, exception.toString());
+      }
+    }));
+  }
+
+  @Override
+  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+  {
+    phaser.bulkRegister(partitions.size());
+  }
+
+  @Override
+  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+  {
+    partitions.forEach(partition -> phaser.arriveAndDeregister());
+  }
+
   public void shutdown() throws InterruptedException
   {
     log.info("{} joining the worker-thread...", id);
index 7a06731..d9e7066 100644 (file)
@@ -6,6 +6,12 @@ juplo:
     topic: test
     auto-offset-reset: earliest
     auto-commit-interval: 5s
+  producer:
+    topic: state
+    acks: -1
+    batch-size: 16384
+    linger-ms: 0
+    compression-type: gzip
 management:
   endpoint:
     shutdown:
@@ -28,6 +34,12 @@ info:
       topic: ${juplo.consumer.topic}
       auto-offset-reset: ${juplo.consumer.auto-offset-reset}
       auto-commit-interval: ${juplo.consumer.auto-commit-interval}
+    producer:
+      topic: ${juplo.producer.topic}
+      acks: ${juplo.producer.acks}
+      batch-size: ${juplo.producer.batch-size}
+      linger-ms: ${juplo.producer.linger-ms}
+      compression-type: ${juplo.producer.compression-type}
 logging:
   level:
     root: INFO
index e4b97a4..22bb613 100644 (file)
@@ -9,8 +9,7 @@ import org.springframework.test.web.servlet.MockMvc;
 
 import java.time.Duration;
 
-import static de.juplo.kafka.ApplicationTests.PARTITIONS;
-import static de.juplo.kafka.ApplicationTests.TOPIC;
+import static de.juplo.kafka.ApplicationTests.*;
 import static org.awaitility.Awaitility.await;
 import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
@@ -21,12 +20,13 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
     properties = {
         "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
         "spring.kafka.consumer.auto-offset-reset=earliest",
-        "juplo.consumer.topic=" + TOPIC })
+        "juplo.consumer.topic=" + TOPIC_IN})
 @AutoConfigureMockMvc
-@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
+@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }, partitions = PARTITIONS)
 public class ApplicationTests
 {
-  static final String TOPIC = "FOO";
+  static final String TOPIC_IN  = "FOO";
+  static final String TOPIC_OUT = "BAR";
   static final int PARTITIONS = 10;
 
   @Autowired