From a8e18a7ca9cb26ec03cd3541ddc7b85d93c2a20c Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Mon, 29 Aug 2022 18:59:29 +0200
Subject: [PATCH 01/16] Demo script reworked: sped up the run by reordering
 the steps
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* Because both consumers are now stopped cleanly once each, the results
  for `peter` as well as for `klaus` get stored in the MongoDB.
* Since this previously only happened for one of the users, the script
  had to wait a long time after the forced shutdown of a consumer until,
  after the restart, the processing of the piled-up data had progressed
  far enough for first results to become visible for both consumers.
---
 README.sh | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/README.sh b/README.sh
index 3f74cd1..d22179d 100755
--- a/README.sh
+++ b/README.sh
@@ -74,20 +74,20 @@ http :8092/results/peter | jq .[].sum | uniq
 echo "Resultate für klaus von adder-2"
 http :8092/results/klaus | jq .[].sum | uniq
 
-docker-compose stop adder-2
-until [ $(http --check-status :8091/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-1..."; sleep 1; done
-until [ $(http --check-status :8091/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for klaus to show up on adder-1..."; sleep 1; done
+docker-compose stop adder-1
+until [ $(http --check-status :8092/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-2..."; sleep 1; done
+until [ $(http --check-status :8092/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for klaus to show up on adder-2..."; sleep 1; done
 
-echo "Resultate für adder-1"
-http -v --pretty none -S :8091/results
+echo "Resultate für adder-2"
+http -v --pretty none -S :8092/results
 echo
 
-echo "Resultate für peter von adder-1"
-http :8091/results/peter | jq .[].sum | uniq
-echo "Resultate für klaus von adder-1"
-http :8091/results/klaus | jq .[].sum | uniq
+echo "Resultate für peter von adder-2"
+http :8092/results/peter | jq .[].sum | uniq
+echo "Resultate für klaus von adder-2"
+http :8092/results/klaus | jq .[].sum | uniq
 
-docker-compose kill -s 9 adder-1
+docker-compose kill -s 9 adder-2
 docker-compose start adder-1
 while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done
 until [ $(http --check-status :8091/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-1..."; sleep 1; done
-- 
2.20.1


From 9f8bcc8f869626efa0a258b3116582c8354a66d1 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Wed, 31 Aug 2022 16:41:04 +0200
Subject: [PATCH 02/16] Made it clearly visible that the baton handover only
 succeeds in the normal case
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

---
 README.sh          | 8 +++++++-
 docker-compose.yml | 4 ++--
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/README.sh b/README.sh
index d22179d..3292f5f 100755
--- a/README.sh
+++ b/README.sh
@@ -89,6 +89,7 @@ http :8092/results/klaus | jq .[].sum | uniq
 
 docker-compose kill -s 9 adder-2
 docker-compose start adder-1
+docker-compose kill -s 9 peter klaus
 while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done
 until [ $(http --check-status :8091/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-1..."; sleep 1; done
 until [ $(http --check-status :8091/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for klaus to show up on adder-1..."; sleep 1; done
@@ -102,4 +103,9 @@ http :8091/results/peter | jq .[].sum | uniq
 echo "Resultate für klaus von adder-1"
 http :8091/results/klaus | jq .[].sum | uniq
 
-docker-compose kill -s 9 peter klaus
+sleep 5
+
+echo "Resultate für peter von adder-1"
+http :8091/results/peter | jq .[].sum | uniq
+echo "Resultate für klaus von adder-1"
+http :8091/results/klaus | jq .[].sum | uniq
diff --git a/docker-compose.yml b/docker-compose.yml
index c46b00d..5f0acac 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -131,7 +131,7 @@ services:
       server.port: 8080
       sumup.adder.bootstrap-server: kafka:9092
       sumup.adder.client-id: adder-1
-      sumup.adder.commit-interval: 3s
+      sumup.adder.commit-interval: 1s
       sumup.adder.throttle: 3ms
       spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
       spring.data.mongodb.database: juplo
@@ -145,7 +145,7 @@ services:
       server.port: 8080
       sumup.adder.bootstrap-server: kafka:9092
       sumup.adder.client-id: adder-2
-      sumup.adder.commit-interval: 3s
+      sumup.adder.commit-interval: 1s
       sumup.adder.throttle: 3ms
       spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
       spring.data.mongodb.database: juplo
-- 
2.20.1


From 9fe70ad96d2b5a9ed0581057da54facba859ad1f Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sun, 28 Aug 2022 16:01:22 +0200
Subject: [PATCH 03/16] Reverted to a consumer that does not commit in
 `onPartitionsRevoked()`
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* As a consequence, a rebalance should normally lead to errors in the
  locally maintained state, because processing then only resumes at the
  offset for which the state was stored if one is lucky.
* To restore the previous behaviour of the implementation, the commits
  have to be added back, in particular for the cases of an orderly
  shutdown and of a deserialization error - otherwise the implementation
  would not acknowledge the offsets of the most recently successfully
  processed messages.
* Adjusted the demo script so that one sees immediately that in this
  version already a regular "baton handover" - that is, even an ordinary
  rebalance, triggered simply by starting a second consumer - produces
  an error.
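For reference, the logic that this patch removes from `ApplicationRebalanceListener`
boils down to the following sketch (compare the hunk below; `consumer` and `id` are
fields of the listener, imports omitted):

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions)
    {
      // Committing synchronously before the partitions are handed over
      // ties the committed offsets to the state that is stored away for
      // the revoked partitions - without this commit, the next owner
      // will in general resume at an offset that does not match the
      // restored state.
      try
      {
        consumer.commitSync();
      }
      catch (Exception e)
      {
        log.warn("{} - Could not commit offsets in onPartitionsRevoked():", id, e);
      }
    }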
---
 README.sh                                      | 34 -------------------
 .../juplo/kafka/ApplicationConfiguration.java  |  5 +--
 .../kafka/ApplicationRebalanceListener.java    | 12 -------
 .../java/de/juplo/kafka/EndlessConsumer.java   |  4 ++-
 4 files changed, 4 insertions(+), 51 deletions(-)

diff --git a/README.sh b/README.sh
index 3292f5f..c9494b9 100755
--- a/README.sh
+++ b/README.sh
@@ -74,38 +74,4 @@ http :8092/results/peter | jq .[].sum | uniq
 echo "Resultate für klaus von adder-2"
 http :8092/results/klaus | jq .[].sum | uniq
 
-docker-compose stop adder-1
-until [ $(http --check-status :8092/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-2..."; sleep 1; done
-until [ $(http --check-status :8092/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for klaus to show up on adder-2..."; sleep 1; done
-
-echo "Resultate für adder-2"
-http -v --pretty none -S :8092/results
-echo
-
-echo "Resultate für peter von adder-2"
-http :8092/results/peter | jq .[].sum | uniq
-echo "Resultate für klaus von adder-2"
-http :8092/results/klaus | jq .[].sum | uniq
-
-docker-compose kill -s 9 adder-2
-docker-compose start adder-1
 docker-compose kill -s 9 peter klaus
-while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done
-until [ $(http --check-status :8091/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-1..."; sleep 1; done
-until [ $(http --check-status :8091/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for klaus to show up on adder-1..."; sleep 1; done
-
-echo "Resultate für adder-1"
-http -v --pretty none -S :8091/results
-echo
-
-echo "Resultate für peter von adder-1"
-http :8091/results/peter | jq .[].sum | uniq
-echo "Resultate für klaus von adder-1"
-http :8091/results/klaus | jq .[].sum | uniq
-
-sleep 5
-
-echo "Resultate für peter von adder-1"
-http :8091/results/peter | jq .[].sum | uniq
-echo "Resultate für klaus von adder-1"
-http :8091/results/klaus | jq .[].sum | uniq
diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index e08cff4..1c69760 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -1,6 +1,5 @@
 package de.juplo.kafka;
 
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -39,15 +38,13 @@ public class ApplicationConfiguration
       ApplicationRecordHandler recordHandler,
       AdderResults adderResults,
       StateRepository stateRepository,
-      Consumer consumer,
       ApplicationProperties properties)
   {
     return new ApplicationRebalanceListener(
         recordHandler,
         adderResults,
         stateRepository,
-        properties.getClientId(),
-        consumer);
+        properties.getClientId());
   }
 
   @Bean
diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
index eef0d00..0bfee67 100644
--- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
+++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
@@ -2,7 +2,6 @@ package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.common.TopicPartition;
 
@@ -17,7 +16,6 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener
   private final AdderResults adderResults;
   private final StateRepository stateRepository;
   private final String id;
-  private final Consumer consumer;
 
   private final Set partitions = new HashSet<>();
 
@@ -50,16 +48,6 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener
   @Override
   public void onPartitionsRevoked(Collection partitions)
   {
-    log.info("{} - Commiting offsets for all previously assigned partitions", id);
-    try
-    {
-      consumer.commitSync();
-    }
-    catch (Exception e)
-    {
-      log.warn("{} - Could not commit offsets in onPartitionsRevoked():", id, e);
-    }
-
     partitions.forEach(tp ->
     {
       Integer partition = tp.partition();
diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
index f0e74d3..00678c4 100644
--- a/src/main/java/de/juplo/kafka/EndlessConsumer.java
+++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java
@@ -71,7 +71,8 @@ public class EndlessConsumer implements Runnable
     }
     catch(WakeupException e)
     {
-      log.info("{} - RIIING! Request to stop consumption.", id);
+      log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
+      consumer.commitSync();
       shutdown();
     }
     catch(RecordDeserializationException e)
@@ -85,6 +86,7 @@
         offset,
         e.getCause().toString());
 
+      consumer.commitSync();
       shutdown(e);
     }
     catch(Exception e)
-- 
2.20.1


From 21d88d8ede95d1f811ff91d3804cba6d95ae6aab Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Tue, 30 Aug 2022 06:32:12 +0200
Subject: [PATCH 04/16] Switching to the `StickyAssignor` fixes the rebalance
 errors
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* The state errors triggered by rebalances during regular "baton
  handovers" can be fixed completely by downgrading from the
  `CooperativeStickyAssignor` to the `StickyAssignor`.
* *Caution:* The `StickyAssignor` uses the eager protocol.
* That is, a downgrade to the `StickyAssignor` requires a reset of the
  group and is therefore not possible as a rolling upgrade during live
  operation.
* Adjusted the demo script so that one sees immediately that this
  version gets through all regular rebalance cases without errors.
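The fix itself is a single consumer property; as a minimal sketch (matching the
one-line change in `ApplicationConfiguration` below):

    Properties props = new Properties();
    // Downgrade from the cooperative to the classic sticky assignor.
    // The StickyAssignor rebalances with the eager protocol: all
    // partitions are revoked before they are re-assigned, which is why
    // the switch requires a reset of the consumer group.
    props.put(
        "partition.assignment.strategy",
        "org.apache.kafka.clients.consumer.StickyAssignor");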
---
 README.sh                                        | 13 +++++++++++++
 .../de/juplo/kafka/ApplicationConfiguration.java |  2 +-
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/README.sh b/README.sh
index c9494b9..f337d5c 100755
--- a/README.sh
+++ b/README.sh
@@ -74,4 +74,17 @@ http :8092/results/peter | jq .[].sum | uniq
 echo "Resultate für klaus von adder-2"
 http :8092/results/klaus | jq .[].sum | uniq
 
+docker-compose stop adder-1
+until [ $(http --check-status :8092/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-2..."; sleep 1; done
+until [ $(http --check-status :8092/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-2..."; sleep 1; done
+
+echo "Resultate für adder-2"
+http -v --pretty none -S :8092/results
+echo
+
+echo "Resultate für peter von adder-2"
+http :8092/results/peter | jq .[].sum | uniq
+echo "Resultate für klaus von adder-2"
+http :8092/results/klaus | jq .[].sum | uniq
+
 docker-compose kill -s 9 peter klaus
diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 1c69760..624a4ec 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -77,7 +77,7 @@ public class ApplicationConfiguration
     Properties props = new Properties();
 
     props.put("bootstrap.servers", properties.getBootstrapServer());
-    props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
+    props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.StickyAssignor");
     props.put("group.id", properties.getGroupId());
    props.put("client.id", properties.getClientId());
     props.put("auto.offset.reset", properties.getAutoOffsetReset());
-- 
2.20.1


From 0c89ddec8156e2f44fe28c9d05fe06f548e9168e Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sun, 4 Sep 2022 15:55:35 +0200
Subject: [PATCH 05/16] Activated the `maven-failsafe-plugin` and repaired
 `ApplicationIT`
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* The `maven-failsafe-plugin` was not armed, so it had gone unnoticed
  that the integration test `ApplicationIT` was not even runnable any
  more, because it had never been adapted to the changes.
* Caught up on the missing adaptations to the integration test.
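As background (standard Maven conventions, not specific to this patch): the
Surefire plugin only runs unit tests matching `*Test.java`, while the Failsafe
plugin runs integration tests matching `*IT.java` - hence the class name
`ApplicationIT` - in the `integration-test` phase and reports failures in
`verify`:

    # runs only the unit tests (Surefire, *Test.java)
    mvn test
    # additionally runs ApplicationIT (Failsafe, *IT.java); a failure
    # makes the build fail in the verify phase
    mvn verify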
---
 pom.xml                                         | 3 +++
 src/test/java/de/juplo/kafka/ApplicationIT.java | 6 +++---
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/pom.xml b/pom.xml
index ecb559a..6699408 100644
--- a/pom.xml
+++ b/pom.xml
@@ -116,6 +116,9 @@
+      <plugin>
+        <artifactId>maven-failsafe-plugin</artifactId>
+      </plugin>
diff --git a/src/test/java/de/juplo/kafka/ApplicationIT.java b/src/test/java/de/juplo/kafka/ApplicationIT.java
index cded0ee..dcac79b 100644
--- a/src/test/java/de/juplo/kafka/ApplicationIT.java
+++ b/src/test/java/de/juplo/kafka/ApplicationIT.java
@@ -8,14 +8,14 @@ import org.springframework.boot.test.web.client.TestRestTemplate;
 import org.springframework.boot.test.web.server.LocalServerPort;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 
-import static de.juplo.kafka.ApplicationTests.TOPIC;
+import static de.juplo.kafka.ApplicationIT.TOPIC;
 
 
 @SpringBootTest(
     webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
     properties = {
-        "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
-        "consumer.topic=" + TOPIC,
+        "sumup.adder.bootstrap-server=${spring.embedded.kafka.brokers}",
+        "sumup.adder.topic=" + TOPIC,
         "spring.mongodb.embedded.version=4.4.13" })
 @EmbeddedKafka(topics = TOPIC)
 @AutoConfigureDataMongo
-- 
2.20.1


From caed9441a9303af071a572405ae4a665d60faae7 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sat, 3 Sep 2022 14:24:07 +0200
Subject: [PATCH 06/16] The adder processes two types of JSON messages instead
 of String
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* So far, all messages were of type `String`.
* Now the adder processes two different types of messages.
* The messages are transferred as JSON and deserialized into two
  different specializations of a common base class with the help of the
  `JsonDeserializer` that ships with Spring Kafka.
* The type information needed for the deserialization is transported by
  the Spring Kafka tooling in the `__TypeId__` header.
* That is, for the messages to be deserialized correctly, it is _not_
  necessary that Jackson can derive the type of a message from the
  message itself; instead, sender and receiver have to agree on the hint
  they store in the `__TypeId__` header.
* The processing of the two message types was extracted into dedicated
  sub-methods, because this makes the code easier to compare with the
  variant that uses `@KafkaHandler`.
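Condensed into a sketch, the agreement between producer and consumer looks like
this (the consumer side is taken from the configuration hunk below; the producer
side mirrors what the adjusted `ApplicationTests` does - imports and surrounding
code omitted):

    // Consumer: map the logical type ids from __TypeId__ to the classes
    props.put(JsonDeserializer.TYPE_MAPPINGS,
        Message.Type.ADD + ":" + MessageAddNumber.class.getName() + "," +
        Message.Type.CALC + ":" + MessageCalculateSum.class.getName());

    // Producer (here: the test): announce the logical type id of the
    // serialized JSON in the __TypeId__ header of each record
    ProducerRecord<Bytes, Bytes> record = new ProducerRecord<>(TOPIC, key, value);
    record.headers().add("__TypeId__", Message.Type.ADD.toString().getBytes());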
---
 pom.xml                                        |  4 +-
 .../juplo/kafka/ApplicationConfiguration.java  | 13 +++--
 .../kafka/ApplicationHealthIndicator.java      |  2 +-
 .../juplo/kafka/ApplicationRecordHandler.java  | 40 +++++++++++----
 src/main/java/de/juplo/kafka/Message.java      |  9 ++++
 .../java/de/juplo/kafka/MessageAddNumber.java  | 19 +++++++
 .../de/juplo/kafka/MessageCalculateSum.java    | 16 ++++++
 .../java/de/juplo/kafka/ApplicationTests.java  | 50 +++++++++++++------
 src/test/java/de/juplo/kafka/MessageTest.java  | 39 +++++++++++++++
 9 files changed, 158 insertions(+), 34 deletions(-)
 create mode 100644 src/main/java/de/juplo/kafka/Message.java
 create mode 100644 src/main/java/de/juplo/kafka/MessageAddNumber.java
 create mode 100644 src/main/java/de/juplo/kafka/MessageCalculateSum.java
 create mode 100644 src/test/java/de/juplo/kafka/MessageTest.java

diff --git a/pom.xml b/pom.xml
index 6699408..43a63c7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -44,8 +44,8 @@
       <optional>true</optional>
     </dependency>
     <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka-clients</artifactId>
+      <groupId>org.springframework.kafka</groupId>
+      <artifactId>spring-kafka</artifactId>
     </dependency>
     <dependency>
       <groupId>org.projectlombok</groupId>
diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 624a4ec..156b5a0 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -5,6 +5,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
 
 import java.util.Optional;
 import java.util.Properties;
@@ -48,8 +49,8 @@ public class ApplicationConfiguration
   }
 
   @Bean
-  public EndlessConsumer<String, String> endlessConsumer(
-      KafkaConsumer<String, String> kafkaConsumer,
+  public EndlessConsumer<String, Message> endlessConsumer(
+      KafkaConsumer<String, Message> kafkaConsumer,
       ExecutorService executor,
       ApplicationRebalanceListener rebalanceListener,
       ApplicationRecordHandler recordHandler,
@@ -72,7 +73,7 @@ public class ApplicationConfiguration
   }
 
   @Bean(destroyMethod = "close")
-  public KafkaConsumer<String, String> kafkaConsumer(ApplicationProperties properties)
+  public KafkaConsumer<String, Message> kafkaConsumer(ApplicationProperties properties)
   {
     Properties props = new Properties();
 
@@ -84,7 +85,11 @@ public class ApplicationConfiguration
     props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis());
     props.put("metadata.max.age.ms", "1000");
     props.put("key.deserializer", StringDeserializer.class.getName());
-    props.put("value.deserializer", StringDeserializer.class.getName());
+    props.put("value.deserializer", JsonDeserializer.class.getName());
+    props.put(JsonDeserializer.TRUSTED_PACKAGES, "de.juplo.kafka");
+    props.put(JsonDeserializer.TYPE_MAPPINGS,
+        Message.Type.ADD + ":" + MessageAddNumber.class.getName() + "," +
+        Message.Type.CALC + ":" + MessageCalculateSum.class.getName());
 
     return new KafkaConsumer<>(props);
   }
diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
index df4e653..03a14c8 100644
--- a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
+++ b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
@@ -10,7 +10,7 @@ import org.springframework.stereotype.Component;
 @RequiredArgsConstructor
 public class ApplicationHealthIndicator implements HealthIndicator
 {
-  private final EndlessConsumer<String, String> consumer;
+  private final EndlessConsumer<String, Message> consumer;
 
 
   @Override
diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
index 51d524f..2829157 100644
--- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
+++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
@@ -12,7 +12,7 @@ import java.util.Optional;
 
 @RequiredArgsConstructor
 @Slf4j
-public class ApplicationRecordHandler implements RecordHandler<String, String>
+public class ApplicationRecordHandler implements RecordHandler<String, Message>
 {
   private final AdderResults results;
   private final Optional throttle;
@@ -21,22 +21,40 @@ public class ApplicationRecordHandler implements RecordHandler
   private final Map state = new HashMap<>();
 
 
+  public void addNumber(
+      Integer partition,
+      String user,
+      MessageAddNumber message)
+  {
+    state.get(partition).addToSum(user, message.getNext());
+  }
+
+  public void calculateSum(
+      Integer partition,
+      String user,
+      MessageCalculateSum message)
+  {
+    AdderResult result = state.get(partition).calculate(user);
+    log.info("{} - New result for {}: {}", id, user, result);
+    results.addResults(partition, user, result);
+  }
+
   @Override
-  public void accept(ConsumerRecord<String, String> record)
+  public void accept(ConsumerRecord<String, Message> record)
   {
     Integer partition = record.partition();
     String user = record.key();
-    String message = record.value();
+    Message message = record.value();
 
-    if (message.equals("CALCULATE"))
-    {
-      AdderResult result = state.get(partition).calculate(user);
-      log.info("{} - New result for {}: {}", id, user, result);
-      results.addResults(partition, user, result);
-    }
-    else
+    switch(message.getType())
     {
-      state.get(partition).addToSum(user, Integer.parseInt(message));
+      case ADD:
+        addNumber(partition, user, (MessageAddNumber) message);
+        break;
+
+      case CALC:
+        calculateSum(partition, user, (MessageCalculateSum) message);
+        break;
     }
 
     if (throttle.isPresent())
diff --git a/src/main/java/de/juplo/kafka/Message.java b/src/main/java/de/juplo/kafka/Message.java
new file mode 100644
index 0000000..e4999b7
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/Message.java
@@ -0,0 +1,9 @@
+package de.juplo.kafka;
+
+
+public abstract class Message
+{
+  public enum Type {ADD, CALC}
+
+  public abstract Type getType();
+}
diff --git a/src/main/java/de/juplo/kafka/MessageAddNumber.java b/src/main/java/de/juplo/kafka/MessageAddNumber.java
new file mode 100644
index 0000000..c024b65
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/MessageAddNumber.java
@@ -0,0 +1,19 @@
+package de.juplo.kafka;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class MessageAddNumber extends Message
+{
+  private Integer next;
+
+
+  @Override
+  public Type getType()
+  {
+    return Type.ADD;
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/MessageCalculateSum.java b/src/main/java/de/juplo/kafka/MessageCalculateSum.java
new file mode 100644
index 0000000..afc5a39
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/MessageCalculateSum.java
@@ -0,0 +1,16 @@
+package de.juplo.kafka;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class MessageCalculateSum extends Message
+{
+  @Override
+  public Type getType()
+  {
+    return Type.CALC;
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index 6a037eb..bd9f449 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -15,7 +15,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 
 @Slf4j
-public class ApplicationTests extends GenericApplicationTests<String, String>
+public class ApplicationTests extends GenericApplicationTests<String, Message>
 {
   @Autowired
   StateRepository stateRepository;
@@ -39,7 +39,7 @@ public class ApplicationTests extends GenericApplicationTests
           .mapToObj(i -> "seeräuber-" + i)
           .toArray(i -> new String[i]);
       final StringSerializer stringSerializer = new StringSerializer();
-      final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "CALCULATE"));
+      final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "{}"));
 
       int counter = 0;
 
@@ -72,7 +72,13 @@ public class ApplicationTests extends GenericApplicationTests
 
           if (message[i] > number[i])
           {
-            send(key, calculateMessage, fail(logicErrors, pass, counter), messageSender);
+            send(
+                key,
+                calculateMessage,
+                Message.Type.CALC,
+                poisonPill(poisonPills, pass, counter),
+                logicError(logicErrors, pass, counter),
+                messageSender);
             state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2));
             // Pick next number to calculate
             number[i] = numbers[next++%numbers.length];
@@ -80,15 +86,25 @@ public class ApplicationTests extends GenericApplicationTests
             log.debug("Seeräuber {} will die Summe für {} berechnen", seeräuber, number[i]);
           }
 
-          Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message[i]++)));
-          send(key, value, fail(logicErrors, pass, counter), messageSender);
+          send(
+              key,
+              new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":" + message[i]++ + "}")),
+              Message.Type.ADD,
+              poisonPill(poisonPills, pass, counter),
+              logicError(logicErrors, pass, counter),
+              messageSender);
         }
       }
 
       return counter;
     }
 
-    boolean fail (boolean logicErrors, int pass, int counter)
+    boolean poisonPill (boolean poisonPills, int pass, int counter)
+    {
+      return poisonPills && pass > 300 && counter%99 == 0;
+    }
+
+    boolean logicError(boolean logicErrors, int pass, int counter)
     {
       return logicErrors && pass > 300 && counter%77 == 0;
     }
@@ -96,23 +112,25 @@ public class ApplicationTests extends GenericApplicationTests
     void send(
         Bytes key,
         Bytes value,
-        boolean fail,
+        Message.Type type,
+        boolean poisonPill,
+        boolean logicError,
         Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
     {
       counter++;
 
-      if (fail)
+      if (logicError)
       {
-        value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(-1)));
+        value = new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":-1}"));
+      }
+      if (poisonPill)
+      {
+        value = new Bytes("BOOM!".getBytes());
       }
 
-      messageSender.accept(new ProducerRecord<>(TOPIC, key, value));
-    }
-
-    @Override
-    public boolean canGeneratePoisonPill()
-    {
-      return false;
+      ProducerRecord record = new ProducerRecord<>(TOPIC, key, value);
+      record.headers().add("__TypeId__", type.toString().getBytes());
+      messageSender.accept(record);
     }
 
     @Override
diff --git a/src/test/java/de/juplo/kafka/MessageTest.java b/src/test/java/de/juplo/kafka/MessageTest.java
new file mode 100644
index 0000000..52794ba
--- /dev/null
+++ b/src/test/java/de/juplo/kafka/MessageTest.java
@@ -0,0 +1,39 @@
+package de.juplo.kafka;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.*;
+
+
+public class MessageTest
+{
+  ObjectMapper mapper = new ObjectMapper();
+
+  @Test
+  @DisplayName("Deserialize a MessageAddNumber message")
+  public void testDeserializeMessageAddNumber()
+  {
+    Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"next\":42}", MessageAddNumber.class));
+    Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"number\":666,\"next\":42}", MessageAddNumber.class));
+  }
+
+  @Test
+  @DisplayName("Deserialize a MessageCalculateSum message")
+  public void testDeserializeMessageCalculateSum() throws JsonProcessingException
+  {
+    Assertions.assertDoesNotThrow(() -> mapper.readValue("{}", MessageCalculateSum.class));
+    Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"number\":666}", MessageCalculateSum.class));
+  }
+}
-- 
2.20.1


From c4cf0feba4bc73d2c8ec23c2a83da609c99a3f9d Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sun, 4 Sep 2022 07:35:52 +0200
Subject: [PATCH 07/16] Separate artifact/image ID for the JSON version of the
 requests service
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* The version of the adder service that processes JSON messages now has
  an explicit artifact and image ID of its own.
* In addition, the compose setup was switched over so that it explicitly
  uses the JSON version of the requests and of the adder service.
---
 README.sh          | 2 +-
 docker-compose.yml | 8 ++++----
 pom.xml            | 4 ++--
 3 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/README.sh b/README.sh
index f337d5c..22f52f0 100755
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/sumup-adder:1.0-SNAPSHOT
+IMAGE=juplo/sumup-adder-json:1.0-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
diff --git a/docker-compose.yml b/docker-compose.yml
index 5f0acac..5d33cd1 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -106,7 +106,7 @@ services:
       sumup.gateway.topic: in
 
   requests-1:
-    image: juplo/sumup-requests:1.0-SNAPSHOT
+    image: juplo/sumup-requests-json:1.0-SNAPSHOT
     ports:
       - 8081:8080
     environment:
@@ -115,7 +115,7 @@ services:
       sumup.requests.client-id: requests-1
 
   requests-2:
-    image: juplo/sumup-requests:1.0-SNAPSHOT
+    image: juplo/sumup-requests-json:1.0-SNAPSHOT
     ports:
       - 8082:8080
     environment:
@@ -124,7 +124,7 @@ services:
      sumup.requests.client-id: requests-2
 
   adder-1:
-    image: juplo/sumup-adder:1.0-SNAPSHOT
+    image: juplo/sumup-adder-json:1.0-SNAPSHOT
     ports:
       - 8091:8080
     environment:
@@ -138,7 +138,7 @@ services:
       logging.level.org.apache.kafka.clients.consumer: DEBUG
 
   adder-2:
-    image: juplo/sumup-adder:1.0-SNAPSHOT
+    image: juplo/sumup-adder-json:1.0-SNAPSHOT
     ports:
       - 8092:8080
     environment:
diff --git a/pom.xml b/pom.xml
index 43a63c7..17d3cba 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,10 +12,10 @@
   <groupId>de.juplo.kafka</groupId>
-  <artifactId>sumup-adder</artifactId>
+  <artifactId>sumup-adder-json</artifactId>
   <version>1.0-SNAPSHOT</version>
   <name>SumUp Adder</name>
-  <description>Calculates the sum for the send messages</description>
+  <description>Calculates the sum for the send messages. This version consumes JSON-messages.</description>
   <properties>
     <java.version>11</java.version>
-- 
2.20.1


From f18a765cc650b81788f356a80f975926930600c5 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Fri, 22 Apr 2022 11:08:37 +0200
Subject: [PATCH 08/16] Springify: configuration is now done via
 `KafkaProperties`
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* Taken over from `springified-consumer--config` via `git cherry-pick`.
* Conflicts:
** src/main/java/de/juplo/kafka/ApplicationConfiguration.java
** src/main/java/de/juplo/kafka/ApplicationProperties.java
** src/main/resources/application.yml
** src/test/java/de/juplo/kafka/ApplicationTests.java
* Caught up on the adaptations to `README.sh`, `docker-compose.yml` and
  `pom.xml`.
---
 README.sh                                      |  2 +-
 docker-compose.yml                             | 22 ++++++------
 pom.xml                                        |  2 +-
 .../juplo/kafka/ApplicationConfiguration.java  | 36 ++++++++++---------
 .../de/juplo/kafka/ApplicationProperties.java  | 14 --------
 src/main/resources/application.yml             | 22 +++++++-----
 .../java/de/juplo/kafka/ApplicationIT.java     |  2 +-
 .../juplo/kafka/GenericApplicationTests.java   | 19 +++++-----
 8 files changed, 58 insertions(+), 61 deletions(-)

diff --git a/README.sh b/README.sh
index 22f52f0..07e36d7 100755
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/sumup-adder-json:1.0-SNAPSHOT
+IMAGE=juplo/sumup-adder-springified:1.0-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
diff --git a/docker-compose.yml b/docker-compose.yml
index 5d33cd1..16fec5b 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -96,13 +96,13 @@ services:
     command: sleep infinity
 
   gateway:
-    image: juplo/sumup-gateway:1.0-SNAPSHOT
+    image: juplo/sumup-gateway--springified:1.0-SNAPSHOT
     ports:
       - 8080:8080
     environment:
       server.port: 8080
-      sumup.gateway.bootstrap-server: kafka:9092
-      sumup.gateway.client-id: gateway
+      spring.kafka.bootstrap-servers: kafka:9092
+      spring.kafka.client-id: gateway
       sumup.gateway.topic: in
 
   requests-1:
@@ -124,28 +124,28 @@ services:
       sumup.requests.client-id: requests-2
 
   adder-1:
-    image: juplo/sumup-adder-json:1.0-SNAPSHOT
+    image: juplo/sumup-adder-springified:1.0-SNAPSHOT
     ports:
       - 8091:8080
     environment:
       server.port: 8080
-      sumup.adder.bootstrap-server: kafka:9092
-      sumup.adder.client-id: adder-1
-      sumup.adder.commit-interval: 1s
+      spring.kafka.bootstrap-servers: kafka:9092
+      spring.kafak.client-id: adder-1
+      spring.kafka.auto-commit-interval: 1s
       sumup.adder.throttle: 3ms
       spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
      spring.data.mongodb.database: juplo
       logging.level.org.apache.kafka.clients.consumer: DEBUG
 
   adder-2:
-    image: juplo/sumup-adder-json:1.0-SNAPSHOT
+    image: juplo/sumup-adder-springified:1.0-SNAPSHOT
     ports:
       - 8092:8080
     environment:
       server.port: 8080
-      sumup.adder.bootstrap-server: kafka:9092
-      sumup.adder.client-id: adder-2
-      sumup.adder.commit-interval: 1s
+      spring.kafka.bootstrap-servers: kafka:9092
+      spring.kafak.client-id: adder-2
+      spring.kafka.auto-commit-interval: 1s
       sumup.adder.throttle: 3ms
       spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
       spring.data.mongodb.database: juplo
diff --git a/pom.xml b/pom.xml
index 17d3cba..a252d1c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,7 +12,7 @@
   <groupId>de.juplo.kafka</groupId>
-  <artifactId>sumup-adder-json</artifactId>
+  <artifactId>sumup-adder-springified</artifactId>
   <version>1.0-SNAPSHOT</version>
   <name>SumUp Adder</name>
   <description>Calculates the sum for the send messages. This version consumes JSON-messages.</description>
diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 156b5a0..523707f 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -2,6 +2,7 @@ package de.juplo.kafka;
 
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -14,18 +15,19 @@ import java.util.concurrent.Executors;
 
 
 @Configuration
-@EnableConfigurationProperties(ApplicationProperties.class)
+@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
 public class ApplicationConfiguration
 {
   @Bean
   public ApplicationRecordHandler recordHandler(
       AdderResults adderResults,
-      ApplicationProperties properties)
+      KafkaProperties kafkaProperties,
+      ApplicationProperties applicationProperties)
   {
     return new ApplicationRecordHandler(
         adderResults,
-        Optional.ofNullable(properties.getThrottle()),
-        properties.getClientId());
+        Optional.ofNullable(applicationProperties.getThrottle()),
+        kafkaProperties.getClientId());
   }
 
   @Bean
@@ -39,13 +41,14 @@ public class ApplicationConfiguration
       ApplicationRecordHandler recordHandler,
       AdderResults adderResults,
       StateRepository stateRepository,
-      ApplicationProperties properties)
+      KafkaProperties kafkaProperties,
+      ApplicationProperties applicationProperties)
   {
     return new ApplicationRebalanceListener(
         recordHandler,
         adderResults,
         stateRepository,
-        properties.getClientId());
+        kafkaProperties.getClientId());
   }
 
   @Bean
@@ -54,13 +57,14 @@ public class ApplicationConfiguration
       ExecutorService executor,
       ApplicationRebalanceListener rebalanceListener,
       ApplicationRecordHandler recordHandler,
-      ApplicationProperties properties)
+      KafkaProperties kafkaProperties,
+      ApplicationProperties applicationProperties)
   {
     return new EndlessConsumer<>(
         executor,
-        properties.getClientId(),
-        properties.getTopic(),
+        kafkaProperties.getClientId(),
+        applicationProperties.getTopic(),
         kafkaConsumer,
         rebalanceListener,
         recordHandler);
@@ -73,17 +77,17 @@ public class ApplicationConfiguration
   }
 
   @Bean(destroyMethod = "close")
-  public KafkaConsumer<String, Message> kafkaConsumer(ApplicationProperties properties)
+  public KafkaConsumer<String, Message> kafkaConsumer(KafkaProperties kafkaProperties)
   {
     Properties props = new Properties();
 
-    props.put("bootstrap.servers", properties.getBootstrapServer());
+    props.put("bootstrap.servers", kafkaProperties.getBootstrapServers());
     props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.StickyAssignor");
-    props.put("group.id", properties.getGroupId());
-    props.put("client.id", properties.getClientId());
-    props.put("auto.offset.reset", properties.getAutoOffsetReset());
-    props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis());
-    props.put("metadata.max.age.ms", "1000");
+    props.put("group.id", kafkaProperties.getConsumer().getGroupId());
+    props.put("client.id", kafkaProperties.getClientId());
+    props.put("auto.offset.reset", kafkaProperties.getConsumer().getAutoOffsetReset());
+    props.put("auto.commit.interval.ms", (int)kafkaProperties.getConsumer().getAutoCommitInterval().toMillis());
+    props.put("metadata.max.age.ms", kafkaProperties.getConsumer().getProperties().get("metadata.max.age.ms"));
     props.put("key.deserializer", StringDeserializer.class.getName());
     props.put("value.deserializer", JsonDeserializer.class.getName());
     props.put(JsonDeserializer.TRUSTED_PACKAGES, "de.juplo.kafka");
diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java
index f852c00..005460c 100644
--- a/src/main/java/de/juplo/kafka/ApplicationProperties.java
+++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java
@@ -16,22 +16,8 @@ import java.time.Duration;
 @Setter
 public class ApplicationProperties
 {
-  @NotNull
-  @NotEmpty
-  private String bootstrapServer;
-  @NotNull
-  @NotEmpty
-  private String groupId;
-  @NotNull
-  @NotEmpty
-  private String clientId;
   @NotNull
   @NotEmpty
   private String topic;
-  @NotNull
-  @NotEmpty
-  private String autoOffsetReset;
-  @NotNull
-  private Duration commitInterval;
   private Duration throttle;
 }
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 26948f5..a899340 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -1,11 +1,6 @@
 sumup:
   adder:
-    bootstrap-server: :9092
-    group-id: my-group
-    client-id: DEV
     topic: out
-    auto-offset-reset: earliest
-    commit-interval: 5s
 management:
   endpoint:
     shutdown:
@@ -21,16 +16,25 @@ management:
       enabled: true
   info:
     kafka:
-      bootstrap-server: ${consumer.bootstrap-server}
-      client-id: ${consumer.client-id}
-      group-id: ${consumer.group-id}
+      bootstrap-server: ${spring.kafka.consumer.bootstrap-servers}
+      client-id: ${spring.kafka.consumer.client-id}
+      group-id: ${spring.kafka.consumer.group-id}
       topic: ${consumer.topic}
-      auto-offset-reset: ${consumer.auto-offset-reset}
+      auto-offset-reset: ${spring.kafka.consumer.auto-offset-reset}
 spring:
   data:
     mongodb:
      uri: mongodb://juplo:training@localhost:27017
       database: juplo
+  kafka:
+    bootstrap-servers: :9092
+    client-id: DEV
+    consumer:
+      group-id: my-group
+      auto-offset-reset: earliest
+      auto-commit-interval: 5s
+      properties:
+        metadata.max.age.ms: 1000
 logging:
   level:
     root: INFO
diff --git a/src/test/java/de/juplo/kafka/ApplicationIT.java b/src/test/java/de/juplo/kafka/ApplicationIT.java
index dcac79b..4bb4f5b 100644
--- a/src/test/java/de/juplo/kafka/ApplicationIT.java
+++ b/src/test/java/de/juplo/kafka/ApplicationIT.java
@@ -14,7 +14,7 @@ import static de.juplo.kafka.ApplicationIT.TOPIC;
 @SpringBootTest(
     webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
     properties = {
-        "sumup.adder.bootstrap-server=${spring.embedded.kafka.brokers}",
+        "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
         "sumup.adder.topic=" + TOPIC,
         "spring.mongodb.embedded.version=4.4.13" })
 @EmbeddedKafka(topics = TOPIC)
diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
index 8849317..869b5d9 100644
--- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
@@ -14,6 +14,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.*;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.boot.autoconfigure.mongo.MongoProperties;
 import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
 import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
@@ -40,9 +41,9 @@
 @SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
 @TestPropertySource(
     properties = {
-        "sumup.adder.bootstrap-server=${spring.embedded.kafka.brokers}",
+        "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
         "sumup.adder.topic=" + TOPIC,
-        "sumup.adder.commit-interval=500ms",
+        "spring.kafka.consumer.auto-commit-interval=500ms",
         "spring.mongodb.embedded.version=4.4.13" })
 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
 @EnableAutoConfiguration
@@ -59,7 +60,9 @@ abstract class GenericApplicationTests
   @Autowired
   Consumer<ConsumerRecord<K, V>> consumer;
   @Autowired
-  ApplicationProperties properties;
+  ApplicationProperties applicationProperties;
+  @Autowired
+  KafkaProperties kafkaProperties;
   @Autowired
   ExecutorService executor;
   @Autowired
@@ -330,16 +333,16 @@ abstract class GenericApplicationTests
   {
     Properties props;
     props = new Properties();
-    props.put("bootstrap.servers", properties.getBootstrapServer());
+    props.put("bootstrap.servers", kafkaProperties.getBootstrapServers());
     props.put("linger.ms", 100);
     props.put("key.serializer", BytesSerializer.class.getName());
     props.put("value.serializer", BytesSerializer.class.getName());
     testRecordProducer = new KafkaProducer<>(props);
 
     props = new Properties();
-    props.put("bootstrap.servers", properties.getBootstrapServer());
+    props.put("bootstrap.servers", kafkaProperties.getBootstrapServers());
     props.put("client.id", "OFFSET-CONSUMER");
-    props.put("group.id", properties.getGroupId());
+    props.put("group.id", kafkaProperties.getConsumer().getGroupId());
     props.put("key.deserializer", BytesDeserializer.class.getName());
     props.put("value.deserializer", BytesDeserializer.class.getName());
     offsetConsumer = new KafkaConsumer<>(props);
@@ -373,8 +376,8 @@ abstract class GenericApplicationTests
     endlessConsumer =
         new EndlessConsumer<>(
             executor,
-            properties.getClientId(),
-            properties.getTopic(),
+            kafkaProperties.getClientId(),
+            applicationProperties.getTopic(),
             kafkaConsumer,
             rebalanceListener,
             captureOffsetAndExecuteTestHandler);
-- 
2.20.1


From 7adff476ad862d30d668d75212d1ca1c7cf16b03 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Fri, 22 Apr 2022 11:24:55 +0200
Subject: [PATCH 09/16] Springify: the Kafka `Consumer` is created via the
 Spring factory
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* Taken over from `springified-consumer--config` via `git cherry-pick`.
* Conflicts:
** src/main/java/de/juplo/kafka/ApplicationConfiguration.java
** src/test/java/de/juplo/kafka/ApplicationTests.java
* So that Spring Kafka can instantiate the consumer, most notably the
  fixed part of the configuration had to be moved from the config class
  `ApplicationConfiguration` into the YAML file `application.yml`:
** the selection of the `StickyAssignor` as partition-assignment strategy
** the configuration of the deserializers
---
 .../juplo/kafka/ApplicationConfiguration.java  | 29 ++++---------------
 src/main/resources/application.yml             |  6 ++++
 .../juplo/kafka/GenericApplicationTests.java   | 13 ++++++---
 3 files changed, 21 insertions(+), 27 deletions(-)

diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 523707f..bae5d51 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -1,15 +1,14 @@
 package de.juplo.kafka;
 
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.support.serializer.JsonDeserializer;
 
 import java.util.Optional;
-import java.util.Properties;
+import org.springframework.kafka.core.ConsumerFactory;
+
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -53,7 +52,7 @@ public class ApplicationConfiguration
   @Bean
   public EndlessConsumer endlessConsumer(
-      KafkaConsumer kafkaConsumer,
+      Consumer kafkaConsumer,
       ExecutorService executor,
       ApplicationRebalanceListener rebalanceListener,
       ApplicationRecordHandler recordHandler,
@@ -77,24 +76,8 @@ public class ApplicationConfiguration
   }
 
   @Bean(destroyMethod = "close")
-  public KafkaConsumer kafkaConsumer(KafkaProperties kafkaProperties)
+  public Consumer kafkaConsumer(ConsumerFactory factory)
   {
-    Properties props = new Properties();
-
-    props.put("bootstrap.servers", kafkaProperties.getBootstrapServers());
-    props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.StickyAssignor");
-    props.put("group.id", kafkaProperties.getConsumer().getGroupId());
-    props.put("client.id", kafkaProperties.getClientId());
-    props.put("auto.offset.reset", kafkaProperties.getConsumer().getAutoOffsetReset());
-    props.put("auto.commit.interval.ms", (int)kafkaProperties.getConsumer().getAutoCommitInterval().toMillis());
-    props.put("metadata.max.age.ms", kafkaProperties.getConsumer().getProperties().get("metadata.max.age.ms"));
-    props.put("key.deserializer", StringDeserializer.class.getName());
-    props.put("value.deserializer", JsonDeserializer.class.getName());
-    props.put(JsonDeserializer.TRUSTED_PACKAGES, "de.juplo.kafka");
-    props.put(JsonDeserializer.TYPE_MAPPINGS,
-        Message.Type.ADD + ":" + MessageAddNumber.class.getName() + "," +
-        Message.Type.CALC + ":" + MessageCalculateSum.class.getName());
-
-    return new KafkaConsumer<>(props);
+    return factory.createConsumer();
   }
 }
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index a899340..92f3a6b 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -33,8 +33,14 @@ spring:
       group-id: my-group
       auto-offset-reset: earliest
      auto-commit-interval: 5s
+      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
       properties:
+        partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor
         metadata.max.age.ms: 1000
+        spring.json.type.mapping: >
+          ADD:de.juplo.kafka.MessageAddNumber,
+          CALC:de.juplo.kafka.MessageCalculateSum
 logging:
   level:
     root: INFO
diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
index 869b5d9..21c3f7f 100644
--- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
@@ -14,6 +14,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.*;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.boot.autoconfigure.mongo.MongoProperties;
 import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
@@ -38,7 +39,11 @@ import static org.assertj.core.api.Assertions.*;
 import static org.awaitility.Awaitility.*;
 
 
-@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
+@SpringJUnitConfig(
+    initializers = ConfigDataApplicationContextInitializer.class,
+    classes = {
+        KafkaAutoConfiguration.class,
+        ApplicationTests.Configuration.class })
 @TestPropertySource(
     properties = {
         "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
@@ -56,13 +61,13 @@ abstract class GenericApplicationTests
 
 
   @Autowired
-  KafkaConsumer kafkaConsumer;
+  org.apache.kafka.clients.consumer.Consumer kafkaConsumer;
   @Autowired
   Consumer<ConsumerRecord<K, V>> consumer;
   @Autowired
   ApplicationProperties applicationProperties;
-  @Autowired
-  KafkaProperties kafkaProperties;
+  @Autowired
+  KafkaProperties kafkaProperties;
   @Autowired
   ExecutorService executor;
   @Autowired
-- 
2.20.1


From 2eb3c45c9438a20777b0110defa593dd45c64511 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Fri, 9 Sep 2022 12:44:54 +0200
Subject: [PATCH 10/16] The test uses the `@Bean` for the `EndlessConsumer`
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* Picked from the branch `deserialization` via `git cherry-pick`.
* Conflicts:
** src/main/java/de/juplo/kafka/ApplicationConfiguration.java
** src/test/java/de/juplo/kafka/ApplicationTests.java
** src/test/java/de/juplo/kafka/GenericApplicationTests.java
---
 .../juplo/kafka/ApplicationConfiguration.java  |  7 +-
 .../juplo/kafka/GenericApplicationTests.java   | 75 +++++++------------
 .../de/juplo/kafka/TestRecordHandler.java      | 16 +++-
 3 files changed, 46 insertions(+), 52 deletions(-)

diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index bae5d51..08c827c 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -18,7 +18,7 @@ public class ApplicationConfiguration
 {
   @Bean
-  public ApplicationRecordHandler recordHandler(
+  public ApplicationRecordHandler applicationRecordHandler(
       AdderResults adderResults,
       KafkaProperties kafkaProperties,
       ApplicationProperties applicationProperties)
@@ -40,8 +40,7 @@ public class ApplicationConfiguration
       ApplicationRecordHandler recordHandler,
       AdderResults adderResults,
       StateRepository stateRepository,
-      KafkaProperties kafkaProperties,
-      ApplicationProperties applicationProperties)
+      KafkaProperties kafkaProperties)
   {
     return new ApplicationRebalanceListener(
         recordHandler,
@@ -55,7 +54,7 @@ public class ApplicationConfiguration
       Consumer kafkaConsumer,
       ExecutorService executor,
       ApplicationRebalanceListener rebalanceListener,
-      ApplicationRecordHandler recordHandler,
+      RecordHandler recordHandler,
       KafkaProperties kafkaProperties,
       ApplicationProperties applicationProperties)
   {
diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
index 21c3f7f..937b40f 100644
--- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
@@ -2,8 +2,6 @@ package de.juplo.kafka;
 
 import com.mongodb.client.MongoClient;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -20,6 +18,7 @@ import org.springframework.boot.autoconfigure.mongo.MongoProperties;
 import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
 import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
 import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Import;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.test.context.TestPropertySource;
@@ -27,7 +26,6 @@ import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
 
 import java.time.Duration;
 import java.util.*;
-import java.util.concurrent.ExecutorService;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -63,28 +61,21 @@ abstract class GenericApplicationTests
   @Autowired
   org.apache.kafka.clients.consumer.Consumer kafkaConsumer;
   @Autowired
-  Consumer<ConsumerRecord<K, V>> consumer;
-  @Autowired
   KafkaProperties kafkaProperties;
   @Autowired
-  ExecutorService executor;
+  ApplicationProperties applicationProperties;
   @Autowired
   MongoClient mongoClient;
   @Autowired
   MongoProperties mongoProperties;
   @Autowired
-  ConsumerRebalanceListener rebalanceListener;
+  TestRecordHandler recordHandler;
   @Autowired
-  RecordHandler recordHandler;
+  EndlessConsumer endlessConsumer;
 
   KafkaProducer testRecordProducer;
   KafkaConsumer offsetConsumer;
-  EndlessConsumer endlessConsumer;
   Map<TopicPartition, Long> oldOffsets;
-  Map<TopicPartition, Long> seenOffsets;
-  Set<ConsumerRecord<K, V>> receivedRecords;
 
   final RecordGenerator recordGenerator;
@@ -108,7 +99,7 @@ abstract class GenericApplicationTests
     await(numberOfGeneratedMessages + " records received")
       .atMost(Duration.ofSeconds(30))
       .pollInterval(Duration.ofSeconds(1))
-      .until(() -> receivedRecords.size() >= numberOfGeneratedMessages);
+      .until(() -> recordHandler.receivedRecords.size() >= numberOfGeneratedMessages);
 
     await("Offsets committed")
       .atMost(Duration.ofSeconds(10))
@@ -116,7 +107,7 @@ abstract class GenericApplicationTests
       .untilAsserted(() ->
       {
        checkSeenOffsetsForProgress();
-        assertSeenOffsetsEqualCommittedOffsets(seenOffsets);
+        assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
       });
 
     assertThatExceptionOfType(IllegalStateException.class)
@@ -140,7 +131,7 @@ abstract class GenericApplicationTests
       .until(() -> !endlessConsumer.running());
 
     checkSeenOffsetsForProgress();
-    assertSeenOffsetsEqualCommittedOffsets(seenOffsets);
+    assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
 
     endlessConsumer.start();
     await("Consumer failed")
@@ -149,8 +140,8 @@ abstract class GenericApplicationTests
       .until(() -> !endlessConsumer.running());
 
     checkSeenOffsetsForProgress();
-    assertSeenOffsetsEqualCommittedOffsets(seenOffsets);
-    assertThat(receivedRecords.size())
+    assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
+    assertThat(recordHandler.receivedRecords.size())
       .describedAs("Received not all sent events")
       .isLessThan(numberOfGeneratedMessages);
@@ -177,7 +168,7 @@ abstract class GenericApplicationTests
       .until(() -> !endlessConsumer.running());
 
     checkSeenOffsetsForProgress();
-    assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets);
+    assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets);
 
     endlessConsumer.start();
     await("Consumer failed")
@@ -185,7 +176,7 @@ abstract class GenericApplicationTests
       .pollInterval(Duration.ofSeconds(1))
       .until(() -> !endlessConsumer.running());
 
-    assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets);
+    assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets);
 
     assertThatNoException()
       .describedAs("Consumer should not be running")
@@ -238,7 +229,7 @@ abstract class GenericApplicationTests
     partitions().forEach(tp ->
     {
       Long oldOffset = oldOffsets.get(tp) + 1;
-      Long newOffset = seenOffsets.get(tp) + 1;
+      Long newOffset = recordHandler.seenOffsets.get(tp) + 1;
       if (!oldOffset.equals(newOffset))
       {
         log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
@@ -356,43 +347,30 @@ abstract class GenericApplicationTests
     seekToEnd();
 
     oldOffsets = new HashMap<>();
-    seenOffsets = new HashMap<>();
-    receivedRecords = new HashSet<>();
+    recordHandler.seenOffsets = new HashMap<>();
+    recordHandler.receivedRecords = new HashSet<>();
 
     doForCurrentOffsets((tp, offset) ->
     {
       oldOffsets.put(tp, offset - 1);
-      seenOffsets.put(tp, offset - 1);
+      recordHandler.seenOffsets.put(tp, offset - 1);
     });
 
-    TestRecordHandler captureOffsetAndExecuteTestHandler =
-        new TestRecordHandler(recordHandler)
-        {
-          @Override
-          public void onNewRecord(ConsumerRecord record)
-          {
-            seenOffsets.put(
-                new TopicPartition(record.topic(), record.partition()),
-                record.offset());
-            receivedRecords.add(record);
-          }
-        };
-
-    endlessConsumer =
-        new EndlessConsumer<>(
-            executor,
-            kafkaProperties.getClientId(),
-            applicationProperties.getTopic(),
-            kafkaConsumer,
-            rebalanceListener,
-            captureOffsetAndExecuteTestHandler);
-
     endlessConsumer.start();
   }
 
   @AfterEach
   public void deinit()
   {
+    try
+    {
+      endlessConsumer.stop();
+    }
+    catch (Exception e)
+    {
+      log.debug("{}", e.toString());
+    }
+
     try
     {
       testRecordProducer.close();
@@ -409,5 +387,10 @@ abstract class GenericApplicationTests
   @Import(ApplicationConfiguration.class)
   public static class Configuration
   {
+    @Bean
+    public RecordHandler recordHandler(RecordHandler applicationRecordHandler)
+    {
+      return new TestRecordHandler(applicationRecordHandler);
+    }
   }
 }
diff --git a/src/test/java/de/juplo/kafka/TestRecordHandler.java b/src/test/java/de/juplo/kafka/TestRecordHandler.java
index b4efdd6..37d3f65 100644
--- a/src/test/java/de/juplo/kafka/TestRecordHandler.java
+++ b/src/test/java/de/juplo/kafka/TestRecordHandler.java
@@ -2,16 +2,28 @@ package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Map;
+import java.util.Set;
 
 
 @RequiredArgsConstructor
-public abstract class TestRecordHandler implements RecordHandler
+public class TestRecordHandler implements RecordHandler
 {
   private final RecordHandler handler;
 
+  Map<TopicPartition, Long> seenOffsets;
+  Set<ConsumerRecord<K, V>> receivedRecords;
+
 
-  public abstract void onNewRecord(ConsumerRecord record);
+  public void onNewRecord(ConsumerRecord record)
+  {
+    seenOffsets.put(
+        new TopicPartition(record.topic(), record.partition()),
+        record.offset());
+    receivedRecords.add(record);
+  }
 
   @Override
   public void accept(ConsumerRecord record)
-- 
2.20.1


From f095f71a104fcde025a63f87ba75eb5cb3136656 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sun, 4 Sep 2022 19:30:29 +0200
Subject: [PATCH 11/16] Switched to `@KafkaHandler`
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* Activated the auto-configuration via the `@EnableKafka` annotation.
* Since the Spring Kafka auto-configuration now kicks in, the
  configuration becomes simpler:
** Spring Kafka automatically creates the `MessageListenerContainer`
   that is needed to wire up the methods annotated with `@KafkaHandler`,
   and supplies it with a matching `KafkaConsumer`, so the latter no
   longer has to be created explicitly.
** The scheduler is created and managed by Spring Kafka, so an
   `ExecutorService` no longer has to be created and shut down
   explicitly.
** The rebalance listener is wired in automatically; however, the
   `ApplicationRebalanceListener` has to extend the right Spring class
   in order to be found by the auto-configuration.
* To simulate the default behaviour of the `KafkaConsumer`, which the
  test case expects, with the `MessageListenerContainer`, a customized
  `ErrorHandler` had to be implemented.
* The code for the exception handling and for closing the
  `KafkaConsumer` in `EndlessConsumer` is dropped.
* The start/stop code in `EndlessConsumer` can simply call the
  corresponding methods of the `MessageListenerContainer`. To do so,
  however, it first has to look up the matching instance in a registry
  via the client ID.
* The test case had to be adapted to the auto-configuration:
** The `KafkaAutoConfiguration` has to be included explicitly here.
** Since the test also needs a `KafkaConsumer`, the factory that is no
   longer needed by the application now has to be provided explicitly
   for the test.
---
 README.sh                                      |  23 ++-
 src/main/java/de/juplo/kafka/Application.java  |  27 ---
 .../juplo/kafka/ApplicationConfiguration.java  |  35 +---
 .../juplo/kafka/ApplicationErrorHandler.java   |  70 +++++++
 .../kafka/ApplicationRebalanceListener.java    |   3 +-
 .../java/de/juplo/kafka/EndlessConsumer.java   | 195 +++--------------
 .../juplo/kafka/GenericApplicationTests.java   |  18 +-
 7 files changed, 151 insertions(+), 220 deletions(-)
 create mode 100644 src/main/java/de/juplo/kafka/ApplicationErrorHandler.java

diff --git a/README.sh b/README.sh
index 07e36d7..a2d813d 100755
--- a/README.sh
+++ b/README.sh
@@ -76,7 +76,7 @@ http :8092/results/klaus | jq .[].sum | uniq
 
 docker-compose stop adder-1
 until [ $(http --check-status :8092/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-2..."; sleep 1; done
-until [ $(http --check-status :8092/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-2..."; sleep 1; done
+until [ $(http --check-status :8092/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for klaus to show up on adder-2..."; sleep 1; done
 
 echo "Resultate für adder-2"
 http -v --pretty none -S :8092/results
@@ -87,4 +87,25 @@ http :8092/results/peter | jq .[].sum | uniq
 echo "Resultate für klaus von adder-2"
 http :8092/results/klaus | jq .[].sum | uniq
 
+docker-compose kill -s 9 adder-2
+docker-compose start adder-1
 docker-compose kill -s 9 peter klaus
+while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done
+until [ $(http --check-status :8091/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-1..."; sleep 1; done
+until [ $(http --check-status :8091/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for klaus to show up on adder-1..."; sleep 1; done
+
+echo "Resultate für adder-1"
+http -v --pretty none -S :8091/results
+echo
+
+echo "Resultate für peter von adder-1"
+http :8091/results/peter | jq .[].sum | uniq
+echo "Resultate für klaus von adder-1"
+http :8091/results/klaus | jq .[].sum | uniq
+
+sleep 5
+
+echo "Resultate für peter von adder-1"
+http :8091/results/peter | jq .[].sum | uniq
+echo "Resultate für klaus von adder-1"
+http :8091/results/klaus | jq .[].sum | uniq
diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java
index 76c2520..a4d9aeb 100644
--- a/src/main/java/de/juplo/kafka/Application.java
+++ b/src/main/java/de/juplo/kafka/Application.java
@@ -8,8 +8,6 @@ import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 
 import javax.annotation.PreDestroy;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
 
 
 @SpringBootApplication
@@ -18,8 +16,6 @@ public class Application implements ApplicationRunner
 {
   @Autowired
   EndlessConsumer endlessConsumer;
-  @Autowired
-  ExecutorService executor;
 
 
   @Override
@@ -45,29 +41,6 @@ public class Application implements ApplicationRunner
     {
       log.error("Unexpected exception while stopping EndlessConsumer: {}", e);
     }
-
-    try
-    {
-      log.info("Shutting down the ExecutorService.");
-      executor.shutdown();
-      log.info("Waiting 5 seconds for the ExecutorService to terminate...");
-      executor.awaitTermination(5, TimeUnit.SECONDS);
-    }
-    catch
(InterruptedException e) - { - log.error("Exception while waiting for the termination of the ExecutorService: {}", e); - } - finally - { - if (!executor.isTerminated()) - { - log.warn("Forcing shutdown of ExecutorService!"); - executor - .shutdownNow() - .forEach(runnable -> log.warn("Unprocessed task: {}", runnable.getClass().getSimpleName())); - } - log.info("Shutdow of ExecutorService finished"); - } } diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 08c827c..f8bf857 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,16 +1,13 @@ package de.juplo.kafka; -import org.apache.kafka.clients.consumer.Consumer; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.Optional; -import org.springframework.kafka.core.ConsumerFactory; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; @Configuration @@ -49,34 +46,24 @@ public class ApplicationConfiguration kafkaProperties.getClientId()); } + @Bean + public ApplicationErrorHandler applicationErrorHandler() + { + return new ApplicationErrorHandler(); + } + @Bean public EndlessConsumer endlessConsumer( - Consumer kafkaConsumer, - ExecutorService executor, - ApplicationRebalanceListener rebalanceListener, RecordHandler recordHandler, + ApplicationErrorHandler errorHandler, KafkaProperties kafkaProperties, - ApplicationProperties applicationProperties) + KafkaListenerEndpointRegistry endpointRegistry) { return new EndlessConsumer<>( - executor, kafkaProperties.getClientId(), - applicationProperties.getTopic(), - kafkaConsumer, - rebalanceListener, + endpointRegistry, + errorHandler, recordHandler); } - - @Bean - public ExecutorService executor() - { - return Executors.newSingleThreadExecutor(); - } - - @Bean(destroyMethod = "close") - public Consumer kafkaConsumer(ConsumerFactory factory) - { - return factory.createConsumer(); - } } diff --git a/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java b/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java new file mode 100644 index 0000000..6e15717 --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java @@ -0,0 +1,70 @@ +package de.juplo.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.springframework.kafka.listener.CommonErrorHandler; +import org.springframework.kafka.listener.MessageListenerContainer; +import org.springframework.util.Assert; + +import java.util.Optional; + + +@Slf4j +public class ApplicationErrorHandler implements CommonErrorHandler +{ + private Exception exception; + private boolean ack = true; + + + @Override + public void handleOtherException( + Exception thrownException, + Consumer consumer, + MessageListenerContainer container, + boolean batchListener) + { + Assert.isTrue(batchListener, getClass().getName() + " is only applicable for Batch-Listeners"); + rememberExceptionAndStopContainer(thrownException, container); + } + + @Override + public void handleBatch( + Exception thrownException, + ConsumerRecords data, + Consumer consumer, 
+ MessageListenerContainer container, + Runnable invokeListener) + { + // Do not commit the polled offsets on a logic-error + ack = false; + rememberExceptionAndStopContainer(thrownException, container); + } + + private void rememberExceptionAndStopContainer( + Exception exception, + MessageListenerContainer container) + { + log.error("{}, stopping container {} abnormally", exception, container); + this.exception = exception; + container.stopAbnormally(() -> log.info("{} is stopped", container)); + } + + @Override + public boolean isAckAfterHandle() + { + return ack; + } + + + public Optional getException() + { + return Optional.ofNullable(exception); + } + + public void clearState() + { + this.exception = null; + this.ack = true; + } +} diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index 0bfee67..ba15227 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -4,13 +4,14 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.common.TopicPartition; +import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; import java.util.*; @RequiredArgsConstructor @Slf4j -public class ApplicationRebalanceListener implements ConsumerRebalanceListener +public class ApplicationRebalanceListener implements ConsumerAwareRebalanceListener { private final ApplicationRecordHandler recordHandler; private final AdderResults adderResults; diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 00678c4..d3d11ae 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -2,55 +2,36 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.*; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.RecordDeserializationException; -import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; -import javax.annotation.PreDestroy; -import java.time.Duration; -import java.util.*; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.List; +import java.util.Optional; -@Slf4j @RequiredArgsConstructor -public class EndlessConsumer implements Runnable +@Slf4j +public class EndlessConsumer { - private final ExecutorService executor; private final String id; - private final String topic; - private final Consumer consumer; - private final ConsumerRebalanceListener rebalanceListener; + private final KafkaListenerEndpointRegistry registry; + private final ApplicationErrorHandler errorHandler; private final RecordHandler recordHandler; - private final Lock lock = new ReentrantLock(); - private final Condition condition = lock.newCondition(); - private boolean running = false; - private Exception exception; private long consumed = 0; - - @Override - public void run() + @KafkaListener( + id = 
"${spring.kafka.client-id}", + idIsGroup = false, + topics = "${sumup.adder.topic}", + batch = "true", + autoStartup = "false") + public void accept(List> records) { - try - { - log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic), rebalanceListener); - - while (true) - { - ConsumerRecords records = - consumer.poll(Duration.ofSeconds(1)); - // Do something with the data... - log.info("{} - Received {} messages", id, records.count()); + log.info("{} - Received {} messages", id, records.size()); for (ConsumerRecord record : records) { log.info( @@ -67,146 +48,38 @@ public class EndlessConsumer implements Runnable consumed++; } - } - } - catch(WakeupException e) - { - log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id); - consumer.commitSync(); - shutdown(); - } - catch(RecordDeserializationException e) - { - TopicPartition tp = e.topicPartition(); - long offset = e.offset(); - log.error( - "{} - Could not deserialize message on topic {} with offset={}: {}", - id, - tp, - offset, - e.getCause().toString()); - - consumer.commitSync(); - shutdown(e); - } - catch(Exception e) - { - log.error("{} - Unexpected error: {}", id, e.toString(), e); - shutdown(e); - } - finally - { - log.info("{} - Consumer-Thread exiting", id); - } - } - - private void shutdown() - { - shutdown(null); - } - - private void shutdown(Exception e) - { - lock.lock(); - try - { - try - { - log.info("{} - Unsubscribing from topic {}", id, topic); - consumer.unsubscribe(); - } - catch (Exception ue) - { - log.error( - "{} - Error while unsubscribing from topic {}: {}", - id, - topic, - ue.toString()); - } - finally - { - running = false; - exception = e; - condition.signal(); - } - } - finally - { - lock.unlock(); - } } public void start() { - lock.lock(); - try - { - if (running) - throw new IllegalStateException("Consumer instance " + id + " is already running!"); - - log.info("{} - Starting - consumed {} messages before", id, consumed); - running = true; - exception = null; - executor.submit(this); - } - finally - { - lock.unlock(); - } - } + if (running()) + throw new IllegalStateException("Consumer instance " + id + " is already running!"); - public synchronized void stop() throws InterruptedException - { - lock.lock(); - try - { - if (!running) - throw new IllegalStateException("Consumer instance " + id + " is not running!"); - - log.info("{} - Stopping", id); - consumer.wakeup(); - condition.await(); - log.info("{} - Stopped - consumed {} messages so far", id, consumed); - } - finally - { - lock.unlock(); - } + log.info("{} - Starting - consumed {} messages before", id, consumed); + errorHandler.clearState(); + registry.getListenerContainer(id).start(); } - @PreDestroy - public void destroy() throws ExecutionException, InterruptedException + public void stop() { - log.info("{} - Destroy!", id); - log.info("{}: Consumed {} messages in total, exiting!", id, consumed); + if (!running()) + throw new IllegalStateException("Consumer instance " + id + " is not running!"); + + log.info("{} - Stopping", id); + registry.getListenerContainer(id).stop(); + log.info("{} - Stopped - consumed {} messages so far", id, consumed); } public boolean running() { - lock.lock(); - try - { - return running; - } - finally - { - lock.unlock(); - } + return registry.getListenerContainer(id).isRunning(); } public Optional exitStatus() { - lock.lock(); - try - { - if (running) - throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is 
running!"); - - return Optional.ofNullable(exception); - } - finally - { - lock.unlock(); - } + if (running()) + throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!"); + + return errorHandler.getException(); } } diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 937b40f..753debe 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -2,6 +2,7 @@ package de.juplo.kafka; import com.mongodb.client.MongoClient; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -12,7 +13,6 @@ import org.apache.kafka.common.utils.Bytes; import org.junit.jupiter.api.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; -import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.autoconfigure.mongo.MongoProperties; import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo; @@ -20,6 +20,8 @@ import org.springframework.boot.test.context.ConfigDataApplicationContextInitial import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; @@ -37,11 +39,7 @@ import static org.assertj.core.api.Assertions.*; import static org.awaitility.Awaitility.*; -@SpringJUnitConfig( - initializers = ConfigDataApplicationContextInitializer.class, - classes = { - KafkaAutoConfiguration.class, - ApplicationTests.Configuration.class }) +@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class) @TestPropertySource( properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", @@ -69,6 +67,8 @@ abstract class GenericApplicationTests @Autowired MongoProperties mongoProperties; @Autowired + KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; + @Autowired TestRecordHandler recordHandler; @Autowired EndlessConsumer endlessConsumer; @@ -392,5 +392,11 @@ abstract class GenericApplicationTests { return new TestRecordHandler(applicationRecordHandler); } + + @Bean(destroyMethod = "close") + public org.apache.kafka.clients.consumer.Consumer kafkaConsumer(ConsumerFactory factory) + { + return factory.createConsumer(); + } } } -- 2.20.1 From 316c89aa6aafbc339eda3727638c75f5489c0a99 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 10 Sep 2022 14:15:11 +0200 Subject: [PATCH 12/16] `EndlessConsumer` nimmt jetzt einzelne `ConsumerRecord`s entgegen MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * `@KafkaHandler` von Batch-Verarbeitung auf Einzel-Verarbeitung umgestellt. 
* Den `ApplicationErrorHandler` um eine passende Fehler-Verarbeitung für die einzelne Verarbeitung der Nachrichten ergänzt * Da der `MessageListenerContainer` nicht dazu zu bewegen ist, die Offset-Commits im Fehlerfall zu unterlassen, wird explizit ein Seek auf die Offset-Positionen der noch nicht verarbeiteten Nachrichten durchgeführt. * Dabei wurde ein von Spring Kafka abgeschauter Trick verwendet: Es genügt, die bisher unverarbeiteten Nachrichten durchzugehen und jeweils den Offset der ersten Nachricht, die zu einer Partition gesehen wird, für den Seek vorzumerken. Denn wenn dabei für eine Partition keine Nachricht gefunden wird, hat entweder das letzte `poll() keine Nachricht zu der Partition geliefert, oder alle Nachrichten, die zu der Partition gehört haben, wurden erfolgreich verarbeitet. In beiden Fällen stimmt der Offset bereits, den die Kafka-Bibliothek intern pflegt, so dass kein Seek durchgeführt werden muss! * Der Testfall wurde entsprechend angepasst und läuft daher in dieser Variante auch ohne Fehler, da der gespeicherte Zustand dadurch zu den bestätigten Offsets passt. --- .../juplo/kafka/ApplicationErrorHandler.java | 28 +++++++++++++++++-- .../java/de/juplo/kafka/EndlessConsumer.java | 8 +----- .../juplo/kafka/GenericApplicationTests.java | 6 ++-- 3 files changed, 30 insertions(+), 12 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java b/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java index 6e15717..52c6a0c 100644 --- a/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java @@ -2,11 +2,15 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.TopicPartition; import org.springframework.kafka.listener.CommonErrorHandler; import org.springframework.kafka.listener.MessageListenerContainer; -import org.springframework.util.Assert; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Optional; @@ -16,6 +20,11 @@ public class ApplicationErrorHandler implements CommonErrorHandler private Exception exception; private boolean ack = true; + @Override + public boolean remainingRecords() + { + return true; + } @Override public void handleOtherException( @@ -24,7 +33,22 @@ public class ApplicationErrorHandler implements CommonErrorHandler MessageListenerContainer container, boolean batchListener) { - Assert.isTrue(batchListener, getClass().getName() + " is only applicable for Batch-Listeners"); + rememberExceptionAndStopContainer(thrownException, container); + } + + @Override + public void handleRemaining( + Exception thrownException, + List> records, + Consumer consumer, + MessageListenerContainer container) + { + Map offsets = new HashMap<>(); + records.forEach(record -> + offsets.computeIfAbsent( + new TopicPartition(record.topic(), record.partition()), + offset -> record.offset())); + offsets.forEach((tp, offset) -> consumer.seek(tp, offset)); rememberExceptionAndStopContainer(thrownException, container); } diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index d3d11ae..01397a2 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -26,14 +26,9 @@ public class EndlessConsumer id = 
"${spring.kafka.client-id}", idIsGroup = false, topics = "${sumup.adder.topic}", - batch = "true", autoStartup = "false") - public void accept(List> records) + public void accept(ConsumerRecord record) { - // Do something with the data... - log.info("{} - Received {} messages", id, records.size()); - for (ConsumerRecord record : records) - { log.info( "{} - {}: {}/{} - {}={}", id, @@ -47,7 +42,6 @@ public class EndlessConsumer recordHandler.accept(record); consumed++; - } } public void start() diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 753debe..124143c 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -157,7 +157,7 @@ abstract class GenericApplicationTests @Test @SkipWhenErrorCannotBeGenerated(logicError = true) - void doesNotCommitOffsetsOnLogicError() + void commitsOffsetsOfUnseenRecordsOnLogicError() { int numberOfGeneratedMessages = recordGenerator.generate(false, true, messageSender); @@ -168,7 +168,7 @@ abstract class GenericApplicationTests .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); - assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets); + assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); endlessConsumer.start(); await("Consumer failed") @@ -176,7 +176,7 @@ abstract class GenericApplicationTests .pollInterval(Duration.ofSeconds(1)) .until(() -> !endlessConsumer.running()); - assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets); + assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); assertThatNoException() .describedAs("Consumer should not be running") -- 2.20.1 From 0c9a0c1cf9a0065012743efcd940d8721bc33c20 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 10 Sep 2022 18:04:04 +0200 Subject: [PATCH 13/16] `EndlessConsumer` auf `@KafkaHandler` umgestellt MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Mit `@KafkaHandler` können separate Handler-Methoden für die unterschiedlichen Nachrichten-Typen definiert werden, die die Anwendung empfängt (hier: über ein Topic, auch mögich: über verschiedene Topics). * Die Tests mussten an die geänderte Implementierung angepasst werden. 
--- .../juplo/kafka/ApplicationConfiguration.java | 4 +- .../kafka/ApplicationHealthIndicator.java | 2 +- .../juplo/kafka/ApplicationRecordHandler.java | 29 +++------ .../java/de/juplo/kafka/EndlessConsumer.java | 61 ++++++++++++++----- .../java/de/juplo/kafka/RecordHandler.java | 14 ++++- .../juplo/kafka/GenericApplicationTests.java | 11 ++-- .../de/juplo/kafka/TestRecordHandler.java | 44 +++++++++---- 7 files changed, 109 insertions(+), 56 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index f8bf857..1755747 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -53,14 +53,14 @@ public class ApplicationConfiguration } @Bean - public EndlessConsumer endlessConsumer( + public EndlessConsumer endlessConsumer( RecordHandler recordHandler, ApplicationErrorHandler errorHandler, KafkaProperties kafkaProperties, KafkaListenerEndpointRegistry endpointRegistry) { return - new EndlessConsumer<>( + new EndlessConsumer( kafkaProperties.getClientId(), endpointRegistry, errorHandler, diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java index 03a14c8..ab9782c 100644 --- a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java +++ b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java @@ -10,7 +10,7 @@ import org.springframework.stereotype.Component; @RequiredArgsConstructor public class ApplicationHealthIndicator implements HealthIndicator { - private final EndlessConsumer consumer; + private final EndlessConsumer consumer; @Override diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index 2829157..f4d3671 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -2,7 +2,6 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; import java.time.Duration; import java.util.HashMap; @@ -12,7 +11,7 @@ import java.util.Optional; @RequiredArgsConstructor @Slf4j -public class ApplicationRecordHandler implements RecordHandler +public class ApplicationRecordHandler implements RecordHandler { private final AdderResults results; private final Optional throttle; @@ -21,42 +20,34 @@ public class ApplicationRecordHandler implements RecordHandler private final Map state = new HashMap<>(); + @Override public void addNumber( + String topic, Integer partition, + Long offset, String user, MessageAddNumber message) { state.get(partition).addToSum(user, message.getNext()); + throttle(); } + @Override public void calculateSum( + String topic, Integer partition, + Long offset, String user, MessageCalculateSum message) { AdderResult result = state.get(partition).calculate(user); log.info("{} - New result for {}: {}", id, user, result); results.addResults(partition, user, result); + throttle(); } - @Override - public void accept(ConsumerRecord record) + private void throttle() { - Integer partition = record.partition(); - String user = record.key(); - Message message = record.value(); - - switch(message.getType()) - { - case ADD: - addNumber(partition, user, (MessageAddNumber) message); - break; - - case CALC: - calculateSum(partition, user, (MessageCalculateSum) message); - break; - } - if 
(throttle.isPresent()) { try diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 01397a2..655151a 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -3,8 +3,12 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaHandler; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; import java.util.List; import java.util.Optional; @@ -12,34 +16,63 @@ import java.util.Optional; @RequiredArgsConstructor @Slf4j -public class EndlessConsumer +@KafkaListener( + id = "${spring.kafka.client-id}", + idIsGroup = false, + topics = "${sumup.adder.topic}", + autoStartup = "false") +public class EndlessConsumer { private final String id; private final KafkaListenerEndpointRegistry registry; private final ApplicationErrorHandler errorHandler; - private final RecordHandler recordHandler; + private final RecordHandler recordHandler; private long consumed = 0; - @KafkaListener( - id = "${spring.kafka.client-id}", - idIsGroup = false, - topics = "${sumup.adder.topic}", - autoStartup = "false") - public void accept(ConsumerRecord record) + @KafkaHandler + public void addNumber( + @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, + @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, + @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, + @Header(KafkaHeaders.OFFSET) Long offset, + @Payload MessageAddNumber message) { log.info( "{} - {}: {}/{} - {}={}", id, - record.offset(), - record.topic(), - record.partition(), - record.key(), - record.value() + offset, + topic, + partition, + key, + message ); - recordHandler.accept(record); + recordHandler.addNumber(topic, partition, offset, key, message); + + consumed++; + } + + @KafkaHandler + public void calculateSum( + @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, + @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, + @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, + @Header(KafkaHeaders.OFFSET) Long offset, + @Payload MessageCalculateSum message) + { + log.info( + "{} - {}: {}/{} - {}={}", + id, + offset, + topic, + partition, + key, + message + ); + + recordHandler.calculateSum(topic, partition, offset, key, message); consumed++; } diff --git a/src/main/java/de/juplo/kafka/RecordHandler.java b/src/main/java/de/juplo/kafka/RecordHandler.java index 327ac9f..47f984e 100644 --- a/src/main/java/de/juplo/kafka/RecordHandler.java +++ b/src/main/java/de/juplo/kafka/RecordHandler.java @@ -5,6 +5,18 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import java.util.function.Consumer; -public interface RecordHandler extends Consumer> +public interface RecordHandler { + void addNumber( + String topic, + Integer partition, + Long offset, + String user, + MessageAddNumber message); + void calculateSum( + String topic, + Integer partition, + Long offset, + String user, + MessageCalculateSum message); } diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 124143c..49ddb47 100644 --- 
a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -2,7 +2,6 @@ package de.juplo.kafka; import com.mongodb.client.MongoClient; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -69,9 +68,9 @@ abstract class GenericApplicationTests @Autowired KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; @Autowired - TestRecordHandler recordHandler; + TestRecordHandler recordHandler; @Autowired - EndlessConsumer endlessConsumer; + EndlessConsumer endlessConsumer; KafkaProducer testRecordProducer; KafkaConsumer offsetConsumer; @@ -99,7 +98,7 @@ abstract class GenericApplicationTests await(numberOfGeneratedMessages + " records received") .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) - .until(() -> recordHandler.receivedRecords.size() >= numberOfGeneratedMessages); + .until(() -> recordHandler.receivedMessages >= numberOfGeneratedMessages); await("Offsets committed") .atMost(Duration.ofSeconds(10)) @@ -141,7 +140,7 @@ abstract class GenericApplicationTests checkSeenOffsetsForProgress(); assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); - assertThat(recordHandler.receivedRecords.size()) + assertThat(recordHandler.receivedMessages) .describedAs("Received not all sent events") .isLessThan(numberOfGeneratedMessages); @@ -348,7 +347,7 @@ abstract class GenericApplicationTests oldOffsets = new HashMap<>(); recordHandler.seenOffsets = new HashMap<>(); - recordHandler.receivedRecords = new HashSet<>(); + recordHandler.receivedMessages = 0; doForCurrentOffsets((tp, offset) -> { diff --git a/src/test/java/de/juplo/kafka/TestRecordHandler.java b/src/test/java/de/juplo/kafka/TestRecordHandler.java index 37d3f65..d9f4e67 100644 --- a/src/test/java/de/juplo/kafka/TestRecordHandler.java +++ b/src/test/java/de/juplo/kafka/TestRecordHandler.java @@ -1,34 +1,52 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; +import java.util.List; import java.util.Map; -import java.util.Set; @RequiredArgsConstructor -public class TestRecordHandler implements RecordHandler +public class TestRecordHandler implements RecordHandler { - private final RecordHandler handler; + private final RecordHandler handler; Map seenOffsets; - Set> receivedRecords; + int receivedMessages; - public void onNewRecord(ConsumerRecord record) + public void onNewRecord( + String topic, + Integer partition, + Long offset, + Message messgage) { - seenOffsets.put( - new TopicPartition(record.topic(), record.partition()), - record.offset()); - receivedRecords.add(record); + seenOffsets.put(new TopicPartition(topic, partition), offset); + receivedMessages++; } @Override - public void accept(ConsumerRecord record) + public void addNumber( + String topic, + Integer partition, + Long offset, + String user, + MessageAddNumber message) { - this.onNewRecord(record); - handler.accept(record); + this.onNewRecord(topic, partition, offset, message); + handler.addNumber(topic, partition, offset, user, message); + } + + @Override + public void calculateSum( + String topic, + Integer partition, + Long offset, + String user, + MessageCalculateSum message) + { + this.onNewRecord(topic, partition, offset, message); + 
handler.calculateSum(topic, partition, offset, user, message);
   }
 }
-- 
2.20.1


From d8ec8ee7ea93e801051ce3cd6f83db2aa20e6b95 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sat, 10 Sep 2022 19:19:15 +0200
Subject: [PATCH 14/16] Switched the application to the default error handler
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* The tests had to be adapted accordingly, because the method
  `EndlessConsumer.exitStatus()` is no longer available after this
  change.
* The logic of the tests was not changed, though.
* The tests merely show that the application no longer terminates itself
  as it did before.
* Manual experiments yielded the following insights:
** In case of a deserialization error, the application enters an endless
   loop.
** Since no commits are performed in this error situation, the
   application is still stuck in that error loop after a restart.
** In case of a logic error, a back-off with 10 retries is started.
** After each failure, the offsets of all partitions for which the last
   `poll()` delivered messages that have not been processed yet are
   reset to the next unprocessed message, and then `poll()` is executed
   again.
** After the last retry, the application skips over the error.
---
 .../juplo/kafka/ApplicationConfiguration.java |  8 --
 .../juplo/kafka/ApplicationErrorHandler.java  | 94 -------------------
 .../kafka/ApplicationHealthIndicator.java     | 15 +--
 .../java/de/juplo/kafka/EndlessConsumer.java  | 10 --
 .../juplo/kafka/GenericApplicationTests.java  | 32 +++----
 5 files changed, 16 insertions(+), 143 deletions(-)
 delete mode 100644 src/main/java/de/juplo/kafka/ApplicationErrorHandler.java

diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 1755747..c09eec3 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -46,16 +46,9 @@ public class ApplicationConfiguration
         kafkaProperties.getClientId());
   }
 
-  @Bean
-  public ApplicationErrorHandler applicationErrorHandler()
-  {
-    return new ApplicationErrorHandler();
-  }
-
   @Bean
   public EndlessConsumer endlessConsumer(
       RecordHandler recordHandler,
-      ApplicationErrorHandler errorHandler,
       KafkaProperties kafkaProperties,
       KafkaListenerEndpointRegistry endpointRegistry)
   {
@@ -63,7 +56,6 @@ public class ApplicationConfiguration
       new EndlessConsumer(
           kafkaProperties.getClientId(),
           endpointRegistry,
-          errorHandler,
           recordHandler);
   }
 }
diff --git a/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java b/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java
deleted file mode 100644
index 52c6a0c..0000000
--- a/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java
+++ /dev/null
@@ -1,94 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.TopicPartition;
-import org.springframework.kafka.listener.CommonErrorHandler;
-import org.springframework.kafka.listener.MessageListenerContainer;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-
-@Slf4j
-public class ApplicationErrorHandler implements CommonErrorHandler
-{
-  private Exception exception;
-  private boolean ack = true;
- - @Override - public boolean remainingRecords() - { - return true; - } - - @Override - public void handleOtherException( - Exception thrownException, - Consumer consumer, - MessageListenerContainer container, - boolean batchListener) - { - rememberExceptionAndStopContainer(thrownException, container); - } - - @Override - public void handleRemaining( - Exception thrownException, - List> records, - Consumer consumer, - MessageListenerContainer container) - { - Map offsets = new HashMap<>(); - records.forEach(record -> - offsets.computeIfAbsent( - new TopicPartition(record.topic(), record.partition()), - offset -> record.offset())); - offsets.forEach((tp, offset) -> consumer.seek(tp, offset)); - rememberExceptionAndStopContainer(thrownException, container); - } - - @Override - public void handleBatch( - Exception thrownException, - ConsumerRecords data, - Consumer consumer, - MessageListenerContainer container, - Runnable invokeListener) - { - // Do not commit the polled offsets on a logic-error - ack = false; - rememberExceptionAndStopContainer(thrownException, container); - } - - private void rememberExceptionAndStopContainer( - Exception exception, - MessageListenerContainer container) - { - log.error("{}, stopping container {} abnormally", exception, container); - this.exception = exception; - container.stopAbnormally(() -> log.info("{} is stopped", container)); - } - - @Override - public boolean isAckAfterHandle() - { - return ack; - } - - - public Optional getException() - { - return Optional.ofNullable(exception); - } - - public void clearState() - { - this.exception = null; - this.ack = true; - } -} diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java index ab9782c..e215c69 100644 --- a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java +++ b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java @@ -16,17 +16,8 @@ public class ApplicationHealthIndicator implements HealthIndicator @Override public Health health() { - try - { - return consumer - .exitStatus() - .map(Health::down) - .orElse(Health.outOfService()) - .build(); - } - catch (IllegalStateException e) - { - return Health.up().build(); - } + return consumer.running() + ? 
Health.up().build() + : Health.down().build(); } } diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 655151a..27c1e44 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -25,7 +25,6 @@ public class EndlessConsumer { private final String id; private final KafkaListenerEndpointRegistry registry; - private final ApplicationErrorHandler errorHandler; private final RecordHandler recordHandler; private long consumed = 0; @@ -83,7 +82,6 @@ public class EndlessConsumer throw new IllegalStateException("Consumer instance " + id + " is already running!"); log.info("{} - Starting - consumed {} messages before", id, consumed); - errorHandler.clearState(); registry.getListenerContainer(id).start(); } @@ -101,12 +99,4 @@ public class EndlessConsumer { return registry.getListenerContainer(id).isRunning(); } - - public Optional exitStatus() - { - if (running()) - throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!"); - - return errorHandler.getException(); - } } diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 49ddb47..66a80ad 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -109,9 +109,9 @@ abstract class GenericApplicationTests assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); }); - assertThatExceptionOfType(IllegalStateException.class) - .isThrownBy(() -> endlessConsumer.exitStatus()) - .describedAs("Consumer should still be running"); + assertThat(endlessConsumer.running()) + .describedAs("Consumer should still be running") + .isTrue(); endlessConsumer.stop(); recordGenerator.assertBusinessLogic(); @@ -144,12 +144,9 @@ abstract class GenericApplicationTests .describedAs("Received not all sent events") .isLessThan(numberOfGeneratedMessages); - assertThatNoException() - .describedAs("Consumer should not be running") - .isThrownBy(() -> endlessConsumer.exitStatus()); - assertThat(endlessConsumer.exitStatus()) - .describedAs("Consumer should have exited abnormally") - .containsInstanceOf(RecordDeserializationException.class); + assertThat(endlessConsumer.running()) + .describedAs("Consumer should have exited") + .isFalse(); recordGenerator.assertBusinessLogic(); } @@ -177,12 +174,9 @@ abstract class GenericApplicationTests assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); - assertThatNoException() + assertThat(endlessConsumer.running()) .describedAs("Consumer should not be running") - .isThrownBy(() -> endlessConsumer.exitStatus()); - assertThat(endlessConsumer.exitStatus()) - .describedAs("Consumer should have exited abnormally") - .containsInstanceOf(RuntimeException.class); + .isFalse(); recordGenerator.assertBusinessLogic(); } @@ -392,10 +386,10 @@ abstract class GenericApplicationTests return new TestRecordHandler(applicationRecordHandler); } - @Bean(destroyMethod = "close") - public org.apache.kafka.clients.consumer.Consumer kafkaConsumer(ConsumerFactory factory) - { - return factory.createConsumer(); - } + @Bean(destroyMethod = "close") + public org.apache.kafka.clients.consumer.Consumer kafkaConsumer(ConsumerFactory factory) + { + return factory.createConsumer(); + } } } -- 2.20.1 From 7a7926c1799495a3ed016cb1b204cbfe13f833f1 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 11 Sep 2022 13:42:57 +0200 
Subject: [PATCH 15/16] =?utf8?q?Anzahl=20der=20Fehler=20f=C3=BCr=20die=20T?= =?utf8?q?est-Logik=20verf=C3=BCgbar=20gemacht?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../java/de/juplo/kafka/ApplicationTests.java | 41 +++++++++++++++---- .../juplo/kafka/GenericApplicationTests.java | 21 ++++++---- 2 files changed, 46 insertions(+), 16 deletions(-) diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index bd9f449..e01fdd1 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -41,17 +41,22 @@ public class ApplicationTests extends GenericApplicationTests final StringSerializer stringSerializer = new StringSerializer(); final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "{}")); - int counter = 0; + int counterMessages; + int counterPoisonPills; + int counterLogicErrors; Map> state; @Override - public int generate( + public void generate( boolean poisonPills, boolean logicErrors, Consumer> messageSender) { - counter = 0; + counterMessages = 0; + counterPoisonPills = 0; + counterLogicErrors = 0; + state = Arrays .stream(dieWilden13) @@ -76,8 +81,8 @@ public class ApplicationTests extends GenericApplicationTests key, calculateMessage, Message.Type.CALC, - poisonPill(poisonPills, pass, counter), - logicError(logicErrors, pass, counter), + poisonPill(poisonPills, pass, counterMessages), + logicError(logicErrors, pass, counterMessages), messageSender); state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2)); // Pick next number to calculate @@ -90,13 +95,29 @@ public class ApplicationTests extends GenericApplicationTests key, new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":" + message[i]++ + "}")), Message.Type.ADD, - poisonPill(poisonPills, pass, counter), - logicError(logicErrors, pass, counter), + poisonPill(poisonPills, pass, counterMessages), + logicError(logicErrors, pass, counterMessages), messageSender); } } + } + + @Override + public int getNumberOfMessages() + { + return counterMessages; + } - return counter; + @Override + public int getNumberOfPoisonPills() + { + return counterPoisonPills; + } + + @Override + public int getNumberOfLogicErrors() + { + return counterLogicErrors; } boolean poisonPill (boolean poisonPills, int pass, int counter) @@ -117,15 +138,17 @@ public class ApplicationTests extends GenericApplicationTests boolean logicError, Consumer> messageSender) { - counter++; + counterMessages++; if (logicError) { value = new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":-1}")); + counterLogicErrors++; } if (poisonPill) { value = new Bytes("BOOM!".getBytes()); + counterPoisonPills++; } ProducerRecord record = new ProducerRecord<>(TOPIC, key, value); diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 66a80ad..4793d96 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -92,8 +92,9 @@ abstract class GenericApplicationTests @Test void commitsCurrentOffsetsOnSuccess() throws Exception { - int numberOfGeneratedMessages = - recordGenerator.generate(false, false, messageSender); + recordGenerator.generate(false, false, messageSender); + + int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages(); await(numberOfGeneratedMessages + " records received") 
.atMost(Duration.ofSeconds(30))
@@ -121,8 +122,9 @@ abstract class GenericApplicationTests
   @SkipWhenErrorCannotBeGenerated(poisonPill = true)
   void commitsOffsetOfErrorForReprocessingOnDeserializationError()
   {
-    int numberOfGeneratedMessages =
-      recordGenerator.generate(true, false, messageSender);
+    recordGenerator.generate(true, false, messageSender);
+
+    int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages();
 
     await("Consumer failed")
       .atMost(Duration.ofSeconds(30))
@@ -155,8 +157,9 @@ abstract class GenericApplicationTests
   @SkipWhenErrorCannotBeGenerated(logicError = true)
   void commitsOffsetsOfUnseenRecordsOnLogicError()
   {
-    int numberOfGeneratedMessages =
-      recordGenerator.generate(false, true, messageSender);
+    recordGenerator.generate(false, true, messageSender);
+
+    int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages();
 
     await("Consumer failed")
       .atMost(Duration.ofSeconds(30))
@@ -271,11 +274,15 @@ abstract class GenericApplicationTests
 
   public interface RecordGenerator
   {
-    int generate(
+    void generate(
         boolean poisonPills,
         boolean logicErrors,
         Consumer<ProducerRecord<Bytes, Bytes>> messageSender);
 
+    int getNumberOfMessages();
+    int getNumberOfPoisonPills();
+    int getNumberOfLogicErrors();
+
     default boolean canGeneratePoisonPill()
     {
       return true;
-- 
2.20.1


From ac154bb18a6c575fe01e70cba6a86d10580dfb89 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sat, 10 Sep 2022 20:41:06 +0200
Subject: [PATCH 16/16] Configured a DLT based on the
 `DeadLetterPublishingRecoverer`
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* The `DeadLetterPublishingRecoverer` has to be instantiated explicitly.
* To make it known to the Spring-Kafka beans, the `DefaultErrorHandler`
  bean has to be overridden.
* The recoverer is handed to that handler together with a back-off
  strategy.
* So that the `DeadLetterPublishingRecoverer` is able to send the
  messages that are to be forwarded, a `KafkaTemplate` with a matching
  `ProducerFactory` has to be provided as well.
* The producer apparently needs a separate entry for `bootstrap-servers`
  under `spring.kafka.producer`: the entry
  `spring.kafka.bootstrap-servers` is not picked up here!
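
For the record, a sketch that is not part of this patch: by default, the
`DeadLetterPublishingRecoverer` forwards a failed record to the topic
named after the original topic with the suffix `.DLT`, targeting the
same partition number. That is why `out.DLT` is created with the same
number of partitions as `out`. Written out as an explicit destination
resolver (`kafkaTemplate` stands for the `KafkaOperations` bean defined
below), the default resolution corresponds to:

    DeadLetterPublishingRecoverer recoverer =
        new DeadLetterPublishingRecoverer(
            kafkaTemplate,
            // Same partition, original topic name plus the suffix ".DLT"
            (record, exception) ->
                new TopicPartition(record.topic() + ".DLT", record.partition()));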
--- docker-compose.yml | 5 ++ .../juplo/kafka/ApplicationConfiguration.java | 49 ++++++++++++++ src/main/resources/application.yml | 9 ++- .../juplo/kafka/GenericApplicationTests.java | 64 +++++++++---------- 4 files changed, 93 insertions(+), 34 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 16fec5b..960bbc2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -85,10 +85,13 @@ services: bash -c " kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic in kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic out + kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic out.DLT kafka-topics --bootstrap-server kafka:9092 --create --topic in --partitions 2 --replication-factor 3 --config min.insync.replicas=2 kafka-topics --bootstrap-server kafka:9092 --create --topic out --partitions 2 --replication-factor 3 --config min.insync.replicas=2 + kafka-topics --bootstrap-server kafka:9092 --create --topic out.DLT --partitions 2 --replication-factor 3 --config min.insync.replicas=2 kafka-topics --bootstrap-server kafka:9092 --describe --topic in kafka-topics --bootstrap-server kafka:9092 --describe --topic out + kafka-topics --bootstrap-server kafka:9092 --describe --topic out.DLT " cli: @@ -130,6 +133,7 @@ services: environment: server.port: 8080 spring.kafka.bootstrap-servers: kafka:9092 + spring.kafka.producer.bootstrap-servers: kafka:9092 spring.kafak.client-id: adder-1 spring.kafka.auto-commit-interval: 1s sumup.adder.throttle: 3ms @@ -144,6 +148,7 @@ services: environment: server.port: 8080 spring.kafka.bootstrap-servers: kafka:9092 + spring.kafka.producer.bootstrap-servers: kafka:9092 spring.kafak.client-id: adder-2 spring.kafka.auto-commit-interval: 1s sumup.adder.throttle: 3ms diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index c09eec3..b5f6187 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,13 +1,25 @@ package de.juplo.kafka; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import java.util.Map; import java.util.Optional; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaOperations; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.listener.DeadLetterPublishingRecoverer; +import org.springframework.kafka.listener.DefaultErrorHandler; +import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer; +import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.util.backoff.FixedBackOff; @Configuration @@ -58,4 +70,41 @@ public class ApplicationConfiguration endpointRegistry, recordHandler); } + + @Bean + public ProducerFactory producerFactory( + KafkaProperties properties) + { + return new DefaultKafkaProducerFactory<>( + properties.getProducer().buildProperties(), + new StringSerializer(), + new 
DelegatingByTypeSerializer( + Map.of( + byte[].class, new ByteArraySerializer(), + MessageAddNumber.class, new JsonSerializer<>(), + MessageCalculateSum.class, new JsonSerializer<>()))); + } + + @Bean + public KafkaTemplate kafkaTemplate( + ProducerFactory producerFactory) + { + return new KafkaTemplate<>(producerFactory); + } + + @Bean + public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer( + KafkaOperations kafkaTemplate) + { + return new DeadLetterPublishingRecoverer(kafkaTemplate); + } + + @Bean + public DefaultErrorHandler errorHandler( + DeadLetterPublishingRecoverer recoverer) + { + return new DefaultErrorHandler( + recoverer, + new FixedBackOff(0l, 0l)); + } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 92f3a6b..0bc592c 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -34,10 +34,17 @@ spring: auto-offset-reset: earliest auto-commit-interval: 5s key-deserializer: org.apache.kafka.common.serialization.StringDeserializer - value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer properties: partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor metadata.max.age.ms: 1000 + spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer + spring.json.type.mapping: > + ADD:de.juplo.kafka.MessageAddNumber, + CALC:de.juplo.kafka.MessageCalculateSum + producer: + bootstrap-servers: :9092 + properties: spring.json.type.mapping: > ADD:de.juplo.kafka.MessageAddNumber, CALC:de.juplo.kafka.MessageCalculateSum diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 4793d96..b98066f 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -6,7 +6,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.serialization.*; import org.apache.kafka.common.utils.Bytes; import org.junit.jupiter.api.*; @@ -42,6 +41,7 @@ import static org.awaitility.Awaitility.*; @TestPropertySource( properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", + "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}", "sumup.adder.topic=" + TOPIC, "spring.kafka.consumer.auto-commit-interval=500ms", "spring.mongodb.embedded.version=4.4.13" }) @@ -124,32 +124,29 @@ abstract class GenericApplicationTests { recordGenerator.generate(true, false, messageSender); - int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages(); + int numberOfValidMessages = + recordGenerator.getNumberOfMessages() - + recordGenerator.getNumberOfPoisonPills(); - await("Consumer failed") + await(numberOfValidMessages + " records received") .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); - - checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); + .until(() -> recordHandler.receivedMessages >= numberOfValidMessages); - endlessConsumer.start(); - await("Consumer failed") - 
.atMost(Duration.ofSeconds(30)) + await("Offsets committed") + .atMost(Duration.ofSeconds(10)) .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); - - checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); - assertThat(recordHandler.receivedMessages) - .describedAs("Received not all sent events") - .isLessThan(numberOfGeneratedMessages); + .untilAsserted(() -> + { + checkSeenOffsetsForProgress(); + assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); + }); assertThat(endlessConsumer.running()) - .describedAs("Consumer should have exited") - .isFalse(); + .describedAs("Consumer should still be running") + .isTrue(); + endlessConsumer.stop(); recordGenerator.assertBusinessLogic(); } @@ -159,28 +156,29 @@ abstract class GenericApplicationTests { recordGenerator.generate(false, true, messageSender); - int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages(); + int numberOfValidMessages = + recordGenerator.getNumberOfMessages() - + recordGenerator.getNumberOfLogicErrors(); - await("Consumer failed") + await(numberOfValidMessages + " records received") .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); - - checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); + .until(() -> recordHandler.receivedMessages >= numberOfValidMessages); - endlessConsumer.start(); - await("Consumer failed") - .atMost(Duration.ofSeconds(30)) + await("Offsets committed") + .atMost(Duration.ofSeconds(10)) .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); - - assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); + .untilAsserted(() -> + { + checkSeenOffsetsForProgress(); + assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); + }); assertThat(endlessConsumer.running()) - .describedAs("Consumer should not be running") - .isFalse(); + .describedAs("Consumer should still be running") + .isTrue(); + endlessConsumer.stop(); recordGenerator.assertBusinessLogic(); } -- 2.20.1
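
A closing note on the deserialization side, as a sketch rather than part
of the diffs above: switching the value deserializer to the
`ErrorHandlingDeserializer` in `application.yml` is what turns a poison
pill into an error that the `DefaultErrorHandler` can hand to the
recoverer, instead of an exception that is rethrown by `poll()` over and
over again. Expressed as plain consumer properties, the configured
delegation amounts to:

    // Equivalent to the application.yml entries introduced in this patch
    Map<String, Object> props = new HashMap<>();
    props.put(
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        ErrorHandlingDeserializer.class);
    props.put(
        ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS,
        JsonDeserializer.class);
    props.put(
        JsonDeserializer.TYPE_MAPPINGS,
        "ADD:de.juplo.kafka.MessageAddNumber,CALC:de.juplo.kafka.MessageCalculateSum");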