From: Kai Moritz Date: Sun, 28 Aug 2022 15:57:45 +0000 (+0200) Subject: Verbesserungen / Korrekturen aus `sumup-adder--ohne-stored-offsets' gemerged X-Git-Tag: sumup-adder---lvm-2-tage~5 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=a4bbf334910cb37fabadb720cd0ee7f2a21ecaa7;p=demos%2Fkafka%2Ftraining Verbesserungen / Korrekturen aus `sumup-adder--ohne-stored-offsets' gemerged * Merge branch 'sumup-adder--ohne-stored-offsets' into sumup-adder * Conflicts: ** src/main/java/de/juplo/kafka/ApplicationConfiguration.java ** src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java ** src/main/java/de/juplo/kafka/EndlessConsumer.java ** src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java * Logik-Änderungen gegenüber der Implementierung mit Speicherung der Offsets in der Mongo-DB anstatt in Kafka in dem Branch 'sumup-adder' aus dem Merge entfernt. Merge remote-tracking branch 'origin/sumup-adder--ohne--stored-offsets' into sumup-adder "Committed offset must be at most equal to the offset of the consumer" --- a4bbf334910cb37fabadb720cd0ee7f2a21ecaa7 diff --cc docker-compose.yml index 4a4d560,c46b00d..fb6ce7a --- a/docker-compose.yml +++ b/docker-compose.yml @@@ -132,8 -132,10 +132,9 @@@ services sumup.adder.bootstrap-server: kafka:9092 sumup.adder.client-id: adder-1 sumup.adder.commit-interval: 3s + sumup.adder.throttle: 3ms spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017 spring.data.mongodb.database: juplo - logging.level.org.apache.kafka.clients.consumer: DEBUG adder-2: image: juplo/sumup-adder:1.0-SNAPSHOT @@@ -144,8 -146,10 +145,9 @@@ sumup.adder.bootstrap-server: kafka:9092 sumup.adder.client-id: adder-2 sumup.adder.commit-interval: 3s + sumup.adder.throttle: 3ms spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017 spring.data.mongodb.database: juplo - logging.level.org.apache.kafka.clients.consumer: DEBUG peter: image: juplo/toolbox diff --cc src/main/java/de/juplo/kafka/ApplicationConfiguration.java index f83661e,e08cff4..e4ac1ab --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@@ -7,7 -7,7 +7,8 @@@ import org.springframework.boot.context import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import java.time.Clock; + import java.util.Optional; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; diff --cc src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index 109b205,d319295..b517d35 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@@ -19,10 -16,7 +19,10 @@@ public class ApplicationRebalanceListen private final AdderResults adderResults; private final StateRepository stateRepository; private final String id; + private final String topic; + private final Clock clock; + private final Duration commitInterval; - private final Consumer consumer; + private final Consumer consumer; private final Set partitions = new HashSet<>();