Benennung vereinheitlicht und projektunabhängig gemacht (branch: rebalance-listener)
author: Kai Moritz <kai@juplo.de>
Sun, 21 Aug 2022 08:33:09 +0000 (10:33 +0200)
committer: Kai Moritz <kai@juplo.de>
Sun, 21 Aug 2022 09:07:25 +0000 (11:07 +0200)
* Merge branch 'counting-consumer' into rebalance-listener
* Außerdem die dort nicht vorhandene Klasse `KeyCountingRebalanceListener`
  entsprechend umbenannt.

README.sh
docker-compose.yml
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/ApplicationRecordHandler.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/DriverController.java
src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java [deleted file]
src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java [deleted file]
src/test/java/de/juplo/kafka/ApplicationTests.java

index 13176d2..6dbc9d9 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -9,6 +9,7 @@ then
   exit
 fi
 
+docker-compose stop producer consumer
 docker-compose up -d zookeeper kafka cli
 
 if [[
@@ -16,7 +17,8 @@ if [[
   "$1" = "build"
 ]]
 then
-  mvn install || exit
+  docker-compose rm -svf consumer
+  mvn clean install || exit
 else
   echo "Using image existing images:"
   docker image ls $IMAGE
@@ -24,7 +26,6 @@ fi
 
 echo "Waiting for the Kafka-Cluster to become ready..."
 docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
-docker-compose up -d kafka-ui
 
 docker-compose exec -T cli bash << 'EOF'
 echo "Creating topic with 3 partitions..."
@@ -35,17 +36,12 @@ kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 3
 kafka-topics --bootstrap-server kafka:9092 --describe --topic test
 EOF
 
-docker-compose up -d consumer
+docker-compose up -d producer consumer
+while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
+while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer..."; sleep 1; done
+while [[ "$(http :8081/state | jq -r .)" == "{}" ]]; do echo "Waiting for some state to show up..."; done
 
-docker-compose up -d producer
-sleep 10
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
+http -v :8081/state
 
 docker-compose stop producer
 docker-compose exec -T cli bash << 'EOF'
@@ -57,32 +53,33 @@ kafka-topics --bootstrap-server kafka:9092 --describe --topic test
 EOF
 
 docker-compose start producer
+while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
+
 sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
-docker-compose stop producer consumer
+http -v :8081/state
+
+docker-compose stop producer
index df6b321..f13e04d 100644 (file)
@@ -24,14 +24,6 @@ services:
     depends_on:
       - zookeeper
 
-  kafka-ui:
-    image: provectuslabs/kafka-ui:0.3.3
-    ports:
-      - 8080:8080
-    environment:
-      KAFKA_CLUSTERS_0_NAME: local
-      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
-
   cli:
     image: juplo/toolbox
     command: sleep infinity
index 7a0a8ad..e9c26fd 100644 (file)
@@ -17,18 +17,18 @@ import java.util.concurrent.Executors;
 public class ApplicationConfiguration
 {
   @Bean
-  public KeyCountingRecordHandler keyCountingRecordHandler()
+  public ApplicationRecordHandler recordHandler()
   {
-    return new KeyCountingRecordHandler();
+    return new ApplicationRecordHandler();
   }
 
   @Bean
-  public KeyCountingRebalanceListener keyCountingRebalanceListener(
-      KeyCountingRecordHandler keyCountingRecordHandler,
+  public ApplicationRebalanceListener rebalanceListener(
+      ApplicationRecordHandler recordHandler,
       ApplicationProperties properties)
   {
-    return new KeyCountingRebalanceListener(
-        keyCountingRecordHandler,
+    return new ApplicationRebalanceListener(
+        recordHandler,
         properties.getClientId());
   }
 
@@ -36,8 +36,8 @@ public class ApplicationConfiguration
   public EndlessConsumer<String, Long> endlessConsumer(
       KafkaConsumer<String, Long> kafkaConsumer,
       ExecutorService executor,
-      KeyCountingRebalanceListener keyCountingRebalanceListener,
-      KeyCountingRecordHandler keyCountingRecordHandler,
+      ApplicationRebalanceListener rebalanceListener,
+      ApplicationRecordHandler recordHandler,
       ApplicationProperties properties)
   {
     return
@@ -46,8 +46,8 @@ public class ApplicationConfiguration
             properties.getClientId(),
             properties.getTopic(),
             kafkaConsumer,
-            keyCountingRebalanceListener,
-            keyCountingRecordHandler);
+            rebalanceListener,
+            recordHandler);
   }
 
   @Bean
diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
new file mode 100644 (file)
index 0000000..0dcadce
--- /dev/null
@@ -0,0 +1,50 @@
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class ApplicationRebalanceListener implements ConsumerRebalanceListener
+{
+  private final ApplicationRecordHandler recordHandler;
+  private final String id;
+
+  @Override
+  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+  {
+    partitions.forEach(tp ->
+    {
+      Integer partition = tp.partition();
+      log.info("{} - adding partition: {}", id, partition);
+      recordHandler.addPartition(partition, new HashMap<>());
+    });
+  }
+
+  @Override
+  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+  {
+    partitions.forEach(tp ->
+    {
+      Integer partition = tp.partition();
+      log.info("{} - removing partition: {}", id, partition);
+      Map<String, Long> removed = recordHandler.removePartition(partition);
+      for (String key : removed.keySet())
+      {
+        log.info(
+            "{} - Seen {} messages for partition={}|key={}",
+            id,
+            removed.get(key),
+            partition,
+            key);
+      }
+    });
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
new file mode 100644 (file)
index 0000000..dfbf82e
--- /dev/null
@@ -0,0 +1,46 @@
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+@Slf4j
+public class ApplicationRecordHandler implements RecordHandler<String, Long>
+{
+  private final Map<Integer, Map<String, Long>> state = new HashMap<>();
+
+
+  @Override
+  public void accept(ConsumerRecord<String, Long> record)
+  {
+    Integer partition = record.partition();
+    String key = record.key() == null ? "NULL" : record.key().toString();
+    Map<String, Long> byKey = state.get(partition);
+
+    if (!byKey.containsKey(key))
+      byKey.put(key, 0l);
+
+    long seenByKey = byKey.get(key);
+    seenByKey++;
+    byKey.put(key, seenByKey);
+  }
+
+  public void addPartition(Integer partition, Map<String, Long> statistics)
+  {
+    state.put(partition, statistics);
+  }
+
+  public Map<String, Long> removePartition(Integer partition)
+  {
+    return state.remove(partition);
+  }
+
+
+  public Map<Integer, Map<String, Long>> getState()
+  {
+    return state;
+  }
+}
index f6ff47f..09fb762 100644 (file)
@@ -13,7 +13,7 @@ import java.util.concurrent.ExecutionException;
 public class DriverController
 {
   private final EndlessConsumer consumer;
-  private final KeyCountingRecordHandler keyCountingRecordHandler;
+  private final ApplicationRecordHandler recordHandler;
 
 
   @PostMapping("start")
@@ -29,10 +29,10 @@ public class DriverController
   }
 
 
-  @GetMapping("seen")
-  public Map<Integer, Map<String, Long>> seen()
+  @GetMapping("state")
+  public Map<Integer, Map<String, Long>> state()
   {
-    return keyCountingRecordHandler.getSeen();
+    return recordHandler.getState();
   }
 
 
diff --git a/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java b/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java
deleted file mode 100644 (file)
index 0ad1f31..0000000
+++ /dev/null
@@ -1,50 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.common.TopicPartition;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class KeyCountingRebalanceListener implements ConsumerRebalanceListener
-{
-  private final KeyCountingRecordHandler handler;
-  private final String id;
-
-  @Override
-  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
-  {
-    partitions.forEach(tp ->
-    {
-      Integer partition = tp.partition();
-      log.info("{} - adding partition: {}", id, partition);
-      handler.addPartition(partition, new HashMap<>());
-    });
-  }
-
-  @Override
-  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
-  {
-    partitions.forEach(tp ->
-    {
-      Integer partition = tp.partition();
-      log.info("{} - removing partition: {}", id, partition);
-      Map<String, Long> removed = handler.removePartition(partition);
-      for (String key : removed.keySet())
-      {
-        log.info(
-            "{} - Seen {} messages for partition={}|key={}",
-            id,
-            removed.get(key),
-            partition,
-            key);
-      }
-    });
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java b/src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java
deleted file mode 100644 (file)
index 099dcf7..0000000
+++ /dev/null
@@ -1,46 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-@Slf4j
-public class KeyCountingRecordHandler implements RecordHandler<String, Long>
-{
-  private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
-
-
-  @Override
-  public void accept(ConsumerRecord<String, Long> record)
-  {
-    Integer partition = record.partition();
-    String key = record.key() == null ? "NULL" : record.key().toString();
-    Map<String, Long> byKey = seen.get(partition);
-
-    if (!byKey.containsKey(key))
-      byKey.put(key, 0l);
-
-    long seenByKey = byKey.get(key);
-    seenByKey++;
-    byKey.put(key, seenByKey);
-  }
-
-  public void addPartition(Integer partition, Map<String, Long> statistics)
-  {
-    seen.put(partition, statistics);
-  }
-
-  public Map<String, Long> removePartition(Integer partition)
-  {
-    return seen.remove(partition);
-  }
-
-
-  public Map<Integer, Map<String, Long>> getSeen()
-  {
-    return seen;
-  }
-}
index 5b13b7d..d7eb039 100644 (file)
@@ -65,9 +65,9 @@ class ApplicationTests
        @Autowired
        ExecutorService executor;
        @Autowired
-       KeyCountingRebalanceListener keyCountingRebalanceListener;
+       ApplicationRebalanceListener rebalanceListener;
        @Autowired
-       KeyCountingRecordHandler keyCountingRecordHandler;
+       ApplicationRecordHandler recordHandler;
 
        EndlessConsumer<String, Long> endlessConsumer;
        Map<TopicPartition, Long> oldOffsets;
@@ -270,7 +270,7 @@ class ApplicationTests
                });
 
                TestRecordHandler<String, Long> captureOffsetAndExecuteTestHandler =
-                               new TestRecordHandler<String, Long>(keyCountingRecordHandler) {
+                               new TestRecordHandler<String, Long>(recordHandler) {
                                        @Override
                                        public void onNewRecord(ConsumerRecord<String, Long> record)
                                        {
@@ -287,7 +287,7 @@ class ApplicationTests
                                                properties.getClientId(),
                                                properties.getTopic(),
                                                kafkaConsumer,
-                                               keyCountingRebalanceListener,
+                                               rebalanceListener,
                                                captureOffsetAndExecuteTestHandler);
 
                endlessConsumer.start();