Benennung vereinheitlicht und projektunabhängig gemacht counting-consumer
author: Kai Moritz <kai@juplo.de>
Sun, 21 Aug 2022 08:01:42 +0000 (10:01 +0200)
committer: Kai Moritz <kai@juplo.de>
Sun, 21 Aug 2022 08:32:26 +0000 (10:32 +0200)
* Angelehnt an die Code-Vereinheitlichung in `sumup-adder`.
* Dabei: Setup und Skript angepasst und repariert.

README.sh
docker-compose.yml
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationRecordHandler.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/DriverController.java
src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java [deleted file]
src/test/java/de/juplo/kafka/ApplicationTests.java

index 13176d2..6dbc9d9 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -9,6 +9,7 @@ then
   exit
 fi
 
+docker-compose stop producer consumer
 docker-compose up -d zookeeper kafka cli
 
 if [[
@@ -16,7 +17,8 @@ if [[
   "$1" = "build"
 ]]
 then
-  mvn install || exit
+  docker-compose rm -svf consumer
+  mvn clean install || exit
 else
   echo "Using image existing images:"
   docker image ls $IMAGE
@@ -24,7 +26,6 @@ fi
 
 echo "Waiting for the Kafka-Cluster to become ready..."
 docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
-docker-compose up -d kafka-ui
 
 docker-compose exec -T cli bash << 'EOF'
 echo "Creating topic with 3 partitions..."
@@ -35,17 +36,12 @@ kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 3
 kafka-topics --bootstrap-server kafka:9092 --describe --topic test
 EOF
 
-docker-compose up -d consumer
+docker-compose up -d producer consumer
+while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
+while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer..."; sleep 1; done
+while [[ "$(http :8081/state | jq -r .)" == "{}" ]]; do echo "Waiting for some state to show up..."; done
 
-docker-compose up -d producer
-sleep 10
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
+http -v :8081/state
 
 docker-compose stop producer
 docker-compose exec -T cli bash << 'EOF'
@@ -57,32 +53,33 @@ kafka-topics --bootstrap-server kafka:9092 --describe --topic test
 EOF
 
 docker-compose start producer
+while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
+
 sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
+http -v :8081/state
 sleep 1
-http -v :8081/seen
-docker-compose stop producer consumer
+http -v :8081/state
+
+docker-compose stop producer
index df6b321..f13e04d 100644 (file)
@@ -24,14 +24,6 @@ services:
     depends_on:
       - zookeeper
 
-  kafka-ui:
-    image: provectuslabs/kafka-ui:0.3.3
-    ports:
-      - 8080:8080
-    environment:
-      KAFKA_CLUSTERS_0_NAME: local
-      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
-
   cli:
     image: juplo/toolbox
     command: sleep infinity
index bb219d0..bf00b6d 100644 (file)
@@ -17,16 +17,16 @@ import java.util.concurrent.Executors;
 public class ApplicationConfiguration
 {
   @Bean
-  public KeyCountingRecordHandler keyCountingRecordHandler()
+  public ApplicationRecordHandler recordHandler()
   {
-    return new KeyCountingRecordHandler();
+    return new ApplicationRecordHandler();
   }
 
   @Bean
   public EndlessConsumer<String, Long> endlessConsumer(
       KafkaConsumer<String, Long> kafkaConsumer,
       ExecutorService executor,
-      KeyCountingRecordHandler keyCountingRecordHandler,
+      ApplicationRecordHandler recordHandler,
       ApplicationProperties properties)
   {
     return
@@ -35,7 +35,7 @@ public class ApplicationConfiguration
             properties.getClientId(),
             properties.getTopic(),
             kafkaConsumer,
-            keyCountingRecordHandler);
+            recordHandler);
   }
 
   @Bean
diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
new file mode 100644 (file)
index 0000000..3492c0d
--- /dev/null
@@ -0,0 +1,40 @@
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+@Slf4j
+public class ApplicationRecordHandler implements RecordHandler<String, Long>
+{
+  private final Map<Integer, Map<String, Long>> state = new HashMap<>();
+
+
+  @Override
+  public void accept(ConsumerRecord<String, Long> record)
+  {
+    Integer partition = record.partition();
+    String key = record.key() == null ? "NULL" : record.key().toString();
+
+    if (!state.containsKey(partition))
+      state.put(partition, new HashMap<>());
+
+    Map<String, Long> byKey = state.get(partition);
+
+    if (!byKey.containsKey(key))
+      byKey.put(key, 0l);
+
+    long seenByKey = byKey.get(key);
+    seenByKey++;
+    byKey.put(key, seenByKey);
+  }
+
+
+  public Map<Integer, Map<String, Long>> getState()
+  {
+    return state;
+  }
+}
index f6ff47f..09fb762 100644 (file)
@@ -13,7 +13,7 @@ import java.util.concurrent.ExecutionException;
 public class DriverController
 {
   private final EndlessConsumer consumer;
-  private final KeyCountingRecordHandler keyCountingRecordHandler;
+  private final ApplicationRecordHandler recordHandler;
 
 
   @PostMapping("start")
@@ -29,10 +29,10 @@ public class DriverController
   }
 
 
-  @GetMapping("seen")
-  public Map<Integer, Map<String, Long>> seen()
+  @GetMapping("state")
+  public Map<Integer, Map<String, Long>> state()
   {
-    return keyCountingRecordHandler.getSeen();
+    return recordHandler.getState();
   }
 
 
diff --git a/src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java b/src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java
deleted file mode 100644 (file)
index 83b3ff2..0000000
+++ /dev/null
@@ -1,40 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-@Slf4j
-public class KeyCountingRecordHandler implements RecordHandler<String, Long>
-{
-  private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
-
-
-  @Override
-  public void accept(ConsumerRecord<String, Long> record)
-  {
-    Integer partition = record.partition();
-    String key = record.key() == null ? "NULL" : record.key().toString();
-
-    if (!seen.containsKey(partition))
-      seen.put(partition, new HashMap<>());
-
-    Map<String, Long> byKey = seen.get(partition);
-
-    if (!byKey.containsKey(key))
-      byKey.put(key, 0l);
-
-    long seenByKey = byKey.get(key);
-    seenByKey++;
-    byKey.put(key, seenByKey);
-  }
-
-
-  public Map<Integer, Map<String, Long>> getSeen()
-  {
-    return seen;
-  }
-}
index 0909f2c..ffc0a0b 100644 (file)
@@ -65,7 +65,7 @@ class ApplicationTests
        @Autowired
        ExecutorService executor;
        @Autowired
-       KeyCountingRecordHandler keyCountingRecordHandler;
+       ApplicationRecordHandler recordHandler;
 
        EndlessConsumer<String, Long> endlessConsumer;
        Map<TopicPartition, Long> oldOffsets;
@@ -268,7 +268,7 @@ class ApplicationTests
                });
 
                TestRecordHandler<String, Long> captureOffsetAndExecuteTestHandler =
-                               new TestRecordHandler<String, Long>(keyCountingRecordHandler) {
+                               new TestRecordHandler<String, Long>(recordHandler) {
                                        @Override
                                        public void onNewRecord(ConsumerRecord<String, Long> record)
                                        {