Merge of the EndlessConsumer refactoring (branch 'stored-state')
author Kai Moritz <kai@juplo.de>
Sun, 24 Jul 2022 13:35:14 +0000 (15:35 +0200)
committer Kai Moritz <kai@juplo.de>
Fri, 12 Aug 2022 14:44:01 +0000 (16:44 +0200)
* The `commitSync()` calls no longer make sense now that the offsets are
  stored outside of Kafka (see the sketch below).
* The test case had to be adapted to the externally stored offsets: the
  seen offsets must be read from MongoDB instead of from Kafka via a
  separate consumer.
* The test added by this merge fails, because it uncovers a bug (an NPE
  in a log statement concerning the offset handling).
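
For context, a minimal sketch of the offset-handling pattern this merge
introduces: on partition assignment the consumer restores its position from
an external store via seek() instead of relying on offsets committed to
Kafka, and it writes the position back when a partition is revoked. The
`OffsetStore` interface below is illustrative only -- in the actual code this
role is played by `PartitionStatisticsRepository` and `StatisticsDocument`,
as shown in the diffs further down.

    import java.util.Collection;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical external store, standing in for the MongoDB repository.
    interface OffsetStore
    {
      long load(int partition);               // last stored position, 0 if unknown
      void save(int partition, long offset);
    }

    class StoredStateRebalanceListener implements ConsumerRebalanceListener
    {
      private final Consumer<?, ?> consumer;
      private final OffsetStore store;

      StoredStateRebalanceListener(Consumer<?, ?> consumer, OffsetStore store)
      {
        this.consumer = consumer;
        this.store = store;
      }

      @Override
      public void onPartitionsAssigned(Collection<TopicPartition> partitions)
      {
        // Restore the position from the external store; this replaces the
        // offsets that commitSync() would otherwise have written to Kafka.
        partitions.forEach(tp -> consumer.seek(tp, store.load(tp.partition())));
      }

      @Override
      public void onPartitionsRevoked(Collection<TopicPartition> partitions)
      {
        // Persist the current position before the partition is reassigned.
        partitions.forEach(tp -> store.save(tp.partition(), consumer.position(tp)));
      }
    }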

README.sh
docker-compose.yml
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/java/de/juplo/kafka/StatisticsDocument.java
src/test/java/de/juplo/kafka/ApplicationTests.java

index 39e9300..b7bce99 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -24,65 +24,17 @@ fi
 
 echo "Waiting for the Kafka-Cluster to become ready..."
 docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
-docker-compose up -d kafka-ui
+docker-compose up setup
+docker-compose up -d producer peter beate
 
-docker-compose exec -T cli bash << 'EOF'
-echo "Creating topic with 3 partitions..."
-kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
-# tag::createtopic[]
-kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 3
-# end::createtopic[]
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-EOF
+sleep 15
 
-docker-compose up -d consumer
-
-docker-compose up -d producer
+http -v post :8082/stop
 sleep 10
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-
-docker-compose stop producer
-docker-compose exec -T cli bash << 'EOF'
-echo "Altering number of partitions from 3 to 7..."
-# tag::altertopic[]
-kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 7
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-# end::altertopic[]
-EOF
+docker-compose kill -s 9 peter
+http -v post :8082/start
+sleep 60
 
-docker-compose start producer
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-docker-compose stop producer consumer
+docker-compose stop producer peter beate
+docker-compose logs beate
+docker-compose logs --tail=10 peter
index 30ae3b4..51f7293 100644 (file)
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -41,13 +41,13 @@ services:
       ME_CONFIG_MONGODB_ADMINPASSWORD: training
       ME_CONFIG_MONGODB_URL: mongodb://juplo:training@mongo:27017/
 
-  kafka-ui:
-    image: provectuslabs/kafka-ui:0.3.3
-    ports:
-      - 8080:8080
-    environment:
-      KAFKA_CLUSTERS_0_NAME: local
-      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
+  setup:
+    image: juplo/toolbox
+    command: >
+      bash -c "
+        kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
+        kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2
+      "
 
   cli:
     image: juplo/toolbox
@@ -62,17 +62,29 @@ services:
       producer.bootstrap-server: kafka:9092
       producer.client-id: producer
       producer.topic: test
-      producer.throttle-ms: 10
+      producer.throttle-ms: 500
 
 
-  consumer:
+  peter:
     image: juplo/endless-consumer:1.0-SNAPSHOT
     ports:
       - 8081:8080
     environment:
       server.port: 8080
       consumer.bootstrap-server: kafka:9092
-      consumer.client-id: consumer
+      consumer.client-id: peter
+      consumer.topic: test
+      spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
+      spring.data.mongodb.database: juplo
+
+  beate:
+    image: juplo/endless-consumer:1.0-SNAPSHOT
+    ports:
+      - 8082:8080
+    environment:
+      server.port: 8080
+      consumer.bootstrap-server: kafka:9092
+      consumer.client-id: beate
       consumer.topic: test
       spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
       spring.data.mongodb.database: juplo
index 1ba9d5b..54e9b89 100644 (file)
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -59,6 +59,7 @@ public class ApplicationConfiguration
     props.put("bootstrap.servers", properties.getBootstrapServer());
     props.put("group.id", properties.getGroupId());
     props.put("client.id", properties.getClientId());
+    props.put("enable.auto.commit", false);
     props.put("auto.offset.reset", properties.getAutoOffsetReset());
     props.put("metadata.max.age.ms", "1000");
     props.put("key.deserializer", StringDeserializer.class.getName());
index a21dd86..2a3445c 100644 (file)
--- a/src/main/java/de/juplo/kafka/EndlessConsumer.java
+++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java
@@ -63,7 +63,7 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
             partition,
             key);
       }
-      repository.save(new StatisticsDocument(partition, removed));
+      repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
     });
   }
 
@@ -75,13 +75,12 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
       Integer partition = tp.partition();
       Long offset = consumer.position(tp);
       log.info("{} - adding partition: {}, offset={}", id, partition, offset);
-      offsets.put(partition, offset);
-      seen.put(
-          partition,
+      StatisticsDocument document =
           repository
-              .findById(Integer.toString(tp.partition()))
-              .map(document -> document.statistics)
-              .orElse(new HashMap<>()));
+              .findById(Integer.toString(partition))
+              .orElse(new StatisticsDocument(partition));
+      consumer.seek(tp, document.offset);
+      seen.put(partition, document.statistics);
     });
   }
 
@@ -128,12 +127,17 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
           seenByKey++;
           byKey.put(key, seenByKey);
         }
+
+        seen.forEach((partiton, statistics) -> repository.save(
+            new StatisticsDocument(
+                partiton,
+                statistics,
+                consumer.position(new TopicPartition(topic, partiton)))));
       }
     }
     catch(WakeupException e)
     {
       log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
-      consumer.commitSync();
       shutdown();
     }
     catch(RecordDeserializationException e)
@@ -147,7 +151,6 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnabl
           offset,
           e.getCause().toString());
 
-      consumer.commitSync();
       shutdown(e);
     }
     catch(Exception e)
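
Note that the new save block sits inside the record loop in the hunk above,
so every consumed record triggers one MongoDB write per tracked partition.
A hedged alternative sketch, persisting once per poll() cycle instead:
`StatePersistenceSketch` and `saveState` are hypothetical names, only the
parameter types follow the project's code.

    import java.util.Map;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;

    class StatePersistenceSketch
    {
      // Called once after the loop over the records of a poll() instead of
      // once per record, this bounds the MongoDB writes by the number of
      // partitions rather than by the number of records.
      static void saveState(
          Consumer<?, ?> consumer,
          String topic,
          Map<Integer, Map<String, Long>> seen,
          PartitionStatisticsRepository repository)
      {
        seen.forEach((partition, statistics) -> repository.save(
            new StatisticsDocument(
                partition,
                statistics,
                consumer.position(new TopicPartition(topic, partition)))));
      }
    }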
index 2416253..28264ec 100644 (file)
--- a/src/main/java/de/juplo/kafka/StatisticsDocument.java
+++ b/src/main/java/de/juplo/kafka/StatisticsDocument.java
@@ -14,15 +14,23 @@ public class StatisticsDocument
 {
   @Id
   public String id;
+  public long offset;
   public Map<String, Long> statistics;
 
   public StatisticsDocument()
   {
   }
 
-  public StatisticsDocument(Integer partition, Map<String, Long> statistics)
+  public StatisticsDocument(Integer partition)
+  {
+    this.id = Integer.toString(partition);
+    this.statistics = new HashMap<>();
+  }
+
+  public StatisticsDocument(Integer partition, Map<String, Long> statistics, long offset)
   {
     this.id = Integer.toString(partition);
     this.statistics = statistics;
+    this.offset = offset;
   }
 }
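
The `PartitionStatisticsRepository` that stores and loads these documents is
not part of this diff; judging from the `findById(String)` and `save(...)`
calls, it is presumably a plain Spring Data MongoDB repository along these
lines (an assumption, not code from the repository):

    import org.springframework.data.mongodb.repository.MongoRepository;

    // Presumed shape: Spring Data derives findById() and save() from the
    // MongoRepository base interface, keyed by the partition number as String.
    public interface PartitionStatisticsRepository
        extends MongoRepository<StatisticsDocument, String>
    {
    }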
index caa25c5..4b7ef36 100644 (file)
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -63,7 +63,7 @@ class ApplicationTests
        @Autowired
        KafkaConsumer<String, Long> kafkaConsumer;
        @Autowired
-       KafkaConsumer<Bytes, Bytes> offsetConsumer;
+       PartitionStatisticsRepository partitionStatisticsRepository;
        @Autowired
        ApplicationProperties properties;
        @Autowired
@@ -177,9 +177,12 @@ class ApplicationTests
 
        void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
        {
-               offsetConsumer.assign(partitions());
-               partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
-               offsetConsumer.unsubscribe();
+               partitions().forEach(tp ->
+               {
+                       String partition = Integer.toString(tp.partition());
+                       Optional<Long> offset = partitionStatisticsRepository.findById(partition).map(document -> document.offset);
+                       consumer.accept(tp, offset.orElse(0l));
+               });
        }
 
        List<TopicPartition> partitions()
@@ -306,18 +309,5 @@ class ApplicationTests
 
                        return new KafkaProducer<>(props);
                }
-
-               @Bean
-               KafkaConsumer<Bytes, Bytes> offsetConsumer(ApplicationProperties properties)
-               {
-                       Properties props = new Properties();
-                       props.put("bootstrap.servers", properties.getBootstrapServer());
-                       props.put("client.id", "OFFSET-CONSUMER");
-                       props.put("group.id", properties.getGroupId());
-                       props.put("key.deserializer", BytesDeserializer.class.getName());
-                       props.put("value.deserializer", BytesDeserializer.class.getName());
-
-                       return new KafkaConsumer<>(props);
-               }
        }
 }