From 35c806a3746c673a2479c702de7a9903783c34a9 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Fri, 22 Jul 2022 20:04:07 +0200
Subject: [PATCH 01/16] Upgrade of Spring Boot and the Confluent Kafka images
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* Upgraded the Kafka images from Confluent 7.0.2 to 7.1.3
** Supports Kafka 3.1.x (see the
   https://docs.confluent.io/platform/current/installation/versions-interoperability.html[version matrix])
* Upgraded Spring Boot from 2.6.5 to 2.7.2
** Ships with Kafka 3.1.1
** Ships with Spring Kafka 2.8.8
---
 docker-compose.yml | 4 ++--
 pom.xml            | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/docker-compose.yml b/docker-compose.yml
index d38ebec..ec307f5 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,14 +1,14 @@
 version: '3.2'
 services:
   zookeeper:
-    image: confluentinc/cp-zookeeper:7.0.2
+    image: confluentinc/cp-zookeeper:7.1.3
     environment:
       ZOOKEEPER_CLIENT_PORT: 2181
     ports:
       - 2181:2181

   kafka:
-    image: confluentinc/cp-kafka:7.0.2
+    image: confluentinc/cp-kafka:7.1.3
     environment:
       KAFKA_BROKER_ID: 1
       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
diff --git a/pom.xml b/pom.xml
index c104ca1..70f37e8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,7 @@
   <parent>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-parent</artifactId>
-    <version>2.6.5</version>
+    <version>2.7.2</version>
     <relativePath/>
   </parent>
--
2.20.1

From d115070b9cbc56f4ec9d47ec658f49527fbeb35e Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sat, 23 Jul 2022 13:26:29 +0200
Subject: [PATCH 02/16] Made the Compose configuration independent of the
 default configuration
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* So that instances can be started in parallel via the IDE (with the
  preconfigured default port) and via Compose, the individual components
  (producer, consumer etc.) had each been assigned different explicit
  default ports.
* This easily leads to errors in the Compose setups, because port mappings
  are usually defined there for the started instances.
* The Compose setups are therefore changed to explicitly override the
  compiled-in default port of the components with the port `8080`, so that
  _inside_ Compose all components are uniformly reachable via `8080` (the
  Spring Boot default).
---
 docker-compose.yml | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/docker-compose.yml b/docker-compose.yml
index a46c516..159f9cb 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -39,8 +39,9 @@ services:
   producer:
     image: juplo/endless-producer:1.0-SNAPSHOT
     ports:
-      - 8080:8880
+      - 8080:8080
    environment:
+      server.port: 8080
       producer.bootstrap-server: kafka:9092
       producer.client-id: producer
       producer.topic: test
@@ -50,8 +51,9 @@ services:
   consumer:
     image: juplo/endless-consumer:1.0-SNAPSHOT
     ports:
-      - 8081:8881
+      - 8081:8080
     environment:
+      server.port: 8080
       consumer.bootstrap-server: kafka:9092
       consumer.group-id: my-group
       consumer.client-id: consumer
--
2.20.1

From 581d0b3851f2db9b52fd049b64ca237ef0ba3515 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sat, 23 Jul 2022 15:58:16 +0200
Subject: [PATCH 03/16] README.sh starts MongoDB and Mongo-Express

---
 README.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.sh b/README.sh
index 13176d2..39e9300 100755
--- a/README.sh
+++ b/README.sh
@@ -9,7 +9,7 @@ then
   exit
 fi

-docker-compose up -d zookeeper kafka cli
+docker-compose up -d zookeeper kafka cli mongo express

 if [[
   $(docker image ls -q $IMAGE) == "" ||
--
2.20.1

From 5b4d66eaf6cac9261ab5c36174a16b04c62adf30 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sun, 24 Jul 2022 16:12:04 +0200
Subject: [PATCH 04/16] Fixed an error in the logging output
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* The test added through the merge uncovered an error.
* In onPartitionsRevoked(), a null pointer was dereferenced while
  calculating the number of processed messages for the log output.
* The cause was that the map `offsets` was no longer maintained at all in
  the version that stores the offsets.
---
 src/main/java/de/juplo/kafka/EndlessConsumer.java | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
index 2a3445c..6e460b4 100644
--- a/src/main/java/de/juplo/kafka/EndlessConsumer.java
+++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java
@@ -35,7 +35,7 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl
   private long consumed = 0;

   private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
-  private final Map<Integer, Long> offsets = new HashMap<>();
+  private final Map<Integer, Long> lastOffsets = new HashMap<>();


   @Override
@@ -45,7 +45,7 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl
     {
       Integer partition = tp.partition();
       Long newOffset = consumer.position(tp);
-      Long oldOffset = offsets.remove(partition);
+      Long oldOffset = lastOffsets.remove(partition);
       log.info(
           "{} - removing partition: {}, consumed {} records (offset {} -> {})",
           id,
@@ -80,6 +80,7 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl
               .findById(Integer.toString(partition))
               .orElse(new StatisticsDocument(partition));
       consumer.seek(tp, document.offset);
+      lastOffsets.put(partition, document.offset);
       seen.put(partition, document.statistics);
     });
   }
--
2.20.1

From 34d37c55d7cf830c6d2bdaf747f0938eb557bef3 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sun, 24 Jul 2022 16:15:23 +0200
Subject: [PATCH 05/16] Removed the output of processed messages from the
 revoke callback
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* A map with the most recently read offset positions had to be maintained
  solely for this output.
* That is too much overhead for a marginal log message.
---
 src/main/java/de/juplo/kafka/EndlessConsumer.java | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
index 6e460b4..7e243a9 100644
--- a/src/main/java/de/juplo/kafka/EndlessConsumer.java
+++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java
@@ -35,7 +35,6 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl
   private long consumed = 0;

   private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
-  private final Map<Integer, Long> lastOffsets = new HashMap<>();


   @Override
@@ -45,13 +44,10 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl
     {
       Integer partition = tp.partition();
       Long newOffset = consumer.position(tp);
-      Long oldOffset = lastOffsets.remove(partition);
       log.info(
-          "{} - removing partition: {}, consumed {} records (offset {} -> {})",
+          "{} - removing partition: {}, offset of next message {})",
           id,
           partition,
-          newOffset - oldOffset,
-          oldOffset,
           newOffset);
       Map<String, Long> removed = seen.remove(partition);
       for (String key : removed.keySet())
@@ -80,7 +76,6 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl
               .findById(Integer.toString(partition))
               .orElse(new StatisticsDocument(partition));
       consumer.seek(tp, document.offset);
-      lastOffsets.put(partition, document.offset);
       seen.put(partition, document.statistics);
     });
   }
--
2.20.1

From 4a6608a5a0f58805631f0e451f39fd0c7dc21a2e Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sun, 24 Jul 2022 17:18:02 +0200
Subject: [PATCH 06/16] mongo-express should only be started after MongoDB

---
 docker-compose.yml | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/docker-compose.yml b/docker-compose.yml
index 51f7293..0e420b6 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -40,6 +40,8 @@ services:
       ME_CONFIG_MONGODB_ADMINUSERNAME: juplo
       ME_CONFIG_MONGODB_ADMINPASSWORD: training
       ME_CONFIG_MONGODB_URL: mongodb://juplo:training@mongo:27017/
+    depends_on:
+      - mongo

   setup:
     image: juplo/toolbox
--
2.20.1

From ce840f48340d55613291fca468bf10b834c473db Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sun, 24 Jul 2022 17:40:36 +0200
Subject: [PATCH 07/16] Fixed an error in the shutdown code: shutdown of
 `EndlessConsumer` happened too late
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

---
 src/main/java/de/juplo/kafka/Application.java | 18 ++++++++++++++++--
 .../java/de/juplo/kafka/EndlessConsumer.java  | 17 +----------------
 2 files changed, 17 insertions(+), 18 deletions(-)

diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java
index d280aa6..76c2520 100644
--- a/src/main/java/de/juplo/kafka/Application.java
+++ b/src/main/java/de/juplo/kafka/Application.java
@@ -30,8 +30,22 @@ public class Application implements ApplicationRunner
   }

   @PreDestroy
-  public void stopExecutor()
+  public void shutdown()
   {
+    try
+    {
+      log.info("Stopping EndlessConsumer");
+      endlessConsumer.stop();
+    }
+    catch (IllegalStateException e)
+    {
+      log.info("Was already stopped: {}", e.toString());
+    }
+    catch (Exception e)
+    {
+      log.error("Unexpected exception while stopping EndlessConsumer: {}", e);
+    }
+
     try
     {
       log.info("Shutting down the ExecutorService.");
@@ -41,7 +55,7 @@ public class Application implements ApplicationRunner
     }
     catch (InterruptedException e)
     {
-      log.error("Exception while waiting for the termination of the ExecutorService: {}", e.toString());
+      log.error("Exception while waiting for the termination of the ExecutorService: {}", e);
     }
     finally
     {
diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
index 7e243a9..3d154c2 100644
--- a/src/main/java/de/juplo/kafka/EndlessConsumer.java
+++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java
@@ -243,22 +243,7 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl
   public void destroy() throws ExecutionException, InterruptedException
   {
     log.info("{} - Destroy!", id);
-    try
-    {
-      stop();
-    }
-    catch (IllegalStateException e)
-    {
-      log.info("{} - Was already stopped", id);
-    }
-    catch (Exception e)
-    {
-      log.error("{} - Unexpected exception while trying to stop the consumer", id, e);
-    }
-    finally
-    {
-      log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
-    }
+    log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
   }

   public boolean running()
--
2.20.1

From 6ded138c6b8139da2cdc13f2380b5f5a4e51cc4e Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sun, 24 Jul 2022 18:22:00 +0200
Subject: [PATCH 08/16] Use Kafka's auto.offset.reset if no stored offset is
 available
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* An explicit seek is now only performed if a stored offset position was
  found.
* Otherwise, the initial offset provided by Kafka is used.
* Which offset Kafka provides depends on `auto.offset.reset`!
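As an illustration only (not the patched file itself), the resulting seek
decision boils down to this sketch; `storedOffset` stands in for the offset
read from MongoDB:

  import java.util.Optional;

  import org.apache.kafka.clients.consumer.Consumer;
  import org.apache.kafka.common.TopicPartition;

  class SeekIfStoredSketch
  {
    // Seek explicitly only when a stored offset exists; otherwise the
    // consumer keeps the initial offset that Kafka assigned according
    // to auto.offset.reset.
    static void seekIfStored(Consumer<?, ?> consumer, TopicPartition tp, Optional<Long> storedOffset)
    {
      storedOffset.ifPresent(offset -> consumer.seek(tp, offset));
    }
  }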
---
 src/main/java/de/juplo/kafka/EndlessConsumer.java    | 7 ++++++-
 src/main/java/de/juplo/kafka/StatisticsDocument.java | 2 +-
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
index 3d154c2..a93ae2c 100644
--- a/src/main/java/de/juplo/kafka/EndlessConsumer.java
+++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java
@@ -75,7 +75,12 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl
           repository
               .findById(Integer.toString(partition))
               .orElse(new StatisticsDocument(partition));
-      consumer.seek(tp, document.offset);
+      if (document.offset >= 0)
+      {
+        // Only seek, if a stored offset was found
+        // Otherwise: Use initial offset, generated by Kafka
+        consumer.seek(tp, document.offset);
+      }
       seen.put(partition, document.statistics);
     });
   }
diff --git a/src/main/java/de/juplo/kafka/StatisticsDocument.java b/src/main/java/de/juplo/kafka/StatisticsDocument.java
index 28264ec..1244f45 100644
--- a/src/main/java/de/juplo/kafka/StatisticsDocument.java
+++ b/src/main/java/de/juplo/kafka/StatisticsDocument.java
@@ -14,7 +14,7 @@ public class StatisticsDocument
 {
   @Id
   public String id;
-  public long offset;
+  public long offset = -1l;
   public Map<String, Long> statistics;

   public StatisticsDocument()
--
2.20.1

From d675a67e01107b52240abbe62820aa1c8519f88d Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sun, 24 Jul 2022 18:39:05 +0200
Subject: [PATCH 09/16] Data and offsets are no longer stored after every
 `poll()`
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* Instead, a `Duration` can be configured.
* Similar to the Kafka client library, a timestamp of the last commit is
  stored, and the data is saved whenever that timestamp lies further in
  the past than the configured `consumer.commit-interval`.
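As an illustration only (not the patched file itself), the time-based check
described above works like this sketch; `store.run()` stands in for the
`repository.save()` calls:

  import java.time.Clock;
  import java.time.Duration;
  import java.time.Instant;

  class CommitIntervalSketch
  {
    private final Clock clock = Clock.systemDefaultZone();
    private final Duration commitInterval = Duration.ofSeconds(30);
    private Instant lastCommit = Instant.EPOCH;

    // Store data and offsets only if the last commit lies more than
    // commitInterval in the past; then remember the new commit time.
    void maybeStore(Runnable store)
    {
      if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
      {
        store.run();
        lastCommit = clock.instant();
      }
    }
  }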
---
 .../juplo/kafka/ApplicationConfiguration.java |  3 +++
 .../de/juplo/kafka/ApplicationProperties.java |  3 +++
 .../java/de/juplo/kafka/EndlessConsumer.java  | 21 ++++++++++++++-----
 src/main/resources/application.yml            |  1 +
 .../java/de/juplo/kafka/ApplicationTests.java |  4 ++++
 5 files changed, 27 insertions(+), 5 deletions(-)

diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 54e9b89..7a24c97 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -8,6 +8,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;

+import java.time.Clock;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -41,6 +42,8 @@ public class ApplicationConfiguration
             repository,
             properties.getClientId(),
             properties.getTopic(),
+            Clock.systemDefaultZone(),
+            properties.getCommitInterval(),
             kafkaConsumer,
             handler);
   }
diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java
index fa731c5..14e928f 100644
--- a/src/main/java/de/juplo/kafka/ApplicationProperties.java
+++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java
@@ -7,6 +7,7 @@ import org.springframework.validation.annotation.Validated;

 import javax.validation.constraints.NotEmpty;
 import javax.validation.constraints.NotNull;
+import java.time.Duration;


 @ConfigurationProperties(prefix = "consumer")
@@ -30,4 +31,6 @@ public class ApplicationProperties
   @NotNull
   @NotEmpty
   private String autoOffsetReset;
+  @NotNull
+  private Duration commitInterval;
 }
diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
index a93ae2c..ce5dd72 100644
--- a/src/main/java/de/juplo/kafka/EndlessConsumer.java
+++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java
@@ -8,7 +8,9 @@ import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.errors.WakeupException;

 import javax.annotation.PreDestroy;
+import java.time.Clock;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -25,6 +27,8 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl
   private final PartitionStatisticsRepository repository;
   private final String id;
   private final String topic;
+  private final Clock clock;
+  private final Duration commitInterval;
   private final Consumer<K, V> consumer;
   private final java.util.function.Consumer<ConsumerRecord<K, V>> handler;

@@ -94,6 +98,8 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl
       log.info("{} - Subscribing to topic {}", id, topic);
       consumer.subscribe(Arrays.asList(topic), this);

+      Instant lastCommit = clock.instant();
+
       while (true)
       {
         ConsumerRecords<K, V> records =
@@ -129,11 +135,16 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl
           byKey.put(key, seenByKey);
         }

-        seen.forEach((partiton, statistics) -> repository.save(
-            new StatisticsDocument(
-                partiton,
-                statistics,
-                consumer.position(new TopicPartition(topic, partiton)))));
+        if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
+        {
+          log.debug("Storing data and offsets, last commit: {}", lastCommit);
+          seen.forEach((partiton, statistics) -> repository.save(
+              new StatisticsDocument(
+                  partiton,
+                  statistics,
+                  consumer.position(new TopicPartition(topic, partiton)))));
+          lastCommit = clock.instant();
+        }
       }
     }
     catch(WakeupException e)
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 93b27c2..24f434f 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -4,6 +4,7 @@ consumer:
   client-id: DEV
   topic: test
   auto-offset-reset: earliest
+  commit-interval: 30s
 management:
   endpoint:
     shutdown:
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index 4b7ef36..431431b 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -21,6 +21,7 @@ import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.test.context.TestPropertySource;
 import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

+import java.time.Clock;
 import java.time.Duration;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
@@ -43,6 +44,7 @@ import static org.awaitility.Awaitility.*;
     properties = {
         "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
         "consumer.topic=" + TOPIC,
+        "consumer.commit-interval=1s",
         "spring.mongodb.embedded.version=4.4.13" })
 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
 @EnableAutoConfiguration
@@ -268,6 +270,8 @@ class ApplicationTests
             repository,
             properties.getClientId(),
             properties.getTopic(),
+            Clock.systemDefaultZone(),
+            properties.getCommitInterval(),
             kafkaConsumer,
             captureOffsetAndExecuteTestHandler);
--
2.20.1

From ecd63311e0b6af698185bcf1e085f2b3237bd264 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sun, 24 Jul 2022 19:32:51 +0200
Subject: [PATCH 10/16] Switched to the `CooperativeStickyAssignor`

---
 src/main/java/de/juplo/kafka/ApplicationConfiguration.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 7a24c97..9b06b09 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -60,6 +60,7 @@ public class ApplicationConfiguration
     Properties props = new Properties();

     props.put("bootstrap.servers", properties.getBootstrapServer());
+    props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
     props.put("group.id", properties.getGroupId());
     props.put("client.id", properties.getClientId());
     props.put("enable.auto.commit", false);
--
2.20.1

From 60bc4a251dc9bab71d5ab5f12870147fec253ac9 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sun, 24 Jul 2022 17:18:33 +0200
Subject: [PATCH 11/16] Reverted the switch of the message data type to Long
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* In the branch 'deserialization', the data type of the message was
  switched from `String` to `Long`, in order to demonstrate a
  `DeserializationException` that is thrown inside the Kafka code.
* Even there, this change was not reflected in the `README.sh` script.
* Here, it now interferes with the experiments with the `EndlessProducer`,
  which produces messages of type `String`, so that the consumer cannot
  accept a single message.
* Therefore, the message data type is switched back to `String` here.
* To this end, the test case had to be adapted, and the test that checks
  for the exception had to be removed.
---
 .../juplo/kafka/ApplicationConfiguration.java | 13 +++--
 .../kafka/ApplicationHealthIndicator.java     |  2 +-
 .../java/de/juplo/kafka/EndlessConsumer.java  |  2 +-
 .../java/de/juplo/kafka/ApplicationTests.java | 47 ++-----------------
 4 files changed, 13 insertions(+), 51 deletions(-)

diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 9b06b09..08c3955 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -2,7 +2,6 @@ package de.juplo.kafka;

 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
@@ -20,7 +19,7 @@ import java.util.function.Consumer;
 public class ApplicationConfiguration
 {
   @Bean
-  public Consumer<ConsumerRecord<String, Long>> consumer()
+  public Consumer<ConsumerRecord<String, String>> consumer()
   {
     return (record) ->
     {
@@ -29,10 +28,10 @@ public class ApplicationConfiguration
   }

   @Bean
-  public EndlessConsumer<String, Long> endlessConsumer(
-      KafkaConsumer<String, Long> kafkaConsumer,
+  public EndlessConsumer<String, String> endlessConsumer(
+      KafkaConsumer<String, String> kafkaConsumer,
       ExecutorService executor,
-      Consumer<ConsumerRecord<String, Long>> handler,
+      Consumer<ConsumerRecord<String, String>> handler,
       PartitionStatisticsRepository repository,
       ApplicationProperties properties)
   {
@@ -55,7 +54,7 @@ public class ApplicationConfiguration
   }

   @Bean(destroyMethod = "close")
-  public KafkaConsumer<String, Long> kafkaConsumer(ApplicationProperties properties)
+  public KafkaConsumer<String, String> kafkaConsumer(ApplicationProperties properties)
   {
     Properties props = new Properties();

@@ -67,7 +66,7 @@ public class ApplicationConfiguration
     props.put("auto.offset.reset", properties.getAutoOffsetReset());
     props.put("metadata.max.age.ms", "1000");
     props.put("key.deserializer", StringDeserializer.class.getName());
-    props.put("value.deserializer", LongDeserializer.class.getName());
+    props.put("value.deserializer", StringDeserializer.class.getName());

     return new KafkaConsumer<>(props);
   }
diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
index dc3a26e..df4e653 100644
--- a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
+++ b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
@@ -10,7 +10,7 @@ import org.springframework.stereotype.Component;
 @RequiredArgsConstructor
 public class ApplicationHealthIndicator implements HealthIndicator
 {
-  private final EndlessConsumer<String, Long> consumer;
+  private final EndlessConsumer<String, String> consumer;


   @Override
diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
index ce5dd72..f9a9629 100644
--- a/src/main/java/de/juplo/kafka/EndlessConsumer.java
+++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java
@@ -236,7 +236,7 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl
     }
   }

-  public synchronized void stop() throws ExecutionException, InterruptedException
+  public synchronized void stop() throws InterruptedException
   {
     lock.lock();
     try
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index 431431b..ca72e3c 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -6,7 +6,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.serialization.*;
 import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.*;
@@ -63,7 +62,7 @@ class ApplicationTests
   @Autowired
   KafkaProducer<String, Bytes> kafkaProducer;
   @Autowired
-  KafkaConsumer<String, Long> kafkaConsumer;
+  KafkaConsumer<String, String> kafkaConsumer;
   @Autowired
   PartitionStatisticsRepository partitionStatisticsRepository;
   @Autowired
@@ -73,17 +72,16 @@ class ApplicationTests
   @Autowired
   PartitionStatisticsRepository repository;

-  Consumer<ConsumerRecord<String, Long>> testHandler;
-  EndlessConsumer<String, Long> endlessConsumer;
+  Consumer<ConsumerRecord<String, String>> testHandler;
+  EndlessConsumer<String, String> endlessConsumer;
   Map<TopicPartition, Long> oldOffsets;
   Map<TopicPartition, Long> newOffsets;
-  Set<ConsumerRecord<String, Long>> receivedRecords;
+  Set<ConsumerRecord<String, String>> receivedRecords;


   /** Tests methods */

   @Test
-  @Order(1) // << The poison pill is not skipped. Hence, this test must run first
   void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException
   {
     send100Messages(i -> new Bytes(valueSerializer.serialize(TOPIC, i)));
@@ -105,41 +103,6 @@ class ApplicationTests
         .describedAs("Consumer should still be running");
   }

-  @Test
-  @Order(2)
-  void commitsOffsetOfErrorForReprocessingOnError()
-  {
-    send100Messages(counter ->
-        counter == 77
-            ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
-            : new Bytes(valueSerializer.serialize(TOPIC, counter)));
-
-    await("Consumer failed")
-        .atMost(Duration.ofSeconds(30))
-        .until(() -> !endlessConsumer.running());
-
-    checkSeenOffsetsForProgress();
-    compareToCommitedOffsets(newOffsets);
-
-    endlessConsumer.start();
-    await("Consumer failed")
-        .atMost(Duration.ofSeconds(30))
-        .until(() -> !endlessConsumer.running());
-
-    checkSeenOffsetsForProgress();
-    compareToCommitedOffsets(newOffsets);
-    assertThat(receivedRecords.size())
-        .describedAs("Received not all sent events")
-        .isLessThan(100);
-
-    assertThatNoException()
-        .describedAs("Consumer should not be running")
-        .isThrownBy(() -> endlessConsumer.exitStatus());
-    assertThat(endlessConsumer.exitStatus())
-        .describedAs("Consumer should have exited abnormally")
-        .containsInstanceOf(RecordDeserializationException.class);
-  }
-

   /** Helper methods for the verification of expectations */

@@ -254,7 +217,7 @@ class ApplicationTests
       newOffsets.put(tp, offset - 1);
     });

-    Consumer<ConsumerRecord<String, Long>> captureOffsetAndExecuteTestHandler =
+    Consumer<ConsumerRecord<String, String>> captureOffsetAndExecuteTestHandler =
         record ->
         {
           newOffsets.put(
--
2.20.1

From a6a0a22a5fa34a01b0e8b2bc1e0e2b82d7b60f33 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sun, 24 Jul 2022 21:34:43 +0200
Subject: [PATCH 12/16] Wordcount implementation with plain Kafka and MongoDB
 as storage
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* Counts the words per user.
* Simple implementation based on maps.
* Uses the MongoDB connection already employed for storing the message
  counts and the offsets.
* Typing reverted: always String for key/value.
* For convenience, reuses the seen endpoint from the counting example.
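As an illustration only (not the patched file itself), the counting logic
boils down to this sketch; `Map.merge` here stands in for the explicit null
checks used in the actual code:

  import java.util.HashMap;
  import java.util.Map;
  import java.util.regex.Pattern;

  class WordcountSketch
  {
    final static Pattern PATTERN = Pattern.compile("\\W+");

    // user -> word -> number of occurrences
    final Map<String, Map<String, Long>> countsByUser = new HashMap<>();

    void count(String user, String sentence)
    {
      Map<String, Long> words =
          countsByUser.computeIfAbsent(user, k -> new HashMap<>());
      for (String word : PATTERN.split(sentence))
        words.merge(word, 1l, Long::sum); // increment, starting at 1
    }
  }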
---
 docker-compose.yml                            |  8 +--
 pom.xml                                       |  5 +-
 .../juplo/kafka/ApplicationConfiguration.java | 17 +----
 .../kafka/ApplicationHealthIndicator.java     |  2 +-
 .../java/de/juplo/kafka/DriverController.java | 22 +++++--
 .../java/de/juplo/kafka/EndlessConsumer.java  | 62 ++++++++++---------
 .../de/juplo/kafka/StatisticsDocument.java    |  4 +-
 .../java/de/juplo/kafka/ApplicationTests.java |  7 +--
 8 files changed, 65 insertions(+), 62 deletions(-)

diff --git a/docker-compose.yml b/docker-compose.yml
index 0e420b6..df41cb5 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -56,7 +56,7 @@ services:
     command: sleep infinity

   producer:
-    image: juplo/endless-producer:1.0-SNAPSHOT
+    image: juplo/rest-producer:1.0-SNAPSHOT
     ports:
       - 8080:8080
     environment:
@@ -64,11 +64,9 @@ services:
       producer.bootstrap-server: kafka:9092
       producer.client-id: producer
       producer.topic: test
-      producer.throttle-ms: 500
-
   peter:
-    image: juplo/endless-consumer:1.0-SNAPSHOT
+    image: juplo/wordcount:1.0-SNAPSHOT
     ports:
       - 8081:8080
     environment:
@@ -80,7 +78,7 @@ services:
       spring.data.mongodb.database: juplo

   beate:
-    image: juplo/endless-consumer:1.0-SNAPSHOT
+    image: juplo/wordcount:1.0-SNAPSHOT
     ports:
       - 8082:8080
     environment:
diff --git a/pom.xml b/pom.xml
index 701704d..fe06959 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,9 +12,10 @@
   <groupId>de.juplo.kafka</groupId>
-  <artifactId>endless-consumer</artifactId>
+  <artifactId>wordcount</artifactId>
   <version>1.0-SNAPSHOT</version>
-  <name>Endless Consumer: a Simple Consumer-Group that reads and prints the topic and counts the received messages for each key by topic</name>
+  <name>Wordcount</name>
+  <description>Splits the incoming sentences into words and counts the words per user.</description>

diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 08c3955..2cf263e 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -19,32 +19,21 @@ import java.util.function.Consumer;
 public class ApplicationConfiguration
 {
   @Bean
-  public Consumer<ConsumerRecord<String, String>> consumer()
-  {
-    return (record) ->
-    {
-      // Handle record
-    };
-  }
-
-  @Bean
-  public EndlessConsumer<String, String> endlessConsumer(
+  public EndlessConsumer endlessConsumer(
       KafkaConsumer<String, String> kafkaConsumer,
       ExecutorService executor,
-      Consumer<ConsumerRecord<String, String>> handler,
       PartitionStatisticsRepository repository,
       ApplicationProperties properties)
   {
     return
-        new EndlessConsumer<>(
+        new EndlessConsumer(
             executor,
             repository,
             properties.getClientId(),
             properties.getTopic(),
             Clock.systemDefaultZone(),
             properties.getCommitInterval(),
-            kafkaConsumer,
-            handler);
+            kafkaConsumer);
   }

   @Bean
diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
index df4e653..ab9782c 100644
--- a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
+++ b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
@@ -10,7 +10,7 @@ import org.springframework.stereotype.Component;
 @RequiredArgsConstructor
 public class ApplicationHealthIndicator implements HealthIndicator
 {
-  private final EndlessConsumer<String, String> consumer;
+  private final EndlessConsumer consumer;


   @Override
diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java
index ed38080..e64d6b8 100644
--- a/src/main/java/de/juplo/kafka/DriverController.java
+++ b/src/main/java/de/juplo/kafka/DriverController.java
@@ -2,11 +2,8 @@ package de.juplo.kafka;

 import lombok.RequiredArgsConstructor;
 import org.springframework.http.HttpStatus;
-import org.springframework.web.bind.annotation.ExceptionHandler;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.ResponseStatus;
-import org.springframework.web.bind.annotation.RestController;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.*;

 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -33,11 +30,24 @@ public class DriverController

   @GetMapping("seen")
-  public Map<Integer, Map<String, Long>> seen()
+  public Map<Integer, Map<String, Map<String, Long>>> seen()
   {
     return consumer.getSeen();
   }

+  @GetMapping("seen/{user}")
+  public ResponseEntity<Map<String, Long>> seen(@PathVariable String user)
+  {
+    for (Map<String, Map<String, Long>> users : consumer.getSeen().values())
+    {
+      Map<String, Long> words = users.get(user);
+      if (words != null)
+        return ResponseEntity.ok(words);
+    }
+
+    return ResponseEntity.notFound().build();
+  }
+

   @ExceptionHandler
   @ResponseStatus(HttpStatus.BAD_REQUEST)
diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
index f9a9629..01f9057 100644
--- a/src/main/java/de/juplo/kafka/EndlessConsumer.java
+++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java
@@ -17,20 +17,23 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;


 @Slf4j
 @RequiredArgsConstructor
-public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnable
+public class EndlessConsumer implements ConsumerRebalanceListener, Runnable
 {
+  final static Pattern PATTERN = Pattern.compile("\\W+");
+
+
   private final ExecutorService executor;
   private final PartitionStatisticsRepository repository;
   private final String id;
   private final String topic;
   private final Clock clock;
   private final Duration commitInterval;
-  private final Consumer<K, V> consumer;
-  private final java.util.function.Consumer<ConsumerRecord<K, V>> handler;
+  private final Consumer<String, String> consumer;

   private final Lock lock = new ReentrantLock();
   private final Condition condition = lock.newCondition();
@@ -38,7 +41,7 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnable
   private Exception exception;
   private long consumed = 0;

-  private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
+  private final Map<Integer, Map<String, Map<String, Long>>> seen = new HashMap<>();


   @Override
@@ -53,16 +56,7 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnable
           id,
           partition,
           newOffset);
-      Map<String, Long> removed = seen.remove(partition);
-      for (String key : removed.keySet())
-      {
-        log.info(
-            "{} - Seen {} messages for partition={}|key={}",
-            id,
-            removed.get(key),
-            partition,
-            key);
-      }
+      Map<String, Map<String, Long>> removed = seen.remove(partition);
       repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
     });
   }
@@ -102,12 +96,12 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnable

       while (true)
       {
-        ConsumerRecords<K, V> records =
+        ConsumerRecords<String, String> records =
             consumer.poll(Duration.ofSeconds(1));

         // Do something with the data...
         log.info("{} - Received {} messages", id, records.count());
-        for (ConsumerRecord<K, V> record : records)
+        for (ConsumerRecord<String, String> record : records)
         {
           log.info(
               "{} - {}: {}/{} - {}={}",
@@ -119,20 +113,32 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnable
               record.value()
           );

-          handler.accept(record);
-
           consumed++;

           Integer partition = record.partition();
-          String key = record.key() == null ? "NULL" : record.key().toString();
-          Map<String, Long> byKey = seen.get(partition);
-
-          if (!byKey.containsKey(key))
-            byKey.put(key, 0l);
-
-          long seenByKey = byKey.get(key);
-          seenByKey++;
-          byKey.put(key, seenByKey);
+          String user = record.key();
+          Map<String, Map<String, Long>> users = seen.get(partition);
+
+          Map<String, Long> words = users.get(user);
+          if (words == null)
+          {
+            words = new HashMap<>();
+            users.put(user, words);
+          }
+
+          for (String word : PATTERN.split(record.value()))
+          {
+            Long num = words.get(word);
+            if (num == null)
+            {
+              num = 1l;
+            }
+            else
+            {
+              num++;
+            }
+            words.put(word, num);
+          }
         }

         if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
@@ -212,7 +218,7 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnable
     }
   }

-  public Map<Integer, Map<String, Long>> getSeen()
+  public Map<Integer, Map<String, Map<String, Long>>> getSeen()
   {
     return seen;
   }
diff --git a/src/main/java/de/juplo/kafka/StatisticsDocument.java b/src/main/java/de/juplo/kafka/StatisticsDocument.java
index 1244f45..137c9bb 100644
--- a/src/main/java/de/juplo/kafka/StatisticsDocument.java
+++ b/src/main/java/de/juplo/kafka/StatisticsDocument.java
@@ -15,7 +15,7 @@ public class StatisticsDocument
   @Id
   public String id;
   public long offset = -1l;
-  public Map<String, Long> statistics;
+  public Map<String, Map<String, Long>> statistics;

   public StatisticsDocument()
   {
@@ -27,7 +27,7 @@ public class StatisticsDocument
     this.statistics = new HashMap<>();
   }

-  public StatisticsDocument(Integer partition, Map<String, Long> statistics, long offset)
+  public StatisticsDocument(Integer partition, Map<String, Map<String, Long>> statistics, long offset)
   {
     this.id = Integer.toString(partition);
     this.statistics = statistics;
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index ca72e3c..aa6dd4d 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -73,7 +73,7 @@ class ApplicationTests
   PartitionStatisticsRepository repository;

   Consumer<ConsumerRecord<String, String>> testHandler;
-  EndlessConsumer<String, String> endlessConsumer;
+  EndlessConsumer endlessConsumer;
   Map<TopicPartition, Long> oldOffsets;
   Map<TopicPartition, Long> newOffsets;
   Set<ConsumerRecord<String, String>> receivedRecords;
@@ -228,15 +228,14 @@ class ApplicationTests
     };

     endlessConsumer =
-        new EndlessConsumer<>(
+        new EndlessConsumer(
            executor,
            repository,
            properties.getClientId(),
            properties.getTopic(),
            Clock.systemDefaultZone(),
            properties.getCommitInterval(),
-            kafkaConsumer,
-            captureOffsetAndExecuteTestHandler);
+            kafkaConsumer);

     endlessConsumer.start();
   }
--
2.20.1

From 2d84eda74475aaffff11ddfebe56d309b9aff2e9 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Thu, 11 Aug 2022 20:52:35 +0200
Subject: [PATCH 13/16] refactor: Adapted the implementation to the branch
 `stored-offsets`

---
 .../juplo/kafka/ApplicationConfiguration.java |  30 +++--
 .../kafka/ApplicationHealthIndicator.java     |   2 +-
 .../java/de/juplo/kafka/DriverController.java |   5 +-
 .../java/de/juplo/kafka/EndlessConsumer.java  |  96 ++------------
 .../java/de/juplo/kafka/RecordHandler.java    |  16 +++
 .../juplo/kafka/WordcountRecordHandler.java   | 119 ++++++++++++++++++
 .../java/de/juplo/kafka/ApplicationTests.java |  34 +++--
 .../de/juplo/kafka/TestRecordHandler.java     |  41 ++++++
 8 files changed, 227 insertions(+), 116 deletions(-)
 create mode 100644 src/main/java/de/juplo/kafka/RecordHandler.java
 create mode 100644 src/main/java/de/juplo/kafka/WordcountRecordHandler.java
 create mode 100644 src/test/java/de/juplo/kafka/TestRecordHandler.java

diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 2cf263e..b077a90 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -1,6 +1,6 @@
 package de.juplo.kafka;

-import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -11,7 +11,6 @@ import java.time.Clock;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.function.Consumer;


 @Configuration
@@ -19,21 +18,34 @@ import java.util.function.Consumer;
 public class ApplicationConfiguration
 {
   @Bean
-  public EndlessConsumer endlessConsumer(
+  public WordcountRecordHandler wordcountRecordHandler(
+      PartitionStatisticsRepository repository,
+      Consumer<String, String> consumer,
+      ApplicationProperties properties)
+  {
+    return new WordcountRecordHandler(
+        repository,
+        properties.getClientId(),
+        properties.getTopic(),
+        Clock.systemDefaultZone(),
+        properties.getCommitInterval(),
+        consumer);
+  }
+
+  @Bean
+  public EndlessConsumer<String, String> endlessConsumer(
       KafkaConsumer<String, String> kafkaConsumer,
       ExecutorService executor,
-      PartitionStatisticsRepository repository,
+      WordcountRecordHandler wordcountRecordHandler,
       ApplicationProperties properties)
   {
     return
-        new EndlessConsumer(
+        new EndlessConsumer<>(
            executor,
-            repository,
            properties.getClientId(),
            properties.getTopic(),
-            Clock.systemDefaultZone(),
-            properties.getCommitInterval(),
-            kafkaConsumer);
+            kafkaConsumer,
+            wordcountRecordHandler);
   }

   @Bean
diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
index ab9782c..df4e653 100644
--- a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
+++ b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
@@ -10,7 +10,7 @@ import org.springframework.stereotype.Component;
 @RequiredArgsConstructor
 public class ApplicationHealthIndicator implements HealthIndicator
 {
-  private final EndlessConsumer consumer;
+  private final EndlessConsumer<String, String> consumer;


   @Override
diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java
index e64d6b8..5d6c1a8 100644
--- a/src/main/java/de/juplo/kafka/DriverController.java
+++ b/src/main/java/de/juplo/kafka/DriverController.java
@@ -14,6 +14,7 @@ import java.util.concurrent.ExecutionException;
 public class DriverController
 {
   private final EndlessConsumer consumer;
+  private final WordcountRecordHandler wordcount;


   @PostMapping("start")
@@ -32,13 +33,13 @@ public class DriverController
   @GetMapping("seen")
   public Map<Integer, Map<String, Map<String, Long>>> seen()
   {
-    return consumer.getSeen();
+    return wordcount.getSeen();
   }

   @GetMapping("seen/{user}")
   public ResponseEntity<Map<String, Long>> seen(@PathVariable String user)
   {
-    for (Map<String, Map<String, Long>> users : consumer.getSeen().values())
+    for (Map<String, Map<String, Long>> users : wordcount.getSeen().values())
     {
       Map<String, Long> words = users.get(user);
       if (words != null)
diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
index 01f9057..0c107f3 100644
--- a/src/main/java/de/juplo/kafka/EndlessConsumer.java
+++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java
@@ -8,32 +8,24 @@ import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.errors.WakeupException;

 import javax.annotation.PreDestroy;
-import java.time.Clock;
 import java.time.Duration;
-import java.time.Instant;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.regex.Pattern;


 @Slf4j
 @RequiredArgsConstructor
-public class EndlessConsumer implements ConsumerRebalanceListener, Runnable
+public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnable
 {
-  final static Pattern PATTERN = Pattern.compile("\\W+");
-
-
   private final ExecutorService executor;
-  private final PartitionStatisticsRepository repository;
   private final String id;
   private final String topic;
-  private final Clock clock;
-  private final Duration commitInterval;
-  private final Consumer<String, String> consumer;
+  private final Consumer<K, V> consumer;
+  private final RecordHandler<K, V> handler;

   private final Lock lock = new ReentrantLock();
   private final Condition condition = lock.newCondition();
@@ -41,46 +33,17 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnable
   private Exception exception;
   private long consumed = 0;

-  private final Map<Integer, Map<String, Map<String, Long>>> seen = new HashMap<>();
-

   @Override
   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
   {
-    partitions.forEach(tp ->
-    {
-      Integer partition = tp.partition();
-      Long newOffset = consumer.position(tp);
-      log.info(
-          "{} - removing partition: {}, offset of next message {})",
-          id,
-          partition,
-          newOffset);
-      Map<String, Map<String, Long>> removed = seen.remove(partition);
-      repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
-    });
+    partitions.forEach(tp -> handler.onPartitionRevoked(tp));
   }

   @Override
   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
   {
-    partitions.forEach(tp ->
-    {
-      Integer partition = tp.partition();
-      Long offset = consumer.position(tp);
-      log.info("{} - adding partition: {}, offset={}", id, partition, offset);
-      StatisticsDocument document =
-          repository
-              .findById(Integer.toString(partition))
-              .orElse(new StatisticsDocument(partition));
-      if (document.offset >= 0)
-      {
-        // Only seek, if a stored offset was found
-        // Otherwise: Use initial offset, generated by Kafka
-        consumer.seek(tp, document.offset);
-      }
-      seen.put(partition, document.statistics);
-    });
+    partitions.forEach(tp -> handler.onPartitionAssigned(tp));
   }

@@ -92,16 +55,14 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnable
       log.info("{} - Subscribing to topic {}", id, topic);
       consumer.subscribe(Arrays.asList(topic), this);

       while (true)
       {
-        ConsumerRecords<String, String> records =
+        ConsumerRecords<K, V> records =
             consumer.poll(Duration.ofSeconds(1));

         // Do something with the data...
         log.info("{} - Received {} messages", id, records.count());
-        for (ConsumerRecord<String, String> record : records)
+        for (ConsumerRecord<K, V> record : records)
         {
           log.info(
               "{} - {}: {}/{} - {}={}",
@@ -113,44 +74,12 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnable
               record.value()
           );

-          consumed++;
-
-          Integer partition = record.partition();
-          String user = record.key();
-          Map<String, Map<String, Long>> users = seen.get(partition);
-
-          Map<String, Long> words = users.get(user);
-          if (words == null)
-          {
-            words = new HashMap<>();
-            users.put(user, words);
-          }
+          handler.accept(record);

-          for (String word : PATTERN.split(record.value()))
-          {
-            Long num = words.get(word);
-            if (num == null)
-            {
-              num = 1l;
-            }
-            else
-            {
-              num++;
-            }
-            words.put(word, num);
-          }
+          consumed++;
         }

-        if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
-        {
-          log.debug("Storing data and offsets, last commit: {}", lastCommit);
-          seen.forEach((partiton, statistics) -> repository.save(
-              new StatisticsDocument(
-                  partiton,
-                  statistics,
-                  consumer.position(new TopicPartition(topic, partiton)))));
-          lastCommit = clock.instant();
-        }
+        handler.beforeNextPoll();
       }
     }
     catch(WakeupException e)
@@ -218,11 +147,6 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnable
     }
   }

-  public Map<Integer, Map<String, Map<String, Long>>> getSeen()
-  {
-    return seen;
-  }
-
   public void start()
   {
     lock.lock();
diff --git a/src/main/java/de/juplo/kafka/RecordHandler.java b/src/main/java/de/juplo/kafka/RecordHandler.java
new file mode 100644
index 0000000..ff2f193
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/RecordHandler.java
@@ -0,0 +1,16 @@
+package de.juplo.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.function.Consumer;
+
+
+public interface RecordHandler<K, V> extends Consumer<ConsumerRecord<K, V>>
+{
+  default void beforeNextPoll() {}
+
+  default void onPartitionAssigned(TopicPartition tp) {}
+
+  default void onPartitionRevoked(TopicPartition tp) {}
+}
diff --git a/src/main/java/de/juplo/kafka/WordcountRecordHandler.java b/src/main/java/de/juplo/kafka/WordcountRecordHandler.java
new file mode 100644
index 0000000..5981c7d
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/WordcountRecordHandler.java
@@ -0,0 +1,119 @@
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class WordcountRecordHandler implements RecordHandler<String, String>
+{
+  final static Pattern PATTERN = Pattern.compile("\\W+");
+
+
+  private final PartitionStatisticsRepository repository;
+  private final String id;
+  private final String topic;
+  private final Clock clock;
+  private final Duration commitInterval;
+  private final Consumer<String, String> consumer;
+
+  private final Map<Integer, Map<String, Map<String, Long>>> seen = new HashMap<>();
+
+  private Instant lastCommit = Instant.EPOCH;
+
+
+  @Override
+  public void accept(ConsumerRecord<String, String> record)
+  {
+    Integer partition = record.partition();
+    String user = record.key();
+    Map<String, Map<String, Long>> users = seen.get(partition);
+
+    Map<String, Long> words = users.get(user);
+    if (words == null)
+    {
+      words = new HashMap<>();
+      users.put(user, words);
+    }
+
+    for (String word : PATTERN.split(record.value()))
+    {
+      Long num = words.get(word);
+      if (num == null)
+      {
+        num = 1l;
+      }
+      else
+      {
+        num++;
+      }
+      words.put(word, num);
+    }
+  }
+
+
+  @Override
+  public void beforeNextPoll()
+  {
+    if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
+    {
+      log.debug("Storing data and offsets, last commit: {}", lastCommit);
+      seen.forEach((partiton, statistics) -> repository.save(
+          new StatisticsDocument(
+              partiton,
+              statistics,
+              consumer.position(new TopicPartition(topic, partiton)))));
+      lastCommit = clock.instant();
+    }
+  }
+
+  @Override
+  public void onPartitionAssigned(TopicPartition tp)
+  {
+    Integer partition = tp.partition();
+    Long offset = consumer.position(tp);
+    log.info("{} - adding partition: {}, offset={}", id, partition, offset);
+    StatisticsDocument document =
+        repository
+            .findById(Integer.toString(partition))
+            .orElse(new StatisticsDocument(partition));
+    if (document.offset >= 0)
+    {
+      // Only seek, if a stored offset was found
+      // Otherwise: Use initial offset, generated by Kafka
+      consumer.seek(tp, document.offset);
+    }
+    seen.put(partition, document.statistics);
+  }
+
+  @Override
+  public void onPartitionRevoked(TopicPartition tp)
+  {
+    Integer partition = tp.partition();
+    Long newOffset = consumer.position(tp);
+    log.info(
+        "{} - removing partition: {}, offset of next message {})",
+        id,
+        partition,
+        newOffset);
+    Map<String, Map<String, Long>> removed = seen.remove(partition);
+    repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
+  }
+
+
+  public Map<Integer, Map<String, Map<String, Long>>> getSeen()
+  {
+    return seen;
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index aa6dd4d..408a826 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -26,7 +26,6 @@ import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.function.BiConsumer;
-import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -71,9 +70,10 @@ class ApplicationTests
   ExecutorService executor;
   @Autowired
   PartitionStatisticsRepository repository;
+  @Autowired
+  WordcountRecordHandler wordcountRecordHandler;

-  Consumer<ConsumerRecord<String, String>> testHandler;
-  EndlessConsumer endlessConsumer;
+  EndlessConsumer<String, String> endlessConsumer;
   Map<TopicPartition, Long> oldOffsets;
   Map<TopicPartition, Long> newOffsets;
   Set<ConsumerRecord<String, String>> receivedRecords;
@@ -205,8 +205,6 @@ class ApplicationTests
   @BeforeEach
   public void init()
   {
-    testHandler = record -> {} ;
-
     oldOffsets = new HashMap<>();
     newOffsets = new HashMap<>();
     receivedRecords = new HashSet<>();
@@ -217,25 +215,25 @@ class ApplicationTests
       newOffsets.put(tp, offset - 1);
     });

-    Consumer<ConsumerRecord<String, String>> captureOffsetAndExecuteTestHandler =
-        record ->
-        {
-          newOffsets.put(
-              new TopicPartition(record.topic(), record.partition()),
-              record.offset());
-          receivedRecords.add(record);
-          testHandler.accept(record);
+    TestRecordHandler<String, String> captureOffsetAndExecuteTestHandler =
+        new TestRecordHandler<String, String>(wordcountRecordHandler) {
+          @Override
+          public void onNewRecord(ConsumerRecord<String, String> record)
+          {
+            newOffsets.put(
+                new TopicPartition(record.topic(), record.partition()),
+                record.offset());
+            receivedRecords.add(record);
+          }
         };

     endlessConsumer =
-        new EndlessConsumer(
+        new EndlessConsumer<>(
            executor,
            properties.getClientId(),
            properties.getTopic(),
            kafkaConsumer,
            captureOffsetAndExecuteTestHandler);

     endlessConsumer.start();
   }
diff --git a/src/test/java/de/juplo/kafka/TestRecordHandler.java b/src/test/java/de/juplo/kafka/TestRecordHandler.java
a/src/test/java/de/juplo/kafka/TestRecordHandler.java b/src/test/java/de/juplo/kafka/TestRecordHandler.java new file mode 100644 index 0000000..4047093 --- /dev/null +++ b/src/test/java/de/juplo/kafka/TestRecordHandler.java @@ -0,0 +1,41 @@ +package de.juplo.kafka; + +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; + + +@RequiredArgsConstructor +public abstract class TestRecordHandler implements RecordHandler +{ + private final RecordHandler handler; + + + public abstract void onNewRecord(ConsumerRecord record); + + + @Override + public void accept(ConsumerRecord record) + { + this.onNewRecord(record); + handler.accept(record); + } + @Override + + public void beforeNextPoll() + { + handler.beforeNextPoll(); + } + + @Override + public void onPartitionAssigned(TopicPartition tp) + { + handler.onPartitionAssigned(tp); + } + + @Override + public void onPartitionRevoked(TopicPartition tp) + { + handler.onPartitionRevoked(tp); + } +} -- 2.20.1 From 818c1eb862247e25abf9f7d91d5a73e3e3789a39 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 12 Aug 2022 11:13:54 +0200 Subject: [PATCH 14/16] =?utf8?q?refactor:=20RebalanceListener=20als=20eige?= =?utf8?q?nst=C3=A4ndige=20Klasse?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../juplo/kafka/ApplicationConfiguration.java | 9 +++++++ .../java/de/juplo/kafka/EndlessConsumer.java | 17 +++--------- .../kafka/WordcountRebalanceListener.java | 27 +++++++++++++++++++ .../java/de/juplo/kafka/ApplicationTests.java | 5 +++- 4 files changed, 43 insertions(+), 15 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/WordcountRebalanceListener.java diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index b077a90..da1605b 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -32,10 +32,18 @@ public class ApplicationConfiguration consumer); } + @Bean + public WordcountRebalanceListener wordcountRebalanceListener( + WordcountRecordHandler wordcountRecordHandler) + { + return new WordcountRebalanceListener(wordcountRecordHandler); + } + @Bean public EndlessConsumer endlessConsumer( KafkaConsumer kafkaConsumer, ExecutorService executor, + WordcountRebalanceListener wordcountRebalanceListener, WordcountRecordHandler wordcountRecordHandler, ApplicationProperties properties) { @@ -45,6 +53,7 @@ public class ApplicationConfiguration properties.getClientId(), properties.getTopic(), kafkaConsumer, + wordcountRebalanceListener, wordcountRecordHandler); } diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 0c107f3..0f3316d 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -19,12 +19,13 @@ import java.util.concurrent.locks.ReentrantLock; @Slf4j @RequiredArgsConstructor -public class EndlessConsumer implements ConsumerRebalanceListener, Runnable +public class EndlessConsumer implements Runnable { private final ExecutorService executor; private final String id; private final String topic; private final Consumer consumer; + private final ConsumerRebalanceListener rebalanceListener; private final RecordHandler handler; private final Lock lock = new ReentrantLock(); @@ -34,18 +35,6 @@ public class EndlessConsumer implements 
ConsumerRebalanceListener, Runnabl private long consumed = 0; - @Override - public void onPartitionsRevoked(Collection partitions) - { - partitions.forEach(tp -> handler.onPartitionRevoked(tp)); - } - - @Override - public void onPartitionsAssigned(Collection partitions) - { - partitions.forEach(tp -> handler.onPartitionAssigned(tp)); - } - @Override public void run() @@ -53,7 +42,7 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl try { log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic), this); + consumer.subscribe(Arrays.asList(topic), rebalanceListener); while (true) { diff --git a/src/main/java/de/juplo/kafka/WordcountRebalanceListener.java b/src/main/java/de/juplo/kafka/WordcountRebalanceListener.java new file mode 100644 index 0000000..fd551c2 --- /dev/null +++ b/src/main/java/de/juplo/kafka/WordcountRebalanceListener.java @@ -0,0 +1,27 @@ +package de.juplo.kafka; + +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.common.TopicPartition; + +import java.util.Collection; + + +@RequiredArgsConstructor +public class WordcountRebalanceListener implements ConsumerRebalanceListener +{ + private final RecordHandler handler; + + + @Override + public void onPartitionsAssigned(Collection partitions) + { + partitions.forEach(tp -> handler.onPartitionAssigned(tp)); + } + + @Override + public void onPartitionsRevoked(Collection partitions) + { + partitions.forEach(tp -> handler.onPartitionRevoked(tp)); + } +} diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 408a826..f4c2104 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -70,7 +70,9 @@ class ApplicationTests ExecutorService executor; @Autowired PartitionStatisticsRepository repository; - @Autowired + @Autowired + WordcountRebalanceListener wordcountRebalanceListener; + @Autowired WordcountRecordHandler wordcountRecordHandler; EndlessConsumer endlessConsumer; @@ -233,6 +235,7 @@ class ApplicationTests properties.getClientId(), properties.getTopic(), kafkaConsumer, + wordcountRebalanceListener, captureOffsetAndExecuteTestHandler); endlessConsumer.start(); -- 2.20.1 From fc682d9890787ef363b3e189f6f880a043f3c541 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 12 Aug 2022 11:53:46 +0200 Subject: [PATCH 15/16] refactor: Handling der Partitionen in WordcountRebalanceListener --- .../juplo/kafka/ApplicationConfiguration.java | 12 ++++-- .../java/de/juplo/kafka/RecordHandler.java | 5 --- .../kafka/WordcountRebalanceListener.java | 40 +++++++++++++++++-- .../juplo/kafka/WordcountRecordHandler.java | 32 ++------------- .../de/juplo/kafka/TestRecordHandler.java | 13 ------ 5 files changed, 50 insertions(+), 52 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index da1605b..0d17823 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -25,7 +25,6 @@ public class ApplicationConfiguration { return new WordcountRecordHandler( repository, - properties.getClientId(), properties.getTopic(), Clock.systemDefaultZone(), properties.getCommitInterval(), @@ -34,9 +33,16 @@ public class ApplicationConfiguration @Bean public WordcountRebalanceListener wordcountRebalanceListener( - 
From fc682d9890787ef363b3e189f6f880a043f3c541 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Fri, 12 Aug 2022 11:53:46 +0200
Subject: [PATCH 15/16] refactor: Moved partition handling into the WordcountRebalanceListener

---
 .../juplo/kafka/ApplicationConfiguration.java | 12 ++++--
 .../java/de/juplo/kafka/RecordHandler.java    |  5 ---
 .../kafka/WordcountRebalanceListener.java     | 40 +++++++++++++++++--
 .../juplo/kafka/WordcountRecordHandler.java   | 32 ++-------------
 .../de/juplo/kafka/TestRecordHandler.java     | 13 ------
 5 files changed, 50 insertions(+), 52 deletions(-)

diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index da1605b..0d17823 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -25,7 +25,6 @@ public class ApplicationConfiguration
   {
     return new WordcountRecordHandler(
         repository,
-        properties.getClientId(),
        properties.getTopic(),
         Clock.systemDefaultZone(),
         properties.getCommitInterval(),
@@ -34,9 +33,16 @@ public class ApplicationConfiguration
 
   @Bean
   public WordcountRebalanceListener wordcountRebalanceListener(
-      WordcountRecordHandler wordcountRecordHandler)
+      WordcountRecordHandler wordcountRecordHandler,
+      PartitionStatisticsRepository repository,
+      Consumer<String, String> consumer,
+      ApplicationProperties properties)
   {
-    return new WordcountRebalanceListener(wordcountRecordHandler);
+    return new WordcountRebalanceListener(
+        wordcountRecordHandler,
+        repository,
+        properties.getClientId(),
+        consumer);
   }
 
   @Bean
diff --git a/src/main/java/de/juplo/kafka/RecordHandler.java b/src/main/java/de/juplo/kafka/RecordHandler.java
index ff2f193..3c9dd15 100644
--- a/src/main/java/de/juplo/kafka/RecordHandler.java
+++ b/src/main/java/de/juplo/kafka/RecordHandler.java
@@ -1,7 +1,6 @@
 package de.juplo.kafka;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
 
 import java.util.function.Consumer;
 
@@ -9,8 +8,4 @@ import java.util.function.Consumer;
 public interface RecordHandler<K, V> extends Consumer<ConsumerRecord<K, V>>
 {
   default void beforeNextPoll() {}
-
-  default void onPartitionAssigned(TopicPartition tp) {}
-
-  default void onPartitionRevoked(TopicPartition tp) {}
 }
diff --git a/src/main/java/de/juplo/kafka/WordcountRebalanceListener.java b/src/main/java/de/juplo/kafka/WordcountRebalanceListener.java
index fd551c2..9a69c8f 100644
--- a/src/main/java/de/juplo/kafka/WordcountRebalanceListener.java
+++ b/src/main/java/de/juplo/kafka/WordcountRebalanceListener.java
@@ -1,27 +1,61 @@
 package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.common.TopicPartition;
 
 import java.util.Collection;
+import java.util.Map;
 
 
 @RequiredArgsConstructor
+@Slf4j
 public class WordcountRebalanceListener implements ConsumerRebalanceListener
 {
-  private final RecordHandler<String, String> handler;
+  private final WordcountRecordHandler handler;
+  private final PartitionStatisticsRepository repository;
+  private final String id;
+  private final Consumer<String, String> consumer;
 
 
   @Override
   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
   {
-    partitions.forEach(tp -> handler.onPartitionAssigned(tp));
+    partitions.forEach(tp ->
+    {
+      Integer partition = tp.partition();
+      Long offset = consumer.position(tp);
+      log.info("{} - adding partition: {}, offset={}", id, partition, offset);
+      StatisticsDocument document =
+          repository
+              .findById(Integer.toString(partition))
+              .orElse(new StatisticsDocument(partition));
+      if (document.offset >= 0)
+      {
+        // Only seek, if a stored offset was found
+        // Otherwise: Use initial offset, generated by Kafka
+        consumer.seek(tp, document.offset);
+      }
+      handler.addPartition(partition, document.statistics);
+    });
   }
 
   @Override
   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
   {
-    partitions.forEach(tp -> handler.onPartitionRevoked(tp));
+    partitions.forEach(tp ->
+    {
+      Integer partition = tp.partition();
+      Long newOffset = consumer.position(tp);
+      log.info(
+          "{} - removing partition: {}, offset of next message {}",
+          id,
+          partition,
+          newOffset);
+      Map<String, Map<String, Long>> removed = handler.removePartition(partition);
+      repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
+    });
   }
 }
diff --git a/src/main/java/de/juplo/kafka/WordcountRecordHandler.java b/src/main/java/de/juplo/kafka/WordcountRecordHandler.java
index 5981c7d..bdf4b32 100644
--- a/src/main/java/de/juplo/kafka/WordcountRecordHandler.java
+++ b/src/main/java/de/juplo/kafka/WordcountRecordHandler.java
@@ -22,7 +22,6 @@ public class WordcountRecordHandler implements RecordHandler<String, String>
 
   private final PartitionStatisticsRepository repository;
-  private final String id;
   private final String topic;
   private final Clock clock;
   private final Duration commitInterval;
@@ -78,37 +77,14 @@ public class WordcountRecordHandler implements RecordHandler<String, String>
     }
   }
 
-  @Override
-  public void onPartitionAssigned(TopicPartition tp)
+  public void addPartition(Integer partition, Map<String, Map<String, Long>> statistics)
   {
-    Integer partition = tp.partition();
-    Long offset = consumer.position(tp);
-    log.info("{} - adding partition: {}, offset={}", id, partition, offset);
-    StatisticsDocument document =
-        repository
-            .findById(Integer.toString(partition))
-            .orElse(new StatisticsDocument(partition));
-    if (document.offset >= 0)
-    {
-      // Only seek, if a stored offset was found
-      // Otherwise: Use initial offset, generated by Kafka
-      consumer.seek(tp, document.offset);
-    }
-    seen.put(partition, document.statistics);
+    seen.put(partition, statistics);
   }
 
-  @Override
-  public void onPartitionRevoked(TopicPartition tp)
+  public Map<String, Map<String, Long>> removePartition(Integer partition)
   {
-    Integer partition = tp.partition();
-    Long newOffset = consumer.position(tp);
-    log.info(
-        "{} - removing partition: {}, offset of next message {})",
-        id,
-        partition,
-        newOffset);
-    Map<String, Map<String, Long>> removed = seen.remove(partition);
-    repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
+    return seen.remove(partition);
   }
 
diff --git a/src/test/java/de/juplo/kafka/TestRecordHandler.java b/src/test/java/de/juplo/kafka/TestRecordHandler.java
index 4047093..de28385 100644
--- a/src/test/java/de/juplo/kafka/TestRecordHandler.java
+++ b/src/test/java/de/juplo/kafka/TestRecordHandler.java
@@ -2,7 +2,6 @@ package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
 
 
 @RequiredArgsConstructor
@@ -26,16 +25,4 @@ public abstract class TestRecordHandler<K, V> implements RecordHandler<K, V>
   {
     handler.beforeNextPoll();
   }
-
-  @Override
-  public void onPartitionAssigned(TopicPartition tp)
-  {
-    handler.onPartitionAssigned(tp);
-  }
-
-  @Override
-  public void onPartitionRevoked(TopicPartition tp)
-  {
-    handler.onPartitionRevoked(tp);
-  }
 }
-- 
2.20.1
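After this patch the listener owns the offset management: on assignment it seeks the consumer to the position last saved in MongoDB, on revocation it stores the statistics together with the offset of the next unconsumed message. The restore step boils down to the following pattern (a standalone sketch, assuming an arbitrary partition-to-offset lookup; the `OffsetStore` interface and the class name are hypothetical):

[source,java]
----
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.Optional;


// Hypothetical lookup: partition number -> offset saved by the application.
interface OffsetStore
{
  Optional<Long> offsetFor(int partition);
}

class SeekingRebalanceLogic
{
  static void seekToStoredOffsets(
      Consumer<?, ?> consumer,
      OffsetStore store,
      Collection<TopicPartition> partitions)
  {
    for (TopicPartition tp : partitions)
    {
      // Only seek if an offset was stored; otherwise the starting
      // position is determined by Kafka's auto.offset.reset policy.
      store.offsetFor(tp.partition()).ifPresent(offset -> consumer.seek(tp, offset));
    }
  }
}
----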
-- 
2.20.1

From 9511a89368c96d0b5f09d55adaaed5515c578dcc Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Fri, 12 Aug 2022 12:04:27 +0200
Subject: [PATCH 16/16] refactor: Moved all Kafka concerns into the `WordcountRebalanceListener`
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* To make this possible, the new interface `PollIntervalAwareConsumerRebalanceListener` was introduced.
* `WordcountRebalanceListener` implements the new interface and takes care of all Kafka concerns.
* `WordcountRecordHandler` now contains only the business logic.

---
 .../juplo/kafka/ApplicationConfiguration.java | 15 +++------
 .../java/de/juplo/kafka/EndlessConsumer.java  |  6 ++--
 ...ntervalAwareConsumerRebalanceListener.java |  9 ++++++
 .../kafka/WordcountRebalanceListener.java     | 26 ++++++++++++++--
 .../juplo/kafka/WordcountRecordHandler.java   | 31 -------------------
 5 files changed, 41 insertions(+), 46 deletions(-)
 create mode 100644 src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java

diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 0d17823..d48c027 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -18,17 +18,9 @@ public class ApplicationConfiguration
 {
   @Bean
-  public WordcountRecordHandler wordcountRecordHandler(
-      PartitionStatisticsRepository repository,
-      Consumer<String, String> consumer,
-      ApplicationProperties properties)
+  public WordcountRecordHandler wordcountRecordHandler()
   {
-    return new WordcountRecordHandler(
-        repository,
-        properties.getTopic(),
-        Clock.systemDefaultZone(),
-        properties.getCommitInterval(),
-        consumer);
+    return new WordcountRecordHandler();
   }
 
   @Bean
@@ -42,6 +34,9 @@ public class ApplicationConfiguration
         wordcountRecordHandler,
         repository,
         properties.getClientId(),
+        properties.getTopic(),
+        Clock.systemDefaultZone(),
+        properties.getCommitInterval(),
         consumer);
   }
 
diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
index 0f3316d..58557f2 100644
--- a/src/main/java/de/juplo/kafka/EndlessConsumer.java
+++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java
@@ -25,7 +25,7 @@ public class EndlessConsumer<K, V> implements Runnable
   private final String id;
   private final String topic;
   private final Consumer<K, V> consumer;
-  private final ConsumerRebalanceListener rebalanceListener;
+  private final PollIntervalAwareConsumerRebalanceListener pollIntervalAwareRebalanceListener;
   private final RecordHandler<K, V> handler;
 
   private final Lock lock = new ReentrantLock();
@@ -42,7 +42,7 @@ public class EndlessConsumer<K, V> implements Runnable
     try
     {
       log.info("{} - Subscribing to topic {}", id, topic);
-      consumer.subscribe(Arrays.asList(topic), rebalanceListener);
+      consumer.subscribe(Arrays.asList(topic), pollIntervalAwareRebalanceListener);
 
       while (true)
       {
@@ -68,7 +68,7 @@ public class EndlessConsumer<K, V> implements Runnable
           consumed++;
         }
 
-        handler.beforeNextPoll();
+        pollIntervalAwareRebalanceListener.beforeNextPoll();
       }
     }
     catch(WakeupException e)
diff --git a/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java b/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java
new file mode 100644
index 0000000..8abec12
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/PollIntervalAwareConsumerRebalanceListener.java
@@ -0,0 +1,9 @@
+package de.juplo.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+
+
+public interface PollIntervalAwareConsumerRebalanceListener extends ConsumerRebalanceListener
+{
+  default void beforeNextPoll() {}
+}
diff --git a/src/main/java/de/juplo/kafka/WordcountRebalanceListener.java b/src/main/java/de/juplo/kafka/WordcountRebalanceListener.java
index 9a69c8f..9f2fc0f 100644
--- a/src/main/java/de/juplo/kafka/WordcountRebalanceListener.java
+++ b/src/main/java/de/juplo/kafka/WordcountRebalanceListener.java
@@ -3,22 +3,28 @@ package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.common.TopicPartition;
 
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
 import java.util.Collection;
 import java.util.Map;
 
 
 @RequiredArgsConstructor
 @Slf4j
-public class WordcountRebalanceListener implements ConsumerRebalanceListener
+public class WordcountRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
 {
   private final WordcountRecordHandler handler;
   private final PartitionStatisticsRepository repository;
   private final String id;
+  private final String topic;
+  private final Clock clock;
+  private final Duration commitInterval;
   private final Consumer<String, String> consumer;
 
+  private Instant lastCommit = Instant.EPOCH;
 
   @Override
   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
@@ -58,4 +64,20 @@ public class WordcountRebalanceListener implements ConsumerRebalanceListener
       repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
     });
   }
+
+
+  @Override
+  public void beforeNextPoll()
+  {
+    if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
+    {
+      log.debug("Storing data and offsets, last commit: {}", lastCommit);
+      handler.getSeen().forEach((partition, statistics) -> repository.save(
+          new StatisticsDocument(
+              partition,
+              statistics,
+              consumer.position(new TopicPartition(topic, partition)))));
+      lastCommit = clock.instant();
+    }
+  }
 }
diff --git a/src/main/java/de/juplo/kafka/WordcountRecordHandler.java b/src/main/java/de/juplo/kafka/WordcountRecordHandler.java
index bdf4b32..4efc547 100644
--- a/src/main/java/de/juplo/kafka/WordcountRecordHandler.java
+++ b/src/main/java/de/juplo/kafka/WordcountRecordHandler.java
@@ -1,36 +1,21 @@
 package de.juplo.kafka;
 
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
 
-import java.time.Clock;
-import java.time.Duration;
-import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.regex.Pattern;
 
 
-@RequiredArgsConstructor
 @Slf4j
 public class WordcountRecordHandler implements RecordHandler<String, String>
 {
   final static Pattern PATTERN = Pattern.compile("\\W+");
 
-  private final PartitionStatisticsRepository repository;
-  private final String topic;
-  private final Clock clock;
-  private final Duration commitInterval;
-  private final Consumer<String, String> consumer;
-
   private final Map<Integer, Map<String, Map<String, Long>>> seen = new HashMap<>();
 
-  private Instant lastCommit = Instant.EPOCH;
-
 
   @Override
   public void accept(ConsumerRecord<String, String> record)
@@ -61,22 +46,6 @@ public class WordcountRecordHandler implements RecordHandler<String, String>
     }
   }
 
-
-  @Override
-  public void beforeNextPoll()
-  {
-    if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
-    {
-      log.debug("Storing data and offsets, last commit: {}", lastCommit);
-      seen.forEach((partiton, statistics) -> repository.save(
-          new StatisticsDocument(
-              partiton,
-              statistics,
-              consumer.position(new TopicPartition(topic, partiton)))));
-      lastCommit = clock.instant();
-    }
-  }
-
   public void addPartition(Integer partition, Map<String, Map<String, Long>> statistics)
   {
     seen.put(partition, statistics);
   }
-- 
2.20.1
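The `beforeNextPoll()` hook that `WordcountRebalanceListener` now implements flushes state and offsets at most once per `commitInterval`. The timing logic in isolation looks like this (a standalone sketch using the same `Clock`/`Duration`/`Instant` types as the patch; the class name `IntervalGate` and the method `shouldFlush()` are hypothetical):

[source,java]
----
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;


// Hypothetical extraction of the interval check from beforeNextPoll().
class IntervalGate
{
  private final Clock clock;
  private final Duration commitInterval;
  private Instant lastCommit = Instant.EPOCH;

  IntervalGate(Clock clock, Duration commitInterval)
  {
    this.clock = clock;
    this.commitInterval = commitInterval;
  }

  // Returns true at most once per commitInterval; because lastCommit
  // starts at EPOCH, the very first call always triggers a flush.
  boolean shouldFlush()
  {
    if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
    {
      lastCommit = clock.instant();
      return true;
    }
    return false;
  }
}
----

Injecting the `Clock` instead of calling `Instant.now()` directly is what makes this logic testable: a test can substitute a fixed or stepping clock and assert exactly when the flush fires.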