Log-Compaction: Der Zustand wird in einem Topic gespeichert consumer/spring-consumer--log-compaction--generics4all consumer/spring-consumer--log-compaction--generics4some
authorKai Moritz <kai@juplo.de>
Fri, 21 Feb 2025 10:41:54 +0000 (11:41 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 21 Feb 2025 11:08:35 +0000 (12:08 +0100)
* Die Counter werden mit dem ``String``-Key indiziert
** Vorbereitung auf eine Poison-Pill Übung ist hier noch out-of-scope...
* GET-Endpoint zum Abfragen der Schlüssel-Zählungen ergänzt
** Zugriff auf die `counterMap` in `ExampleConsumer` synchronisiert.
** `CounterStateController` kopiert die Map, um mögliche konkurierende
   Zugriffe während des Erzeugens der Ausgabe zu vermeiden.
* Der Zustand des Zählers wird in einem compacted Topic abgelegt
** Der Consumer zählt, welche Nachrichten gesendet und welche bestätigt
   wurden.
** Über einen `Phaser` wird sichergestellt, dass alle Nachrichten von den
   zuständigen Brokern bestätigt wurden, bevor der nächste ``poll()``-Aufruf
   erfolgt.
* Der Value-Typ in dem Topic `state` ist jetzt auch vom Typ `String`
** Dadurch wird die Kontrolle der Ergebnisse einfacher, da alle Nachrichten
   auch einfach mit `kafkacat` gelesen werden können.
* Fix: Fehler müssen wie bestätigte Nachrichten behandelt werden
** Ohne explizite Fehlerbehandlung müssen auch die nicht bestätigten
   Nachrichten als `acked` gezählt werden.
** Ansonsten würde die Verarbeitung in einem ``poll()``-Durchlauf mit Fehler
   hängen bleiben, da niemals alles "gesehenen" Nachrichten auch als
   "bestätigt" gezählt würden.
** Dabei: Producer-Code an den aus `producer/spring-producer` angeglichen.
* Log-Meldungen zum Fortschritt beim Versenden des Zähler-Status ergänzt
* Log-Meldungen für das Senden des Zählerstands ergänzt
* Fix: Der Rebalance-Listener wurde nie registriert
* Fehler im Logging der aktiven Phase korrigiert und Meldungen verbessert
* Fix: Nachrichten wurden ggf. doppelt verarbeitet
** Wenn man in einer Schliefe die Nachrichten pro Partition separat
   verarbeitet...
** ...dann sollte man in jedem Schleifendurchlauf auch nur die Nachrichten
*   der gerade zu verarbeitenden Partition abrufen!
* Fix: `poll()` liefert nicht immer Nachrichten zu _allen_ Partitionen
** Ein Aufruf von `poll()` liefert _nicht unbedingt_ Nachrichten zu _jeder_
   Partition, die der Instanz gerade zugeteilt ist.
** Daher konnte es auftreten, dass eine Phase nie beendet wurde, wenn
   `poll()` nur Nachrichten zu einer Untermenge der aktiven Partitionen
   geliefert hat.
* Der Zustand wird aus dem ``state``-Topic wiederhergestellt
* Refactor: Logik für Counter in Klasse `CounterState` extrahiert
* Refactor: DRY für state-change zu ASSIGNED
* Refactor: DRY für state-change zu UNASSIGNED
* Refactor: Neue, klarere ``switch``-Syntax
* DRY für state-change zu RESTORING
* Refactor: Handling von pause/resume vollständig in State-Change-Methoden
* Fix & Refactor: Restore-Behandlung wurde _allen_ aktiven Partitionen zuteil
** Durch das vorausgehende Refactoring wurde deutlich, dass die Behandlung,
   die den _neu_ hinzugefügten Partitionen zugedacht war, allen in
   `assignedPartitions` vermerkten Partitionen wiederfahren ist.
** Dies ist für den aktuellen Entwicklungsstand ggf. egal, da der wegen dem
   Co-Partitioning (noch!) benötigte `RangeAssignor` eh _zuerst alle_
   Partitionen entzieht, bevor er _dann alle_ neu zuteilt.
** Da der Code aber auch mit dem neuen Consumer-Rebalance Protokoll
   funktionieren muss, wurde das Refactoring hier fortgeführt und so
   vollendet, dass nun _alle_ Aktionenen _nur noch_ von den Callbacks
   `onPartitionsAssigned()` und `onPartitionsRevoked()` ausgeht.
* Der Zählerzustand wird separat pro Partition verwaltet
** Dadurch ist es möglich, den Zustand für entzogene Partitionen zu löschen.
** D.h., bei der Ausgabe ist immer klar ersichtlich, über welchen Zustand
   die angefragte Instanz gerade verfügt.
* Refactor: Zustand muss `CounterState` vollständig übergeben werden
* Refactor: Enum `PartitionState` in `State` umbenannt
* Effekte des Log-Compaction in dem Topic `state` sichtbar gemacht
* Setup mit ein bischen mehr Dampf (`README.sh` angepasst)
* TX-kompatibler Weg zur Prüfung auf eine abgeschlossene Wiederherstellung
** Der bisher verwendete Vergleich der Offset-Positionen schlägt fehl, wenn
   die Implementierung um Transaktionen erweitert wird
** _Grund:_ Dann stimmt die Offset-Position nicht mehr überein, weil nach
   der letzten Zustands-Nachricht noch eine, von der Transaktion erzeugte,
   versteckte Nachricht folgt, die die Anwendung nie zu sehen bekommt!
* Mögliche Exception wegen konkurrierendem Zugriff auf Map verhindert
* Rückbau auf einfachen Consumer mit Statistiken zur Nachrichtenzählung,
  um an dem Beispiel die Verwendung des Rebalance-Listeners vorführen zu
  können. (D.h., das Speichern/Wiederherstellen des Zustands wurde
  entfernt.)
* Umkehrung des Rückbaus, so dass der Zustand wieder in einem Topic mit
  Log-Compaction gespeichert und aus diesem Wiederhergestellt wird.
* Die bisherige Implementierung notdürftig an die Typisierung angeapsst.

README.sh
docker/docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/ExampleConsumer.java
src/main/resources/application.yml
src/test/java/de/juplo/kafka/ApplicationTests.java

index bdefd2b..07f7de4 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/spring-consumer:1.1-rebalance-listener-SNAPSHOT
+IMAGE=juplo/spring-consumer:1.1-log-compaction-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
@@ -10,7 +10,7 @@ then
 fi
 
 docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3
-docker compose -f docker/docker-compose.yml rm -svf consumer-1 consumer-2
+docker compose -f docker/docker-compose.yml rm -svf consumer-1
 
 if [[
   $(docker image ls -q $IMAGE) == "" ||
@@ -28,12 +28,8 @@ docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1
 
 docker compose -f docker/docker-compose.yml up -d producer
 docker compose -f docker/docker-compose.yml up -d consumer-1
-sleep 10
-docker compose -f docker/docker-compose.yml exec cli http -v consumer-1:8881/
 
-docker compose -f docker/docker-compose.yml up -d consumer-2
 sleep 10
-docker compose -f docker/docker-compose.yml exec cli http -v consumer-1:8881/
-docker compose -f docker/docker-compose.yml exec cli http -v consumer-2:8881/
+docker compose -f docker/docker-compose.yml exec cli http consumer-1:8881/
 
 docker compose -f docker/docker-compose.yml stop producer consumer-1
index 6d6225d..d58151a 100644 (file)
@@ -95,11 +95,17 @@ services:
           echo -n Bereits konfiguriert: 
           cat INITIALIZED
           kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+          kafka-topics --bootstrap-server kafka:9092 --describe --topic state
         else
           kafka-topics --bootstrap-server kafka:9092 \
                        --delete \
                        --if-exists \
                        --topic test
+          kafka-topics --bootstrap-server kafka:9092 \
+                       --delete \
+                       --if-exists \
+                       --topic state \
+
           kafka-topics --bootstrap-server kafka:9092 \
                        --create \
                        --topic test \
@@ -108,7 +114,20 @@ services:
                        --config min.insync.replicas=2 \
           && echo Das Topic \'test\' wurde erfolgreich angelegt: \
           && kafka-topics --bootstrap-server kafka:9092 --describe --topic test \
-          && date > INITIALIZED
+          && \
+          kafka-topics --bootstrap-server kafka:9092 \
+                       --create \
+                       --topic state \
+                       --partitions 2 \
+                       --replication-factor 3 \
+                       --config min.insync.replicas=2 \
+                       --config cleanup.policy=compact \
+                       --config segment.ms=3000 \
+                       --config max.compaction.lag.ms=5000 \
+          && echo Das Topic \'state\' wurde erfolgreich angelegt: \
+          && kafka-topics --bootstrap-server kafka:9092 --describe --topic state \
+          && \
+          date > INITIALIZED
         fi
     stop_grace_period: 0s
     depends_on:
@@ -145,7 +164,7 @@ services:
       juplo.producer.throttle-ms: 10
 
   consumer:
-    image: juplo/spring-consumer:1.1-rebalance-listener-SNAPSHOT
+    image: juplo/spring-consumer:1.1-log-compaction-SNAPSHOT
     environment:
       juplo.bootstrap-server: kafka:9092
       juplo.client-id: consumer
@@ -154,7 +173,7 @@ services:
       logging.level.de.juplo: TRACE
 
   peter:
-    image: juplo/spring-consumer:1.1-rebalance-listener-SNAPSHOT
+    image: juplo/spring-consumer:1.1-log-compaction-SNAPSHOT
     environment:
       juplo.bootstrap-server: kafka:9092
       juplo.client-id: peter
@@ -163,7 +182,7 @@ services:
       logging.level.de.juplo: TRACE
 
   ute:
-    image: juplo/spring-consumer:1.1-rebalance-listener-SNAPSHOT
+    image: juplo/spring-consumer:1.1-log-compaction-SNAPSHOT
     environment:
       juplo.bootstrap-server: kafka:9092
       juplo.client-id: ute
diff --git a/pom.xml b/pom.xml
index daea167..4d29574 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -15,7 +15,7 @@
   <artifactId>spring-consumer</artifactId>
   <name>Spring Consumer</name>
   <description>Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka</description>
-  <version>1.1-rebalance-listener-SNAPSHOT</version>
+  <version>1.1-log-compaction-SNAPSHOT</version>
 
   <properties>
     <java.version>21</java.version>
index d2b8e05..ac5431a 100644 (file)
@@ -2,8 +2,11 @@ package de.juplo.kafka;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.StickyAssignor;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
@@ -19,6 +22,7 @@ public class ApplicationConfiguration
   @Bean
   public ExampleConsumer<String, String> exampleConsumer(
     Consumer<String, String> kafkaConsumer,
+    Producer<String, String> kafkaProducer,
     ApplicationProperties properties,
     ConfigurableApplicationContext applicationContext)
   {
@@ -27,6 +31,8 @@ public class ApplicationConfiguration
         properties.getClientId(),
         properties.getConsumerProperties().getTopic(),
         kafkaConsumer,
+        properties.getProducerProperties().getTopic(),
+        kafkaProducer,
         () -> applicationContext.close());
   }
 
@@ -46,10 +52,29 @@ public class ApplicationConfiguration
       props.put("auto.commit.interval", properties.getConsumerProperties().getAutoCommitInterval());
     }
     props.put("metadata.maxage.ms", 5000); //  5 Sekunden
-    props.put("partition.assignment.strategy", StickyAssignor.class.getName());
+    props.put("partition.assignment.strategy", RangeAssignor.class.getName());
     props.put("key.deserializer", StringDeserializer.class.getName());
     props.put("value.deserializer", StringDeserializer.class.getName());
 
     return new KafkaConsumer<>(props);
   }
+
+  @Bean
+  public KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
+  {
+    Properties props = new Properties();
+    props.put("bootstrap.servers", properties.getBootstrapServer());
+    props.put("client.id", properties.getClientId());
+    props.put("acks", properties.getProducerProperties().getAcks());
+    props.put("batch.size", properties.getProducerProperties().getBatchSize());
+    props.put("metadata.maxage.ms",   5000); //  5 Sekunden
+    props.put("delivery.timeout.ms", 20000); // 20 Sekunden
+    props.put("request.timeout.ms",  10000); // 10 Sekunden
+    props.put("linger.ms", properties.getProducerProperties().getLingerMs());
+    props.put("compression.type", properties.getProducerProperties().getCompressionType());
+    props.put("key.serializer", StringSerializer.class.getName());
+    props.put("value.serializer", StringSerializer.class.getName());
+
+    return new KafkaProducer<>(props);
+  }
 }
index c8193c9..0b43159 100644 (file)
@@ -25,6 +25,8 @@ public class ApplicationProperties
 
   @NotNull
   private ConsumerProperties consumer;
+  @NotNull
+  private ProducerProperties producer;
 
 
   public ConsumerProperties getConsumerProperties()
@@ -32,6 +34,11 @@ public class ApplicationProperties
     return consumer;
   }
 
+  public ProducerProperties getProducerProperties()
+  {
+    return producer;
+  }
+
 
   @Validated
   @Getter
@@ -49,4 +56,24 @@ public class ApplicationProperties
 
     enum OffsetReset { latest, earliest, none }
   }
+
+  @Validated
+  @Getter
+  @Setter
+  static class ProducerProperties
+  {
+    @NotNull
+    @NotEmpty
+    private String topic;
+    @NotNull
+    @NotEmpty
+    private String acks;
+    @NotNull
+    private Integer batchSize;
+    @NotNull
+    private Integer lingerMs;
+    @NotNull
+    @NotEmpty
+    private String compressionType;
+  }
 }
index 244ac82..46612a6 100644 (file)
@@ -5,11 +5,14 @@ import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 
 import java.time.Duration;
 import java.util.*;
+import java.util.concurrent.Phaser;
 
 
 @Slf4j
@@ -21,9 +24,19 @@ public class ExampleConsumer<K, V> implements Runnable, ConsumerRebalanceListene
   private final Thread workerThread;
   private final Runnable closeCallback;
 
+  private final String stateTopic;
+  private final Producer<K, String> producer;
+
   private volatile boolean running = false;
+  private final Phaser phaser = new Phaser(1);
   private final Set<TopicPartition> assignedPartitions = new HashSet<>();
+  private volatile State[] partitionStates;
+  private Map<K, Long>[] restoredState;
   private CounterState<K>[] counterState;
+  private volatile long[] stateEndOffsets;
+  private volatile int[] seen;
+  private volatile int[] acked;
+  private volatile boolean[] done;
   private long consumed = 0;
 
 
@@ -31,11 +44,15 @@ public class ExampleConsumer<K, V> implements Runnable, ConsumerRebalanceListene
     String clientId,
     String topic,
     Consumer<K, V> consumer,
+    String stateTopic,
+    Producer<K, String> producer,
     Runnable closeCallback)
   {
     this.id = clientId;
     this.topic = topic;
     this.consumer = consumer;
+    this.stateTopic = stateTopic;
+    this.producer = producer;
 
     workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
     workerThread.start();
@@ -52,26 +69,68 @@ public class ExampleConsumer<K, V> implements Runnable, ConsumerRebalanceListene
       log.info("{} - Fetching PartitionInfo for topic {}", id, topic);
       int numPartitions = consumer.partitionsFor(topic).size();
       log.info("{} - Topic {} has {} partitions", id, topic, numPartitions);
+      partitionStates = new State[numPartitions];
+      for (int i=0; i<numPartitions; i++)
+      {
+        partitionStates[i] = State.UNASSIGNED;
+      }
+      restoredState = new Map[numPartitions];
       counterState = new CounterState[numPartitions];
+      stateEndOffsets = new long[numPartitions];
+      seen = new int[numPartitions];
+      acked = new int[numPartitions];
+      done = new boolean[numPartitions];
 
       log.info("{} - Subscribing to topic {}", id, topic);
-      consumer.subscribe(Arrays.asList(topic), this);
+      consumer.subscribe(Arrays.asList(topic, stateTopic), this);
       running = true;
 
       while (running)
       {
         ConsumerRecords<K, V> records = consumer.poll(Duration.ofSeconds(1));
 
-        log.info("{} - Received {} messages", id, records.count());
-        for (ConsumerRecord<K, V> record : records)
-        {
-          handleRecord(
-            record.topic(),
-            record.partition(),
-            record.offset(),
-            record.key(),
-            record.value());
-        }
+        int phase = phaser.getPhase();
+
+        assignedPartitions
+          .forEach(partition ->
+          {
+            seen[partition.partition()] = 0;
+            acked[partition.partition()] = 0;
+            done[partition.partition()] = false;
+          });
+
+        log.info("{} - Received {} messages in phase {}", id, records.count(), phase);
+        records
+          .partitions()
+          .forEach(partition ->
+          {
+            for (ConsumerRecord<K, V> record : records.records(partition))
+            {
+              handleRecord(
+                record.topic(),
+                record.partition(),
+                record.offset(),
+                record.key(),
+                record.value());
+            }
+
+            checkRestoreProgress(partition);
+
+            done[partition.partition()] = true;
+          });
+
+        assignedPartitions
+          .forEach(partition ->
+          {
+            if (seen[partition.partition()] == 0)
+            {
+              int arrivedPhase = phaser.arrive();
+              log.debug("{} - Received no records for partition {} in phase {}", id, partition, arrivedPhase);
+            }
+          });
+
+        int arrivedPhase = phaser.arriveAndAwaitAdvance();
+        log.info("{} - Phase {} is done! Next phase: {}", id, phase, arrivedPhase);
       }
     }
     catch(WakeupException e)
@@ -103,8 +162,59 @@ public class ExampleConsumer<K, V> implements Runnable, ConsumerRebalanceListene
     consumed++;
     log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value);
 
+    if (topic.equals(this.topic))
+    {
+      handleMessage(partition, key);
+    }
+    else
+    {
+      handleState(partition, key, value);
+    }
+  }
+
+  private void checkRestoreProgress(TopicPartition topicPartition)
+  {
+    int partition = topicPartition.partition();
+
+    if (partitionStates[partition] == State.RESTORING)
+    {
+      long consumerPosition = consumer.position(topicPartition);
+
+      if (consumerPosition + 1 >= stateEndOffsets[partition])
+      {
+        log.info(
+          "{} - Position of consumer is {}. Restoring of state for partition {} done!",
+          id,
+          consumerPosition,
+          topicPartition);
+        stateAssigned(partition);
+      }
+      else
+      {
+        log.debug(
+          "{} - Restored state up to offset {}, end-offset: {}",
+          id,
+          consumerPosition,
+          stateEndOffsets[partition]);
+      }
+    }
+  }
+
+  private synchronized void handleState(
+    int partition,
+    K key,
+    V value)
+  {
+    restoredState[partition].put(key, Long.parseLong(value.toString())); // << An Typisierung anpassen!
+  }
+
+  private void handleMessage(
+    Integer partition,
+    K key)
+  {
     Long counter = computeCount(partition, key);
     log.info("{} - current value for counter {}: {}", id, key, counter);
+    sendCounterState(partition, key, counter);
   }
 
   private synchronized Long computeCount(int partition, K key)
@@ -119,6 +229,82 @@ public class ExampleConsumer<K, V> implements Runnable, ConsumerRebalanceListene
     return result;
   }
 
+  void sendCounterState(int partition, K key, Long counter)
+  {
+    seen[partition]++;
+
+    final long time = System.currentTimeMillis();
+
+    final ProducerRecord<K, V> record = new ProducerRecord<>(
+      stateTopic,        // Topic
+      key,               // Key
+      (V)counter.toString() // Value << TODO: An Typisierung anpassen
+    );
+
+    producer.send(record, (metadata, e) ->
+    {
+      long now = System.currentTimeMillis();
+      if (e == null)
+      {
+        // HANDLE SUCCESS
+        log.debug(
+          "{} - Sent message {}={}, partition={}:{}, timestamp={}, latency={}ms",
+          id,
+          record.key(),
+          record.value(),
+          metadata.partition(),
+          metadata.offset(),
+          metadata.timestamp(),
+          now - time
+        );
+      }
+      else
+      {
+        // HANDLE ERROR
+        log.error(
+          "{} - ERROR for message {}={}, timestamp={}, latency={}ms: {}",
+          id,
+          record.key(),
+          record.value(),
+          metadata == null ? -1 : metadata.timestamp(),
+          now - time,
+          e.toString()
+        );
+      }
+
+      acked[partition]++;
+      if (done[partition] && !(acked[partition] < seen[partition]))
+      {
+        int arrivedPhase = phaser.arrive();
+        log.debug(
+          "{} - Arrived at phase {} for partition {}, seen={}, acked={}",
+          id,
+          arrivedPhase,
+          partition,
+          seen[partition],
+          acked[partition]);
+      }
+      else
+      {
+        log.debug(
+          "{} - Still in phase {} for partition {}, seen={}, acked={}",
+          id,
+          phaser.getPhase(),
+          partition,
+          seen[partition],
+          acked[partition]);
+      }
+    });
+
+    long now = System.currentTimeMillis();
+    log.trace(
+      "{} - Queued message {}={}, latency={}ms",
+      id,
+      record.key(),
+      record.value(),
+      now - time
+    );
+  }
 
   @Override
   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
@@ -126,11 +312,7 @@ public class ExampleConsumer<K, V> implements Runnable, ConsumerRebalanceListene
     partitions
       .stream()
       .filter(partition -> partition.topic().equals(topic))
-      .forEach(partition ->
-      {
-        assignedPartitions.add(partition);
-        counterState[partition.partition()] = new CounterState<>(new HashMap<>());
-      });
+      .forEach(partition -> restoreAndAssign(partition.partition()));
   }
 
   @Override
@@ -139,11 +321,132 @@ public class ExampleConsumer<K, V> implements Runnable, ConsumerRebalanceListene
     partitions
       .stream()
       .filter(partition -> partition.topic().equals(topic))
-      .forEach(partition ->
-      {
-        assignedPartitions.remove(partition);
-        counterState[partition.partition()] = null;
-      });
+      .forEach(partition -> revoke(partition.partition()));
+  }
+
+  private void restoreAndAssign(int partition)
+  {
+    TopicPartition statePartition = new TopicPartition(this.stateTopic, partition);
+
+    long stateEndOffset = consumer
+      .endOffsets(List.of(statePartition))
+      .get(statePartition)
+      .longValue();
+
+    long stateBeginningOffset = consumer
+      .beginningOffsets(List.of(statePartition))
+      .get(statePartition);
+
+    log.info(
+      "{} - Found beginning-offset {} and end-offset {} for state partition {}",
+      id,
+      stateBeginningOffset,
+      stateEndOffset,
+      partition);
+
+    if (stateBeginningOffset < stateEndOffset)
+    {
+      stateRestoring(partition, stateBeginningOffset, stateEndOffset);
+    }
+    else
+    {
+      log.info("{} - No state available for partition {}", id, partition);
+      restoredState[partition] = new HashMap<>();
+      stateAssigned(partition);
+    }
+  }
+
+  private void revoke(int partition)
+  {
+    State partitionState = partitionStates[partition];
+    switch (partitionState)
+    {
+      case RESTORING, ASSIGNED -> stateUnassigned(partition);
+      case UNASSIGNED -> log.warn("{} - partition {} in state {} was revoked!", id, partition, partitionState);
+    }
+  }
+
+  private void stateRestoring(int partition, long stateBeginningOffset, long stateEndOffset)
+  {
+    log.info(
+      "{} - Changing partition-state for {}: {} -> RESTORING",
+      id,
+      partition,
+      partitionStates[partition]);
+    partitionStates[partition] = State.RESTORING;
+
+    TopicPartition messagePartition = new TopicPartition(this.topic, partition);
+    log.info("{} - Pausing message partition {}", id, messagePartition);
+    consumer.pause(List.of(messagePartition));
+
+    TopicPartition statePartition = new TopicPartition(this.stateTopic, partition);
+    log.info(
+      "{} - Seeking to first offset {} for state partition {}",
+      id,
+      stateBeginningOffset,
+      statePartition);
+    consumer.seek(statePartition, stateBeginningOffset);
+    stateEndOffsets[partition] = stateEndOffset;
+    restoredState[partition] = new HashMap<>();
+    log.info("{} - Resuming state partition {}", id, statePartition);
+    consumer.resume(List.of(statePartition));
+  }
+
+  private void stateAssigned(int partition)
+  {
+    log.info(
+      "{} - State-change for partition {}: {} -> ASSIGNED",
+      id,
+      partition,
+      partitionStates[partition]);
+
+    partitionStates[partition] = State.ASSIGNED;
+
+    TopicPartition statePartition = new TopicPartition(stateTopic, partition);
+    log.info("{} - Pausing state partition {}...", id, statePartition);
+    consumer.pause(List.of(statePartition));
+    counterState[partition] = new CounterState(restoredState[partition]);
+    restoredState[partition] = null;
+
+    TopicPartition messagePartition = new TopicPartition(topic, partition);
+    log.info("{} - Adding partition {} to the assigned partitions", id, messagePartition);
+    assignedPartitions.add(messagePartition);
+    phaser.register();
+    log.info(
+      "{} - Registered new party for newly assigned partition {}. New total number of parties: {}",
+      id,
+      messagePartition,
+      phaser.getRegisteredParties());
+    log.info("{} - Resuming message partition {}...", id, messagePartition);
+    consumer.resume(List.of(messagePartition));
+  }
+
+  private void stateUnassigned(int partition)
+  {
+    State oldPartitionState = partitionStates[partition];
+
+    log.info(
+      "{} - State-change for partition {}: {} -> UNASSIGNED",
+      id,
+      partition,
+      oldPartitionState);
+
+    partitionStates[partition] = State.UNASSIGNED;
+
+    if (oldPartitionState == State.ASSIGNED)
+    {
+      TopicPartition messagePartition = new TopicPartition(topic, partition);
+      log.info("{} - Revoking partition {}", id, messagePartition);
+      assignedPartitions.remove(messagePartition);
+      counterState[partition] = null;
+
+      phaser.arriveAndDeregister();
+      log.info(
+        "{} - Deregistered party for revoked partition {}. New total number of parties: {}",
+        id,
+        messagePartition,
+        phaser.getRegisteredParties());
+    }
   }
 
 
@@ -154,4 +457,11 @@ public class ExampleConsumer<K, V> implements Runnable, ConsumerRebalanceListene
     consumer.wakeup();
     workerThread.join();
   }
+
+  enum State
+  {
+    UNASSIGNED,
+    RESTORING,
+    ASSIGNED
+  }
 }
index 7a06731..d9e7066 100644 (file)
@@ -6,6 +6,12 @@ juplo:
     topic: test
     auto-offset-reset: earliest
     auto-commit-interval: 5s
+  producer:
+    topic: state
+    acks: -1
+    batch-size: 16384
+    linger-ms: 0
+    compression-type: gzip
 management:
   endpoint:
     shutdown:
@@ -28,6 +34,12 @@ info:
       topic: ${juplo.consumer.topic}
       auto-offset-reset: ${juplo.consumer.auto-offset-reset}
       auto-commit-interval: ${juplo.consumer.auto-commit-interval}
+    producer:
+      topic: ${juplo.producer.topic}
+      acks: ${juplo.producer.acks}
+      batch-size: ${juplo.producer.batch-size}
+      linger-ms: ${juplo.producer.linger-ms}
+      compression-type: ${juplo.producer.compression-type}
 logging:
   level:
     root: INFO
index b427efd..e8a9a07 100644 (file)
@@ -9,8 +9,7 @@ import org.springframework.test.web.servlet.MockMvc;
 
 import java.time.Duration;
 
-import static de.juplo.kafka.ApplicationTests.PARTITIONS;
-import static de.juplo.kafka.ApplicationTests.TOPIC;
+import static de.juplo.kafka.ApplicationTests.*;
 import static org.awaitility.Awaitility.await;
 import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
@@ -21,12 +20,13 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
   properties = {
     "juplo.bootstrap-server=${spring.embedded.kafka.brokers}",
     "spring.kafka.consumer.auto-offset-reset=earliest",
-    "juplo.consumer.topic=" + TOPIC })
+    "juplo.consumer.topic=" + TOPIC_IN})
 @AutoConfigureMockMvc
-@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
+@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }, partitions = PARTITIONS)
 public class ApplicationTests
 {
-  static final String TOPIC = "FOO";
+  static final String TOPIC_IN  = "FOO";
+  static final String TOPIC_OUT = "BAR";
   static final int PARTITIONS = 10;
 
   @Autowired