Merge branch 'stored-state' into stored-offsets
author    Kai Moritz <kai@juplo.de>  Thu, 7 Apr 2022 23:19:01 +0000 (01:19 +0200)
committer Kai Moritz <kai@juplo.de>  Thu, 7 Apr 2022 23:19:01 +0000 (01:19 +0200)
README.sh
docker-compose.yml
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/java/de/juplo/kafka/StatisticsDocument.java

index 13176d2..53b42e9 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -24,65 +24,17 @@ fi
 
 echo "Waiting for the Kafka-Cluster to become ready..."
 docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
-docker-compose up -d kafka-ui
+docker-compose up setup
+docker-compose up -d producer peter beate
 
-docker-compose exec -T cli bash << 'EOF'
-echo "Creating topic with 3 partitions..."
-kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
-# tag::createtopic[]
-kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 3
-# end::createtopic[]
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-EOF
+sleep 15
 
-docker-compose up -d consumer
-
-docker-compose up -d producer
+http -v post :8082/stop
 sleep 10
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-
-docker-compose stop producer
-docker-compose exec -T cli bash << 'EOF'
-echo "Altering number of partitions from 3 to 7..."
-# tag::altertopic[]
-kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 7
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-# end::altertopic[]
-EOF
+docker-compose kill -s 9 peter
+http -v post :8082/start
+sleep 60
 
-docker-compose start producer
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-docker-compose stop producer consumer
+docker-compose stop producer peter beate
+docker-compose logs beate
+docker-compose logs --tail=10 peter
index 5723fc7..b84ed52 100644 (file)
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -41,13 +41,13 @@ services:
       ME_CONFIG_MONGODB_ADMINPASSWORD: training
       ME_CONFIG_MONGODB_URL: mongodb://juplo:training@mongo:27017/
 
-  kafka-ui:
-    image: provectuslabs/kafka-ui:0.3.3
-    ports:
-      - 8080:8080
-    environment:
-      KAFKA_CLUSTERS_0_NAME: local
-      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
+  setup:
+    image: juplo/toolbox
+    command: >
+      bash -c "
+        kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
+        kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2
+      "
 
   cli:
     image: juplo/toolbox
@@ -61,16 +61,27 @@ services:
       producer.bootstrap-server: kafka:9092
       producer.client-id: producer
       producer.topic: test
-      producer.throttle-ms: 10
+      producer.throttle-ms: 500
 
 
-  consumer:
+  peter:
     image: juplo/endless-consumer:1.0-SNAPSHOT
     ports:
       - 8081:8081
     environment:
       consumer.bootstrap-server: kafka:9092
-      consumer.client-id: consumer
+      consumer.client-id: peter
+      consumer.topic: test
+      spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
+      spring.data.mongodb.database: juplo
+
+  beate:
+    image: juplo/endless-consumer:1.0-SNAPSHOT
+    ports:
+      - 8082:8081
+    environment:
+      consumer.bootstrap-server: kafka:9092
+      consumer.client-id: beate
       consumer.topic: test
       spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
       spring.data.mongodb.database: juplo
index 7cb77aa..2563204 100644 (file)
--- a/src/main/java/de/juplo/kafka/EndlessConsumer.java
+++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java
@@ -64,6 +64,7 @@ public class EndlessConsumer implements Runnable
       props.put("bootstrap.servers", bootstrapServer);
       props.put("group.id", groupId);
       props.put("client.id", id);
+      props.put("enable.auto.commit", false);
       props.put("auto.offset.reset", autoOffsetReset);
       props.put("metadata.max.age.ms", "1000");
       props.put("key.deserializer", StringDeserializer.class.getName());
@@ -90,7 +91,7 @@ public class EndlessConsumer implements Runnable
                   tp.partition(),
                   key);
             }
-            repository.save(new StatisticsDocument(tp.partition(), removed));
+            repository.save(new StatisticsDocument(tp.partition(), removed, consumer.position(tp)));
           });
         }
 
@@ -100,12 +101,12 @@ public class EndlessConsumer implements Runnable
           partitions.forEach(tp ->
           {
             log.info("{} - adding partition: {}", id, tp);
-            seen.put(
-                tp.partition(),
+            StatisticsDocument document =
                 repository
                     .findById(Integer.toString(tp.partition()))
-                    .map(document -> document.statistics)
-                    .orElse(new HashMap<>()));
+                    .orElse(new StatisticsDocument(tp.partition()));
+            consumer.seek(tp, document.offset);
+            seen.put(tp.partition(), document.statistics);
           });
         }
       });
@@ -141,6 +142,12 @@ public class EndlessConsumer implements Runnable
           seenByKey++;
           byKey.put(key, seenByKey);
         }
+
+        seen.forEach((partition, statistics) -> repository.save(
+            new StatisticsDocument(
+                partition,
+                statistics,
+                consumer.position(new TopicPartition(topic, partition)))));
       }
     }
     catch(WakeupException e)
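Pieced together, the hunks above turn the consumer into one that no longer relies on committed offsets (enable.auto.commit is false) but stores the next offset to read in MongoDB next to the per-partition counters, and seeks back to it on assignment. A rough sketch of the resulting rebalance handling, assuming the listener is registered via subscribe() inside EndlessConsumer, that removed holds the statistics taken out of seen for the revoked partition, and omitting the logging (none of which is part of the shown context):

    consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener()
    {
      @Override
      public void onPartitionsRevoked(Collection<TopicPartition> partitions)
      {
        partitions.forEach(tp ->
        {
          Map<String, Integer> removed = seen.remove(tp.partition());
          // Store the counters together with the next offset to read, so that
          // whichever instance picks up this partition can resume exactly here.
          repository.save(new StatisticsDocument(tp.partition(), removed, consumer.position(tp)));
        });
      }

      @Override
      public void onPartitionsAssigned(Collection<TopicPartition> partitions)
      {
        partitions.forEach(tp ->
        {
          StatisticsDocument document =
              repository
                  .findById(Integer.toString(tp.partition()))
                  .orElse(new StatisticsDocument(tp.partition()));
          // Resume from the offset stored in MongoDB, not from a committed offset.
          consumer.seek(tp, document.offset);
          seen.put(tp.partition(), document.statistics);
        });
      }
    });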
index be998ca..96ebfb1 100644 (file)
--- a/src/main/java/de/juplo/kafka/StatisticsDocument.java
+++ b/src/main/java/de/juplo/kafka/StatisticsDocument.java
@@ -14,15 +14,23 @@ public class StatisticsDocument
 {
   @Id
   public String id;
+  public long offset;
   public Map<String, Integer> statistics;
 
   public StatisticsDocument()
   {
   }
 
-  public StatisticsDocument(Integer partition, Map<String, Integer> statistics)
+  public StatisticsDocument(Integer partition)
+  {
+    this.id = Integer.toString(partition);
+    this.statistics = new HashMap<>();
+  }
+
+  public StatisticsDocument(Integer partition, Map<String, Integer> statistics, long offset)
   {
     this.id = Integer.toString(partition);
     this.statistics = statistics;
+    this.offset = offset;
   }
 }
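The Spring Data repository behind the repository field is not touched by this merge. A minimal interface compatible with the findById() and save() calls used in EndlessConsumer could look like this (the name StatisticsRepository is an assumption; the methods themselves are inherited from MongoRepository):

    import org.springframework.data.mongodb.repository.MongoRepository;

    // Spring Data derives the implementation at runtime; findById(String) and
    // save(...) as used in EndlessConsumer come from the base interface.
    public interface StatisticsRepository extends MongoRepository<StatisticsDocument, String>
    {
    }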