Merge branch 'endless-stream-consumer' into rebalance-listener
authorKai Moritz <kai@juplo.de>
Sun, 10 Apr 2022 20:15:34 +0000 (22:15 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 10 Apr 2022 20:15:34 +0000 (22:15 +0200)
README.sh
docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/DriverController.java
src/main/java/de/juplo/kafka/EndlessConsumer.java

index 900270a..13176d2 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -24,8 +24,65 @@ fi
 
 echo "Waiting for the Kafka-Cluster to become ready..."
 docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
-docker-compose up setup
-docker-compose up -d producer consumer
-sleep 15
+docker-compose up -d kafka-ui
+
+docker-compose exec -T cli bash << 'EOF'
+echo "Creating topic with 3 partitions..."
+kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
+# tag::createtopic[]
+kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 3
+# end::createtopic[]
+kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+EOF
+
+docker-compose up -d consumer
+
+docker-compose up -d producer
+sleep 10
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+
+docker-compose stop producer
+docker-compose exec -T cli bash << 'EOF'
+echo "Altering number of partitions from 3 to 7..."
+# tag::altertopic[]
+kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 7
+kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+# end::altertopic[]
+EOF
+
+docker-compose start producer
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
+sleep 1
+http -v :8081/seen
 docker-compose stop producer consumer
-docker-compose logs consumer
index b03bb1e..1392ae2 100644 (file)
@@ -24,13 +24,13 @@ services:
     depends_on:
       - zookeeper
 
-  setup:
-    image: juplo/toolbox
-    command: >
-      bash -c "
-        kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
-        kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2
-      "
+  kafka-ui:
+    image: provectuslabs/kafka-ui:0.3.3
+    ports:
+      - 8080:8080
+    environment:
+      KAFKA_CLUSTERS_0_NAME: local
+      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
 
   cli:
     image: juplo/toolbox
@@ -44,7 +44,7 @@ services:
       producer.bootstrap-server: kafka:9092
       producer.client-id: producer
       producer.topic: test
-      producer.throttle-ms: 200
+      producer.throttle-ms: 10
 
 
   consumer:
diff --git a/pom.xml b/pom.xml
index cf68030..8c5dccc 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -14,7 +14,7 @@
   <groupId>de.juplo.kafka</groupId>
   <artifactId>endless-consumer</artifactId>
   <version>1.0-SNAPSHOT</version>
-  <name>Endless Consumer: a Simple Consumer-Group that reads and print the topic</name>
+  <name>Endless Consumer: a Simple Consumer-Group that reads and prints the topic and counts the received messages for each key by topic</name>
 
   <dependencies>
     <dependency>
diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
new file mode 100644 (file)
index 0000000..ab9782c
--- /dev/null
@@ -0,0 +1,32 @@
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import org.springframework.boot.actuate.health.Health;
+import org.springframework.boot.actuate.health.HealthIndicator;
+import org.springframework.stereotype.Component;
+
+
+@Component
+@RequiredArgsConstructor
+public class ApplicationHealthIndicator implements HealthIndicator
+{
+  private final EndlessConsumer consumer;
+
+
+  @Override
+  public Health health()
+  {
+    try
+    {
+      return consumer
+          .exitStatus()
+          .map(Health::down)
+          .orElse(Health.outOfService())
+          .build();
+    }
+    catch (IllegalStateException e)
+    {
+      return Health.up().build();
+    }
+  }
+}
index a02fd2c..1525f5a 100644 (file)
@@ -3,10 +3,12 @@ package de.juplo.kafka;
 import lombok.RequiredArgsConstructor;
 import org.springframework.http.HttpStatus;
 import org.springframework.web.bind.annotation.ExceptionHandler;
+import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.ResponseStatus;
 import org.springframework.web.bind.annotation.RestController;
 
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
 
@@ -29,6 +31,12 @@ public class DriverController
     consumer.stop();
   }
 
+  @GetMapping("seen")
+  public Map<Integer, Map<String, Integer>> seen()
+  {
+    return consumer.getSeen();
+  }
+
   @ExceptionHandler
   @ResponseStatus(HttpStatus.BAD_REQUEST)
   public ErrorResponse illegalStateException(IllegalStateException e)
index adebff1..c2d4447 100644 (file)
@@ -1,17 +1,17 @@
 package de.juplo.kafka;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.StringDeserializer;
 
 import javax.annotation.PreDestroy;
 import java.time.Duration;
-import java.util.Arrays;
-import java.util.Optional;
-import java.util.Properties;
+import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.locks.Condition;
@@ -37,6 +37,9 @@ public class EndlessConsumer implements Runnable
   private KafkaConsumer<String, String> consumer = null;
 
 
+  private final Map<Integer, Map<String, Integer>> seen = new HashMap<>();
+
+
   public EndlessConsumer(
       ExecutorService executor,
       String bootstrapServer,
@@ -63,13 +66,44 @@ public class EndlessConsumer implements Runnable
       props.put("group.id", groupId);
       props.put("client.id", id);
       props.put("auto.offset.reset", autoOffsetReset);
+      props.put("metadata.max.age.ms", "1000");
       props.put("key.deserializer", StringDeserializer.class.getName());
       props.put("value.deserializer", StringDeserializer.class.getName());
 
       this.consumer = new KafkaConsumer<>(props);
 
       log.info("{} - Subscribing to topic {}", id, topic);
-      consumer.subscribe(Arrays.asList(topic));
+      consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener()
+      {
+        @Override
+        public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+        {
+          partitions.forEach(tp ->
+          {
+            log.info("{} - removing partition: {}", id, tp);
+            Map<String, Integer> removed = seen.remove(tp.partition());
+            for (String key : removed.keySet())
+            {
+              log.info(
+                  "{} - Seen {} messages for partition={}|key={}",
+                  id,
+                  removed.get(key),
+                  tp.partition(),
+                  key);
+            }
+          });
+        }
+
+        @Override
+        public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+        {
+          partitions.forEach(tp ->
+          {
+            log.info("{} - adding partition: {}", id, tp);
+            seen.put(tp.partition(), new HashMap<>());
+          });
+        }
+      });
 
       while (true)
       {
@@ -90,6 +124,17 @@ public class EndlessConsumer implements Runnable
               record.key(),
               record.value()
           );
+
+          Integer partition = record.partition();
+          String key = record.key() == null ? "NULL" : record.key();
+          Map<String, Integer> byKey = seen.get(partition);
+
+          if (!byKey.containsKey(key))
+            byKey.put(key, 0);
+
+          int seenByKey = byKey.get(key);
+          seenByKey++;
+          byKey.put(key, seenByKey);
         }
       }
     }
@@ -131,6 +176,11 @@ public class EndlessConsumer implements Runnable
     }
   }
 
+  public Map<Integer, Map<String, Integer>> getSeen()
+  {
+    return seen;
+  }
+
   public void start()
   {
     lock.lock();