Rückbau auf einen Consumer, der in `onPartitionsRevoked()` immer committed
authorKai Moritz <kai@juplo.de>
Sun, 28 Aug 2022 13:59:54 +0000 (15:59 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 2 Sep 2022 03:31:49 +0000 (05:31 +0200)
* Entfernt wird hier das erweiterte Interface, für den Rebalance-Listener
  über den die Consumer-Implementierung die Commits für den Fehlerfall
  explizit deaktivieren kann.
* Die Staffelübergabe sollte damit weiterhin normal funktionieren. D.h.,
  solange der Consumer ordentlich heruntergefahren wird und nicht ein
  besonders hohes Nachrichten-Aufkommen angelegt wird.
* Vorführ-Skript so angepasst, dass deutlich wird, dass die
  "Staffelübergabe" nun funktioniert, wenn Consumer ordentlich gestopped
  werden, aber weiterhin Fehler auftreten, wenn ein Consumer
  außerordentlich beendet (hier: getötet) wird.

README.sh
src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/java/de/juplo/kafka/RebalanceListener.java [deleted file]
src/test/java/de/juplo/kafka/GenericApplicationTests.java

index 6be4b11..81213c1 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -28,89 +28,78 @@ fi
 echo "Waiting for the Kafka-Cluster to become ready..."
 docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
 docker-compose up setup
-docker-compose up -d gateway requests-1 requests-2 adder-1 adder-2
+docker-compose up -d gateway requests-1 requests-2
 
 while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for gateway..."; sleep 1; done
 while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for requests-1..."; sleep 1; done
 while ! [[ $(http 0:8082/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for requests-2..."; sleep 1; done
-while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done
-while ! [[ $(http 0:8092/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-2..."; sleep 1; done
 
 docker-compose up -d peter klaus
 
+docker-compose up -d adder-1
+while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done
 while [[ "$(http :8091/results | jq -r .)" == "{}" ]]; do echo "Waiting for some results to show up on adder-1..."; sleep 1; done
 http -v --pretty none -S :8091/results
 echo
-while [[ "$(http :8092/results | jq -r .)" == "{}" ]]; do echo "Waiting for some results to show up on adder-2..."; sleep 1; done
-http -v --pretty none -S :8092/results
-echo
+
 sleep 3
 echo "Resultate für adder-1"
 http -v --pretty none -S :8091/results
 echo
+
 echo "Resultate für peter von adder-1"
 http :8091/results/peter | jq .[].sum | uniq
 echo "Resultate für klaus von adder-1"
 http :8091/results/klaus | jq .[].sum | uniq
-echo "Resultate für adder-2"
+
+
+docker-compose up -d adder-2
+while ! [[ $(http 0:8092/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-2..."; sleep 1; done
+while [[ "$(http :8092/results | jq -r .)" == "{}" ]]; do echo "Waiting for some results to show up on adder-2..."; sleep 1; done
 http -v --pretty none -S :8092/results
 echo
-echo "Resultate für peter von adder-2"
-http :8092/results/peter | jq .[].sum | uniq
-echo "Resultate für klaus von adder-2"
-http :8092/results/klaus | jq .[].sum | uniq
 
-docker-compose stop adder-1
-sleep 1
+sleep 3
 echo "Resultate für adder-2"
 http -v --pretty none -S :8092/results
 echo
+
+echo "Resultate für peter von adder-1"
+http :8091/results/peter | jq .[].sum | uniq
+echo "Resultate für klaus von adder-1"
+http :8091/results/klaus | jq .[].sum | uniq
+
 echo "Resultate für peter von adder-2"
 http :8092/results/peter | jq .[].sum | uniq
 echo "Resultate für klaus von adder-2"
 http :8092/results/klaus | jq .[].sum | uniq
 
 docker-compose stop adder-2
-docker-compose start adder-1
-while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done
-while [[ "$(http :8091/results | jq -r .)" == "{}" ]]; do echo "Waiting for some results to show up on adder-1..."; sleep 1; done
+while [[ "$(http :8091/results | jq -r '.[]|contains({peter})' | grep true)" != "true" ]]; do echo "Waiting for some results for peter to show up on adder-1..."; sleep 1; done
+while [[ "$(http :8091/results | jq -r '.[]|contains({klaus})' | grep true)" != "true" ]]; do echo "Waiting for some results for klaus to show up on adder-1..."; sleep 1; done
+
 echo "Resultate für adder-1"
 http -v --pretty none -S :8091/results
 echo
+
 echo "Resultate für peter von adder-1"
 http :8091/results/peter | jq .[].sum | uniq
 echo "Resultate für klaus von adder-1"
 http :8091/results/klaus | jq .[].sum | uniq
 
-docker-compose start adder-2
-while ! [[ $(http 0:8092/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-2..."; sleep 1; done
-while [[ "$(http :8092/results | jq -r .)" == "{}" ]]; do echo "Waiting for some results to show up on adder-2..."; sleep 1; done
+docker-compose kill -s 9 adder-1
+docker-compose start adder-1
+while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done
+while [[ "$(http :8091/results | jq -r '.[]|contains({peter})' | grep true)" != "true" ]]; do echo "Waiting for some results for peter to show up on adder-1..."; sleep 1; done
+while [[ "$(http :8091/results | jq -r '.[]|contains({klaus})' | grep true)" != "true" ]]; do echo "Waiting for some results for klaus to show up on adder-1..."; sleep 1; done
+
 echo "Resultate für adder-1"
 http -v --pretty none -S :8091/results
 echo
-echo "Resultate für adder-2"
-http -v --pretty none -S :8092/results
-echo
+
 echo "Resultate für peter von adder-1"
 http :8091/results/peter | jq .[].sum | uniq
 echo "Resultate für klaus von adder-1"
 http :8091/results/klaus | jq .[].sum | uniq
-echo "Resultate für peter von adder-2"
-http :8092/results/peter | jq .[].sum | uniq
-echo "Resultate für klaus von adder-2"
-http :8092/results/klaus | jq .[].sum | uniq
-
-docker-compose kill -s 9 adder-1
 
-docker-compose start adder-1
-while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done
-while [[ "$(http :8091/results | jq -r .)" == "{}" ]]; do echo "Waiting for some results to show up on adder-1..."; sleep 1; done
 docker-compose kill -s 9 peter klaus
-echo "Resultate für peter von adder-1"
-http :8091/results/peter | jq .[].sum | uniq
-echo "Resultate für klaus von adder-1"
-http :8091/results/klaus | jq .[].sum | uniq
-echo "Resultate für peter von adder-2"
-http :8092/results/peter | jq .[].sum | uniq
-echo "Resultate für klaus von adder-2"
-http :8092/results/klaus | jq .[].sum | uniq
index 63d57df..eef0d00 100644 (file)
@@ -3,6 +3,7 @@ package de.juplo.kafka;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.common.TopicPartition;
 
 import java.util.*;
@@ -10,7 +11,7 @@ import java.util.*;
 
 @RequiredArgsConstructor
 @Slf4j
-public class ApplicationRebalanceListener implements RebalanceListener
+public class ApplicationRebalanceListener implements ConsumerRebalanceListener
 {
   private final ApplicationRecordHandler recordHandler;
   private final AdderResults adderResults;
@@ -20,8 +21,6 @@ public class ApplicationRebalanceListener implements RebalanceListener
 
   private final Set<Integer> partitions = new HashSet<>();
 
-  private boolean commitsEnabled = true;
-
   @Override
   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
   {
@@ -51,17 +50,14 @@ public class ApplicationRebalanceListener implements RebalanceListener
   @Override
   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
   {
-    if (commitsEnabled)
+    log.info("{} - Commiting offsets for all previously assigned partitions", id);
+    try
     {
-      log.info("{} - Commiting offsets for all previously assigned partitions", id);
-      try
-      {
-        consumer.commitSync();
-      }
-      catch (Exception e)
-      {
-        log.warn("{} - Could not commit offsets in onPartitionsRevoked():", id, e);
-      }
+      consumer.commitSync();
+    }
+    catch (Exception e)
+    {
+      log.warn("{} - Could not commit offsets in onPartitionsRevoked():", id, e);
     }
 
     partitions.forEach(tp ->
@@ -83,16 +79,4 @@ public class ApplicationRebalanceListener implements RebalanceListener
       stateRepository.save(new StateDocument(partition, state, results));
     });
   }
-
-  @Override
-  public void enableCommits()
-  {
-    commitsEnabled = true;
-  }
-
-  @Override
-  public void disableCommits()
-  {
-    commitsEnabled = false;
-  }
 }
index 3ff479c..f0e74d3 100644 (file)
@@ -25,7 +25,7 @@ public class EndlessConsumer<K, V> implements Runnable
   private final String id;
   private final String topic;
   private final Consumer<K, V> consumer;
-  private final RebalanceListener rebalanceListener;
+  private final ConsumerRebalanceListener rebalanceListener;
   private final RecordHandler<K, V> recordHandler;
 
   private final Lock lock = new ReentrantLock();
@@ -42,7 +42,6 @@ public class EndlessConsumer<K, V> implements Runnable
     try
     {
       log.info("{} - Subscribing to topic {}", id, topic);
-      rebalanceListener.enableCommits();
       consumer.subscribe(Arrays.asList(topic), rebalanceListener);
 
       while (true)
@@ -90,8 +89,7 @@ public class EndlessConsumer<K, V> implements Runnable
     }
     catch(Exception e)
     {
-      log.error("{} - Unexpected error: {}, disabling commits", id, e.toString(), e);
-      rebalanceListener.disableCommits();
+      log.error("{} - Unexpected error: {}", id, e.toString(), e);
       shutdown(e);
     }
     finally
diff --git a/src/main/java/de/juplo/kafka/RebalanceListener.java b/src/main/java/de/juplo/kafka/RebalanceListener.java
deleted file mode 100644 (file)
index 26f97aa..0000000
+++ /dev/null
@@ -1,10 +0,0 @@
-package de.juplo.kafka;
-
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-
-
-public interface RebalanceListener extends ConsumerRebalanceListener
-{
-  void enableCommits();
-  void disableCommits();
-}
index 8124c81..8849317 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka;
 
 import com.mongodb.client.MongoClient;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -66,7 +67,7 @@ abstract class GenericApplicationTests<K, V>
        @Autowired
        MongoProperties mongoProperties;
        @Autowired
-       RebalanceListener rebalanceListener;
+       ConsumerRebalanceListener rebalanceListener;
        @Autowired
        RecordHandler<K, V> recordHandler;