From a8e18a7ca9cb26ec03cd3541ddc7b85d93c2a20c Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Mon, 29 Aug 2022 18:59:29 +0200
Subject: [PATCH 01/16] Demo script reworked: sped up the run by reordering
 the steps
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* Because both consumers are now each stopped cleanly once, the results for
  both `peter` and `klaus` get stored in the MongoDB.
* Previously this happened for only one of the two users, so after the forced
  shutdown of a consumer the script had to wait a very long time until, after
  the restart, processing of the piled-up data had progressed far enough for
  first results to become visible for both consumers.
---
 README.sh | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/README.sh b/README.sh
index 3f74cd1..d22179d 100755
--- a/README.sh
+++ b/README.sh
@@ -74,20 +74,20 @@ http :8092/results/peter | jq .[].sum | uniq
 echo "Resultate für klaus von adder-2"
 http :8092/results/klaus | jq .[].sum | uniq
 
-docker-compose stop adder-2
-until [ $(http --check-status :8091/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-1..."; sleep 1; done
-until [ $(http --check-status :8091/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for klaus to show up on adder-1..."; sleep 1; done
+docker-compose stop adder-1
+until [ $(http --check-status :8092/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-2..."; sleep 1; done
+until [ $(http --check-status :8092/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for klaus to show up on adder-2..."; sleep 1; done
 
-echo "Resultate für adder-1"
-http -v --pretty none -S :8091/results
+echo "Resultate für adder-2"
+http -v --pretty none -S :8092/results
 echo
 
-echo "Resultate für peter von adder-1"
-http :8091/results/peter | jq .[].sum | uniq
-echo "Resultate für klaus von adder-1"
-http :8091/results/klaus | jq .[].sum | uniq
+echo "Resultate für peter von adder-2"
+http :8092/results/peter | jq .[].sum | uniq
+echo "Resultate für klaus von adder-2"
+http :8092/results/klaus | jq .[].sum | uniq
 
-docker-compose kill -s 9 adder-1
+docker-compose kill -s 9 adder-2
 docker-compose start adder-1
 while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done
 until [ $(http --check-status :8091/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-1..."; sleep 1; done
-- 
2.20.1


From 9f8bcc8f869626efa0a258b3116582c8354a66d1 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Wed, 31 Aug 2022 16:41:04 +0200
Subject: [PATCH 02/16] Made it clearly visible that the baton handover only
 succeeds in the regular case
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

---
 README.sh          | 8 +++++++-
 docker-compose.yml | 4 ++--
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/README.sh b/README.sh
index d22179d..3292f5f 100755
--- a/README.sh
+++ b/README.sh
@@ -89,6 +89,7 @@ http :8092/results/klaus | jq .[].sum | uniq
 
 docker-compose kill -s 9 adder-2
 docker-compose start adder-1
+docker-compose kill -s 9 peter klaus
 while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done
 until [ $(http --check-status :8091/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-1..."; sleep 1; done
 until [ $(http --check-status :8091/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for klaus to show up on adder-1..."; sleep 1; done
@@ -102,4 +103,9 @@ http :8091/results/peter | jq .[].sum | uniq
 echo "Resultate für klaus von adder-1"
 http :8091/results/klaus | jq .[].sum | uniq
 
-docker-compose kill -s 9 peter klaus
+sleep 5
+
+echo "Resultate für peter von adder-1"
+http :8091/results/peter | jq .[].sum | uniq
+echo "Resultate für klaus von adder-1"
+http :8091/results/klaus | jq .[].sum | uniq
diff --git a/docker-compose.yml b/docker-compose.yml
index c46b00d..5f0acac 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -131,7 +131,7 @@ services:
       server.port: 8080
       sumup.adder.bootstrap-server: kafka:9092
       sumup.adder.client-id: adder-1
-      sumup.adder.commit-interval: 3s
+      sumup.adder.commit-interval: 1s
       sumup.adder.throttle: 3ms
       spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
       spring.data.mongodb.database: juplo
@@ -145,7 +145,7 @@ services:
       server.port: 8080
       sumup.adder.bootstrap-server: kafka:9092
       sumup.adder.client-id: adder-2
-      sumup.adder.commit-interval: 3s
+      sumup.adder.commit-interval: 1s
      sumup.adder.throttle: 3ms
       spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
       spring.data.mongodb.database: juplo
-- 
2.20.1


From 9fe70ad96d2b5a9ed0581057da54facba859ad1f Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sun, 28 Aug 2022 16:01:22 +0200
Subject: [PATCH 03/16] Reverted to a consumer that does not commit in
 `onPartitionsRevoked()`
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* As a consequence, a rebalance should as a rule corrupt the locally
  maintained state, because processing only resumes by chance at exactly the
  offset for which the state was stored.
* To restore the previous behaviour of the implementation, the commits for a
  regular shutdown and for a deserialization error in particular have to be
  added back, because otherwise the implementation does not confirm the
  offsets of the most recently successfully processed messages.
* Adjusted the demo script so that one immediately sees that in this version
  already a regular "baton handover" - that is, even a normal rebalance that
  was simply triggered by starting a second consumer - produces an error
  (a sketch of the removed commit-on-revoke pattern follows below).
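For reference, a minimal sketch of the commit-on-revoke pattern that this
patch removes, reconstructed from the code deleted from
`ApplicationRebalanceListener` in the diff below; the class name and the
injected `Consumer` are illustrative, not part of the original commit:

    import java.util.Collection;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.common.TopicPartition;

    public class CommitOnRevokeListener implements ConsumerRebalanceListener
    {
      private final Consumer<?, ?> consumer;

      public CommitOnRevokeListener(Consumer<?, ?> consumer)
      {
        this.consumer = consumer;
      }

      @Override
      public void onPartitionsRevoked(Collection<TopicPartition> partitions)
      {
        // Commit synchronously *before* the partitions are handed over, so
        // that the committed offsets match the state stored for them.
        consumer.commitSync();
      }

      @Override
      public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
    }

Without this synchronous commit, the next owner of a partition resumes at the
last auto-committed offset, which generally does not match the stored state.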
---
 README.sh                                     | 34 -------------------
 .../juplo/kafka/ApplicationConfiguration.java |  5 +--
 .../kafka/ApplicationRebalanceListener.java   | 12 -------
 .../java/de/juplo/kafka/EndlessConsumer.java  |  4 ++-
 4 files changed, 4 insertions(+), 51 deletions(-)

diff --git a/README.sh b/README.sh
index 3292f5f..c9494b9 100755
--- a/README.sh
+++ b/README.sh
@@ -74,38 +74,4 @@ http :8092/results/peter | jq .[].sum | uniq
 echo "Resultate für klaus von adder-2"
 http :8092/results/klaus | jq .[].sum | uniq
 
-docker-compose stop adder-1
-until [ $(http --check-status :8092/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-2..."; sleep 1; done
-until [ $(http --check-status :8092/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for klaus to show up on adder-2..."; sleep 1; done
-
-echo "Resultate für adder-2"
-http -v --pretty none -S :8092/results
-echo
-
-echo "Resultate für peter von adder-2"
-http :8092/results/peter | jq .[].sum | uniq
-echo "Resultate für klaus von adder-2"
-http :8092/results/klaus | jq .[].sum | uniq
-
-docker-compose kill -s 9 adder-2
-docker-compose start adder-1
 docker-compose kill -s 9 peter klaus
-while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done
-until [ $(http --check-status :8091/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-1..."; sleep 1; done
-until [ $(http --check-status :8091/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for klaus to show up on adder-1..."; sleep 1; done
-
-echo "Resultate für adder-1"
-http -v --pretty none -S :8091/results
-echo
-
-echo "Resultate für peter von adder-1"
-http :8091/results/peter | jq .[].sum | uniq
-echo "Resultate für klaus von adder-1"
-http :8091/results/klaus | jq .[].sum | uniq
-
-sleep 5
-
-echo "Resultate für peter von adder-1"
-http :8091/results/peter | jq .[].sum | uniq
-echo "Resultate für klaus von adder-1"
-http :8091/results/klaus | jq .[].sum | uniq
diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index e08cff4..1c69760 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -1,6 +1,5 @@
 package de.juplo.kafka;
 
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
@@ -39,15 +38,13 @@ public class ApplicationConfiguration
       ApplicationRecordHandler recordHandler,
       AdderResults adderResults,
       StateRepository stateRepository,
-      Consumer consumer,
       ApplicationProperties properties)
   {
     return new ApplicationRebalanceListener(
         recordHandler,
         adderResults,
         stateRepository,
-        properties.getClientId(),
-        consumer);
+        properties.getClientId());
   }
 
   @Bean
diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
index eef0d00..0bfee67 100644
--- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
+++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
@@ -2,7 +2,6 @@ package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.common.TopicPartition;
 
@@ -17,7 +16,6 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener
   private final AdderResults adderResults;
   private final StateRepository stateRepository;
   private final String id;
-  private final Consumer consumer;
 
   private final Set<TopicPartition> partitions = new HashSet<>();
 
@@ -50,16 +48,6 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener
   @Override
   public void onPartitionsRevoked(Collection<TopicPartition> partitions)
   {
-    log.info("{} - Commiting offsets for all previously assigned partitions", id);
-    try
-    {
-      consumer.commitSync();
-    }
-    catch (Exception e)
-    {
-      log.warn("{} - Could not commit offsets in onPartitionsRevoked():", id, e);
-    }
-
     partitions.forEach(tp ->
     {
       Integer partition = tp.partition();
diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
index f0e74d3..00678c4 100644
--- a/src/main/java/de/juplo/kafka/EndlessConsumer.java
+++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java
@@ -71,7 +71,8 @@ public class EndlessConsumer implements Runnable
     }
     catch(WakeupException e)
     {
-      log.info("{} - RIIING! Request to stop consumption.", id);
+      log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
+      consumer.commitSync();
       shutdown();
     }
     catch(RecordDeserializationException e)
@@ -85,6 +86,7 @@ public class EndlessConsumer implements Runnable
           offset,
           e.getCause().toString());
 
+      consumer.commitSync();
       shutdown(e);
     }
     catch(Exception e)
-- 
2.20.1


From 21d88d8ede95d1f811ff91d3804cba6d95ae6aab Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Tue, 30 Aug 2022 06:32:12 +0200
Subject: [PATCH 04/16] Switching to the `StickyAssignor` solves the
 rebalancing errors
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* The state errors triggered by rebalances during regular "baton handovers"
  can be resolved completely by downgrading from the
  `CooperativeStickyAssignor` to the `StickyAssignor`.
* *Attention:* The `StickyAssignor` uses the eager rebalancing protocol.
* That is, a downgrade to the `StickyAssignor` requires a reset of the
  consumer group; it is not possible as a rolling upgrade during live
  operation.
* Adjusted the demo script so that one immediately sees that this version
  handles all regular rebalance cases without errors (see the configuration
  sketch below for the relevant consumer setting).
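A minimal sketch of the consumer setting this patch changes, assuming plain
`KafkaConsumer` construction as in `ApplicationConfiguration`; bootstrap
server and group id are illustrative values taken from the Compose setup:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.StickyAssignor;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class StickyAssignorConfigSketch
    {
      public static void main(String[] args)
      {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        // Eager protocol: on every rebalance *all* partitions are revoked
        // first and then re-assigned, but the StickyAssignor hands a
        // partition back to its previous owner whenever possible, so the
        // state kept per partition stays valid across a regular handover.
        props.put(
            ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
            StickyAssignor.class.getName());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props))
        {
          // subscribe() and the poll loop are omitted in this sketch
        }
      }
    }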
---
 README.sh                                        | 13 +++++++++++++
 .../de/juplo/kafka/ApplicationConfiguration.java |  2 +-
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/README.sh b/README.sh
index c9494b9..f337d5c 100755
--- a/README.sh
+++ b/README.sh
@@ -74,4 +74,17 @@ http :8092/results/peter | jq .[].sum | uniq
 echo "Resultate für klaus von adder-2"
 http :8092/results/klaus | jq .[].sum | uniq
 
+docker-compose stop adder-1
+until [ $(http --check-status :8092/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-2..."; sleep 1; done
+until [ $(http --check-status :8092/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-2..."; sleep 1; done
+
+echo "Resultate für adder-2"
+http -v --pretty none -S :8092/results
+echo
+
+echo "Resultate für peter von adder-2"
+http :8092/results/peter | jq .[].sum | uniq
+echo "Resultate für klaus von adder-2"
+http :8092/results/klaus | jq .[].sum | uniq
+
 docker-compose kill -s 9 peter klaus
diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 1c69760..624a4ec 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -77,7 +77,7 @@ public class ApplicationConfiguration
     Properties props = new Properties();
 
     props.put("bootstrap.servers", properties.getBootstrapServer());
-    props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
+    props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.StickyAssignor");
     props.put("group.id", properties.getGroupId());
     props.put("client.id", properties.getClientId());
     props.put("auto.offset.reset", properties.getAutoOffsetReset());
-- 
2.20.1


From 0c89ddec8156e2f44fe28c9d05fe06f548e9168e Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sun, 4 Sep 2022 15:55:35 +0200
Subject: [PATCH 05/16] Activated the `maven-failsafe-plugin` and repaired
 `ApplicationIT`
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* The `maven-failsafe-plugin` was not armed, so it had gone unnoticed that
  the integration test `ApplicationIT` was no longer runnable at all because
  of missing adjustments.
* Caught up on the adjustments to the integration test.
---
 pom.xml                                         | 3 +++
 src/test/java/de/juplo/kafka/ApplicationIT.java | 6 +++---
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/pom.xml b/pom.xml
index ecb559a..6699408 100644
--- a/pom.xml
+++ b/pom.xml
@@ -116,6 +116,9 @@
       </plugin>
+      <plugin>
+        <artifactId>maven-failsafe-plugin</artifactId>
+      </plugin>
     </plugins>
   </build>
diff --git a/src/test/java/de/juplo/kafka/ApplicationIT.java b/src/test/java/de/juplo/kafka/ApplicationIT.java
index cded0ee..dcac79b 100644
--- a/src/test/java/de/juplo/kafka/ApplicationIT.java
+++ b/src/test/java/de/juplo/kafka/ApplicationIT.java
@@ -8,14 +8,14 @@ import org.springframework.boot.test.web.client.TestRestTemplate;
 import org.springframework.boot.test.web.server.LocalServerPort;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 
-import static de.juplo.kafka.ApplicationTests.TOPIC;
+import static de.juplo.kafka.ApplicationIT.TOPIC;
 
 
 @SpringBootTest(
     webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
     properties = {
-        "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
-        "consumer.topic=" + TOPIC,
+        "sumup.adder.bootstrap-server=${spring.embedded.kafka.brokers}",
+        "sumup.adder.topic=" + TOPIC,
         "spring.mongodb.embedded.version=4.4.13" })
 @EmbeddedKafka(topics = TOPIC)
 @AutoConfigureDataMongo
-- 
2.20.1


From caed9441a9303af071a572405ae4a665d60faae7 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sat, 3 Sep 2022 14:24:07 +0200
Subject: [PATCH 06/16] The adder processes two types of JSON messages instead
 of String
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* Until now, all messages were of type `String`.
* Now the adder processes two different types of messages.
* The messages are transferred as JSON and deserialized with the help of
  Spring Kafka's `JsonDeserializer` into two different specializations of a
  common base class.
* The type information needed for deserialization is transported by the
  Spring Kafka tooling in the `__TypeId__` header.
* That is, for the messages to be deserialized correctly it is _not_
  necessary that Jackson can derive the type of a message from the message
  itself; instead, sender and receiver have to agree on the hint they place
  in the `__TypeId__` header (see the sketch below).
* The processing of the two message types was extracted into sub-methods,
  because this makes the code easier to compare with the variant that uses
  `@KafkaHandler`.
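A minimal sketch of how the type hint travels, based on the consumer
configuration and the test code in the diffs below; the topic name, the
bootstrap server, and the literal payloads are illustrative:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.kafka.support.serializer.JsonDeserializer;

    public class TypeIdHeaderSketch
    {
      public static void main(String[] args)
      {
        // Sending side: the payload is plain JSON; the type hint goes into
        // the __TypeId__ header instead of into the message itself.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props))
        {
          ProducerRecord<String, String> record =
              new ProducerRecord<>("out", "peter", "{\"next\":42}");
          record.headers().add("__TypeId__", "ADD".getBytes());
          producer.send(record);
        }

        // Receiving side: each hint is mapped to a concrete class, exactly
        // as in the configuration below - Jackson never has to guess.
        Properties consumerProps = new Properties();
        consumerProps.put(JsonDeserializer.TRUSTED_PACKAGES, "de.juplo.kafka");
        consumerProps.put(JsonDeserializer.TYPE_MAPPINGS,
            "ADD:de.juplo.kafka.MessageAddNumber," +
            "CALC:de.juplo.kafka.MessageCalculateSum");
      }
    }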
---
 pom.xml                                       |  4 +-
 .../juplo/kafka/ApplicationConfiguration.java | 13 +++--
 .../kafka/ApplicationHealthIndicator.java     |  2 +-
 .../juplo/kafka/ApplicationRecordHandler.java | 40 +++++++++++----
 src/main/java/de/juplo/kafka/Message.java     |  9 ++++
 .../java/de/juplo/kafka/MessageAddNumber.java | 19 +++++++
 .../de/juplo/kafka/MessageCalculateSum.java   | 16 ++++++
 .../java/de/juplo/kafka/ApplicationTests.java | 50 +++++++++++++------
 src/test/java/de/juplo/kafka/MessageTest.java | 39 +++++++++++++++
 9 files changed, 158 insertions(+), 34 deletions(-)
 create mode 100644 src/main/java/de/juplo/kafka/Message.java
 create mode 100644 src/main/java/de/juplo/kafka/MessageAddNumber.java
 create mode 100644 src/main/java/de/juplo/kafka/MessageCalculateSum.java
 create mode 100644 src/test/java/de/juplo/kafka/MessageTest.java

diff --git a/pom.xml b/pom.xml
index 6699408..43a63c7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -44,8 +44,8 @@
       <optional>true</optional>
     </dependency>
     <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka-clients</artifactId>
+      <groupId>org.springframework.kafka</groupId>
+      <artifactId>spring-kafka</artifactId>
     </dependency>
     <dependency>
       <groupId>org.projectlombok</groupId>
diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 624a4ec..156b5a0 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -5,6 +5,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
 
 import java.util.Optional;
 import java.util.Properties;
@@ -48,8 +49,8 @@ public class ApplicationConfiguration
   }
 
   @Bean
-  public EndlessConsumer<String, String> endlessConsumer(
-      KafkaConsumer<String, String> kafkaConsumer,
+  public EndlessConsumer<String, Message> endlessConsumer(
+      KafkaConsumer<String, Message> kafkaConsumer,
       ExecutorService executor,
       ApplicationRebalanceListener rebalanceListener,
       ApplicationRecordHandler recordHandler,
@@ -72,7 +73,7 @@ public class ApplicationConfiguration
   }
 
   @Bean(destroyMethod = "close")
-  public KafkaConsumer<String, String> kafkaConsumer(ApplicationProperties properties)
+  public KafkaConsumer<String, Message> kafkaConsumer(ApplicationProperties properties)
   {
     Properties props = new Properties();
 
@@ -84,7 +85,11 @@ public class ApplicationConfiguration
     props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis());
     props.put("metadata.max.age.ms", "1000");
     props.put("key.deserializer", StringDeserializer.class.getName());
-    props.put("value.deserializer", StringDeserializer.class.getName());
+    props.put("value.deserializer", JsonDeserializer.class.getName());
+    props.put(JsonDeserializer.TRUSTED_PACKAGES, "de.juplo.kafka");
+    props.put(JsonDeserializer.TYPE_MAPPINGS,
+        Message.Type.ADD + ":" + MessageAddNumber.class.getName() + "," +
+        Message.Type.CALC + ":" + MessageCalculateSum.class.getName());
 
     return new KafkaConsumer<>(props);
   }
diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
index df4e653..03a14c8 100644
--- a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
+++ b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
@@ -10,7 +10,7 @@ import org.springframework.stereotype.Component;
 @RequiredArgsConstructor
 public class ApplicationHealthIndicator implements HealthIndicator
 {
-  private final EndlessConsumer<String, String> consumer;
+  private final EndlessConsumer<String, Message> consumer;
 
 
   @Override
diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
index 51d524f..2829157 100644
--- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
+++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
@@ -12,7 +12,7 @@ import java.util.Optional;
 
 @RequiredArgsConstructor
 @Slf4j
-public class ApplicationRecordHandler implements RecordHandler<String, String>
+public class ApplicationRecordHandler implements RecordHandler<String, Message>
 {
   private final AdderResults results;
   private final Optional<Duration> throttle;
@@ -21,22 +21,40 @@ public class ApplicationRecordHandler implements RecordHandler
 
   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
 
 
+  public void addNumber(
+      Integer partition,
+      String user,
+      MessageAddNumber message)
+  {
+    state.get(partition).addToSum(user, message.getNext());
+  }
+
+  public void calculateSum(
+      Integer partition,
+      String user,
+      MessageCalculateSum message)
+  {
+    AdderResult result = state.get(partition).calculate(user);
+    log.info("{} - New result for {}: {}", id, user, result);
+    results.addResults(partition, user, result);
+  }
+
   @Override
-  public void accept(ConsumerRecord<String, String> record)
+  public void accept(ConsumerRecord<String, Message> record)
   {
     Integer partition = record.partition();
     String user = record.key();
-    String message = record.value();
+    Message message = record.value();
 
-    if (message.equals("CALCULATE"))
-    {
-      AdderResult result = state.get(partition).calculate(user);
-      log.info("{} - New result for {}: {}", id, user, result);
-      results.addResults(partition, user, result);
-    }
-    else
+    switch(message.getType())
     {
-      state.get(partition).addToSum(user, Integer.parseInt(message));
+      case ADD:
+        addNumber(partition, user, (MessageAddNumber) message);
+        break;
+
+      case CALC:
+        calculateSum(partition, user, (MessageCalculateSum) message);
+        break;
     }
 
     if (throttle.isPresent())
diff --git a/src/main/java/de/juplo/kafka/Message.java b/src/main/java/de/juplo/kafka/Message.java
new file mode 100644
index 0000000..e4999b7
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/Message.java
@@ -0,0 +1,9 @@
+package de.juplo.kafka;
+
+
+public abstract class Message
+{
+  public enum Type {ADD, CALC}
+
+  public abstract Type getType();
+}
diff --git a/src/main/java/de/juplo/kafka/MessageAddNumber.java b/src/main/java/de/juplo/kafka/MessageAddNumber.java
new file mode 100644
index 0000000..c024b65
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/MessageAddNumber.java
@@ -0,0 +1,19 @@
+package de.juplo.kafka;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class MessageAddNumber extends Message
+{
+  private Integer next;
+
+
+  @Override
+  public Type getType()
+  {
+    return Type.ADD;
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/MessageCalculateSum.java b/src/main/java/de/juplo/kafka/MessageCalculateSum.java
new file mode 100644
index 0000000..afc5a39
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/MessageCalculateSum.java
@@ -0,0 +1,16 @@
+package de.juplo.kafka;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class MessageCalculateSum extends Message
+{
+  @Override
+  public Type getType()
+  {
+    return Type.CALC;
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index 6a037eb..bd9f449 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -15,7 +15,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 
 @Slf4j
-public class ApplicationTests extends GenericApplicationTests<String, String>
+public class ApplicationTests extends GenericApplicationTests<String, Message>
 {
   @Autowired
   StateRepository stateRepository;
@@ -39,7 +39,7 @@ public class ApplicationTests extends GenericApplicationTests
           .mapToObj(i -> "seeräuber-" + i)
           .toArray(i -> new String[i]);
       final StringSerializer stringSerializer = new StringSerializer();
-      final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "CALCULATE"));
+      final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "{}"));
 
       int counter = 0;
@@ -72,7 +72,13 @@ public class ApplicationTests extends GenericApplicationTests
 
         if (message[i] > number[i])
         {
-          send(key, calculateMessage, fail(logicErrors, pass, counter), messageSender);
+          send(
+              key,
+              calculateMessage,
+              Message.Type.CALC,
+              poisonPill(poisonPills, pass, counter),
+              logicError(logicErrors, pass, counter),
+              messageSender);
           state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2));
           // Pick next number to calculate
           number[i] = numbers[next++%numbers.length];
@@ -80,15 +86,25 @@ public class ApplicationTests extends GenericApplicationTests
           log.debug("Seeräuber {} will die Summe für {} berechnen", seeräuber, number[i]);
         }
 
-        Bytes value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(message[i]++)));
-        send(key, value, fail(logicErrors, pass, counter), messageSender);
+        send(
+            key,
+            new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":" + message[i]++ + "}")),
+            Message.Type.ADD,
+            poisonPill(poisonPills, pass, counter),
+            logicError(logicErrors, pass, counter),
+            messageSender);
       }
     }
 
     return counter;
   }
 
-  boolean fail (boolean logicErrors, int pass, int counter)
+  boolean poisonPill (boolean poisonPills, int pass, int counter)
+  {
+    return poisonPills && pass > 300 && counter%99 == 0;
+  }
+
+  boolean logicError(boolean logicErrors, int pass, int counter)
   {
     return logicErrors && pass > 300 && counter%77 == 0;
   }
@@ -96,23 +112,25 @@ public class ApplicationTests extends GenericApplicationTests
   void send(
       Bytes key,
       Bytes value,
-      boolean fail,
+      Message.Type type,
+      boolean poisonPill,
+      boolean logicError,
       Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
   {
     counter++;
 
-    if (fail)
+    if (logicError)
     {
-      value = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(-1)));
+      value = new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":-1}"));
+    }
+    if (poisonPill)
+    {
+      value = new Bytes("BOOM!".getBytes());
     }
 
-    messageSender.accept(new ProducerRecord<>(TOPIC, key, value));
-  }
-
-  @Override
-  public boolean canGeneratePoisonPill()
-  {
-    return false;
+    ProducerRecord<Bytes, Bytes> record = new ProducerRecord<>(TOPIC, key, value);
+    record.headers().add("__TypeId__", type.toString().getBytes());
+    messageSender.accept(record);
   }
 
   @Override
diff --git a/src/test/java/de/juplo/kafka/MessageTest.java b/src/test/java/de/juplo/kafka/MessageTest.java
new file mode 100644
index 0000000..52794ba
--- /dev/null
+++ b/src/test/java/de/juplo/kafka/MessageTest.java
@@ -0,0 +1,39 @@
+package de.juplo.kafka;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.*;
+
+
+public class MessageTest
+{
+  ObjectMapper mapper = new ObjectMapper();
+
+  @Test
+  @DisplayName("Deserialize a MessageAddNumber message")
+  public void testDeserializeMessageAddNumber()
+  {
+    Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"next\":42}", MessageAddNumber.class));
+    Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"number\":666,\"next\":42}", MessageAddNumber.class));
+  }
+
+  @Test
+  @DisplayName("Deserialize a MessageCalculateSum message")
+  public void testDeserializeMessageCalculateSum() throws JsonProcessingException
+  {
+    Assertions.assertDoesNotThrow(() -> mapper.readValue("{}", MessageCalculateSum.class));
+    Assertions.assertDoesNotThrow(() -> mapper.readValue("{\"number\":666}", MessageCalculateSum.class));
+  }
+}
-- 
2.20.1


From c4cf0feba4bc73d2c8ec23c2a83da609c99a3f9d Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sun, 4 Sep 2022 07:35:52 +0200
Subject: [PATCH 07/16] Separate artifact/image ID for the JSON version of the
 requests service
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* The version of the adder service that processes JSON messages now has an
  explicit artifact and image ID of its own.
* In addition, the Compose setup was switched over to explicitly use the
  JSON version of the requests and adder services.
---
 README.sh          | 2 +-
 docker-compose.yml | 8 ++++----
 pom.xml            | 4 ++--
 3 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/README.sh b/README.sh
index f337d5c..22f52f0 100755
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/sumup-adder:1.0-SNAPSHOT
+IMAGE=juplo/sumup-adder-json:1.0-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
diff --git a/docker-compose.yml b/docker-compose.yml
index 5f0acac..5d33cd1 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -106,7 +106,7 @@ services:
       sumup.gateway.topic: in
 
   requests-1:
-    image: juplo/sumup-requests:1.0-SNAPSHOT
+    image: juplo/sumup-requests-json:1.0-SNAPSHOT
     ports:
       - 8081:8080
     environment:
@@ -115,7 +115,7 @@ services:
       sumup.requests.client-id: requests-1
 
   requests-2:
-    image: juplo/sumup-requests:1.0-SNAPSHOT
+    image: juplo/sumup-requests-json:1.0-SNAPSHOT
     ports:
       - 8082:8080
     environment:
@@ -124,7 +124,7 @@ services:
       sumup.requests.client-id: requests-2
 
   adder-1:
-    image: juplo/sumup-adder:1.0-SNAPSHOT
+    image: juplo/sumup-adder-json:1.0-SNAPSHOT
     ports:
       - 8091:8080
     environment:
@@ -138,7 +138,7 @@ services:
       logging.level.org.apache.kafka.clients.consumer: DEBUG
 
   adder-2:
-    image: juplo/sumup-adder:1.0-SNAPSHOT
+    image: juplo/sumup-adder-json:1.0-SNAPSHOT
     ports:
       - 8092:8080
     environment:
diff --git a/pom.xml b/pom.xml
index 43a63c7..17d3cba 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,10 +12,10 @@
 
   <groupId>de.juplo.kafka</groupId>
-  <artifactId>sumup-adder</artifactId>
+  <artifactId>sumup-adder-json</artifactId>
   <version>1.0-SNAPSHOT</version>
   <name>SumUp Adder</name>
-  <description>Calculates the sum for the send messages</description>
+  <description>Calculates the sum for the send messages. This version consumes JSON-messages.</description>
 
   <properties>
     <java.version>11</java.version>
-- 
2.20.1


From f18a765cc650b81788f356a80f975926930600c5 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Fri, 22 Apr 2022 11:08:37 +0200
Subject: [PATCH 08/16] Springify: configuration is handled via
 `KafkaProperties`
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* Taken over from `springified-consumer--config` via `git cherry-pick`.
* Conflicts:
** src/main/java/de/juplo/kafka/ApplicationConfiguration.java
** src/main/java/de/juplo/kafka/ApplicationProperties.java
** src/main/resources/application.yml
** src/test/java/de/juplo/kafka/ApplicationTests.java
* Caught up on the corresponding adjustments to `README.sh`,
  `docker-compose.yml`, and `pom.xml`.
---
 README.sh                                     |  2 +-
 docker-compose.yml                            | 22 ++++++------
 pom.xml                                       |  2 +-
 .../juplo/kafka/ApplicationConfiguration.java | 36 ++++++++++---------
 .../de/juplo/kafka/ApplicationProperties.java | 14 --------
 src/main/resources/application.yml            | 22 +++++++-----
 .../java/de/juplo/kafka/ApplicationIT.java    |  2 +-
 .../juplo/kafka/GenericApplicationTests.java  | 19 +++++-----
 8 files changed, 58 insertions(+), 61 deletions(-)

diff --git a/README.sh b/README.sh
index 22f52f0..07e36d7 100755
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/sumup-adder-json:1.0-SNAPSHOT
+IMAGE=juplo/sumup-adder-springified:1.0-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
diff --git a/docker-compose.yml b/docker-compose.yml
index 5d33cd1..16fec5b 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -96,13 +96,13 @@ services:
     command: sleep infinity
 
   gateway:
-    image: juplo/sumup-gateway:1.0-SNAPSHOT
+    image: juplo/sumup-gateway--springified:1.0-SNAPSHOT
     ports:
       - 8080:8080
     environment:
       server.port: 8080
-      sumup.gateway.bootstrap-server: kafka:9092
-      sumup.gateway.client-id: gateway
+      spring.kafka.bootstrap-servers: kafka:9092
+      spring.kafka.client-id: gateway
       sumup.gateway.topic: in
 
   requests-1:
@@ -124,28 +124,28 @@ services:
       sumup.requests.client-id: requests-2
 
   adder-1:
-    image: juplo/sumup-adder-json:1.0-SNAPSHOT
+    image: juplo/sumup-adder-springified:1.0-SNAPSHOT
     ports:
       - 8091:8080
     environment:
       server.port: 8080
-      sumup.adder.bootstrap-server: kafka:9092
-      sumup.adder.client-id: adder-1
-      sumup.adder.commit-interval: 1s
+      spring.kafka.bootstrap-servers: kafka:9092
+      spring.kafak.client-id: adder-1
+      spring.kafka.auto-commit-interval: 1s
       sumup.adder.throttle: 3ms
       spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
       spring.data.mongodb.database: juplo
       logging.level.org.apache.kafka.clients.consumer: DEBUG
 
   adder-2:
-    image: juplo/sumup-adder-json:1.0-SNAPSHOT
+    image: juplo/sumup-adder-springified:1.0-SNAPSHOT
     ports:
       - 8092:8080
     environment:
       server.port: 8080
-      sumup.adder.bootstrap-server: kafka:9092
-      sumup.adder.client-id: adder-2
-      sumup.adder.commit-interval: 1s
+      spring.kafka.bootstrap-servers: kafka:9092
+      spring.kafak.client-id: adder-2
+      spring.kafka.auto-commit-interval: 1s
       sumup.adder.throttle: 3ms
       spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
       spring.data.mongodb.database: juplo
diff --git a/pom.xml b/pom.xml
index 17d3cba..a252d1c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,7 +12,7 @@
 
   <groupId>de.juplo.kafka</groupId>
-  <artifactId>sumup-adder-json</artifactId>
+  <artifactId>sumup-adder-springified</artifactId>
   <version>1.0-SNAPSHOT</version>
   <name>SumUp Adder</name>
   <description>Calculates the sum for the send messages. This version consumes JSON-messages.</description>
diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 156b5a0..523707f 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -2,6 +2,7 @@ package de.juplo.kafka;
 
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -14,18 +15,19 @@ import java.util.concurrent.Executors;
 
 
 @Configuration
-@EnableConfigurationProperties(ApplicationProperties.class)
+@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
 public class ApplicationConfiguration
 {
   @Bean
   public ApplicationRecordHandler recordHandler(
       AdderResults adderResults,
-      ApplicationProperties properties)
+      KafkaProperties kafkaProperties,
+      ApplicationProperties applicationProperties)
   {
     return new ApplicationRecordHandler(
         adderResults,
-        Optional.ofNullable(properties.getThrottle()),
-        properties.getClientId());
+        Optional.ofNullable(applicationProperties.getThrottle()),
+        kafkaProperties.getClientId());
   }
 
   @Bean
@@ -39,13 +41,14 @@ public class ApplicationConfiguration
       ApplicationRecordHandler recordHandler,
       AdderResults adderResults,
       StateRepository stateRepository,
-      ApplicationProperties properties)
+      KafkaProperties kafkaProperties,
+      ApplicationProperties applicationProperties)
   {
     return new ApplicationRebalanceListener(
         recordHandler,
         adderResults,
         stateRepository,
-        properties.getClientId());
+        kafkaProperties.getClientId());
   }
 
   @Bean
@@ -54,13 +57,14 @@ public class ApplicationConfiguration
       ExecutorService executor,
       ApplicationRebalanceListener rebalanceListener,
       ApplicationRecordHandler recordHandler,
-      ApplicationProperties properties)
+      KafkaProperties kafkaProperties,
+      ApplicationProperties applicationProperties)
   {
     return
         new EndlessConsumer<>(
            executor,
-            properties.getClientId(),
-            properties.getTopic(),
+            kafkaProperties.getClientId(),
+            applicationProperties.getTopic(),
             kafkaConsumer,
             rebalanceListener,
             recordHandler);
@@ -73,17 +77,17 @@ public class ApplicationConfiguration
   }
 
   @Bean(destroyMethod = "close")
-  public KafkaConsumer<String, Message> kafkaConsumer(ApplicationProperties properties)
+  public KafkaConsumer<String, Message> kafkaConsumer(KafkaProperties kafkaProperties)
   {
     Properties props = new Properties();
 
-    props.put("bootstrap.servers", properties.getBootstrapServer());
+    props.put("bootstrap.servers", kafkaProperties.getBootstrapServers());
     props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.StickyAssignor");
-    props.put("group.id", properties.getGroupId());
-    props.put("client.id", properties.getClientId());
-    props.put("auto.offset.reset", properties.getAutoOffsetReset());
-    props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis());
-    props.put("metadata.max.age.ms", "1000");
+    props.put("group.id", kafkaProperties.getConsumer().getGroupId());
+    props.put("client.id", kafkaProperties.getClientId());
+    props.put("auto.offset.reset", kafkaProperties.getConsumer().getAutoOffsetReset());
+    props.put("auto.commit.interval.ms", (int)kafkaProperties.getConsumer().getAutoCommitInterval().toMillis());
+    props.put("metadata.max.age.ms", kafkaProperties.getConsumer().getProperties().get("metadata.max.age.ms"));
     props.put("key.deserializer", StringDeserializer.class.getName());
     props.put("value.deserializer", JsonDeserializer.class.getName());
     props.put(JsonDeserializer.TRUSTED_PACKAGES, "de.juplo.kafka");
diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java
index f852c00..005460c 100644
--- a/src/main/java/de/juplo/kafka/ApplicationProperties.java
+++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java
@@ -16,22 +16,8 @@ import java.time.Duration;
 @Setter
 public class ApplicationProperties
 {
-  @NotNull
-  @NotEmpty
-  private String bootstrapServer;
-  @NotNull
-  @NotEmpty
-  private String groupId;
-  @NotNull
-  @NotEmpty
-  private String clientId;
   @NotNull
   @NotEmpty
   private String topic;
-  @NotNull
-  @NotEmpty
-  private String autoOffsetReset;
-  @NotNull
-  private Duration commitInterval;
   private Duration throttle;
 }
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 26948f5..a899340 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -1,11 +1,6 @@
 sumup:
   adder:
-    bootstrap-server: :9092
-    group-id: my-group
-    client-id: DEV
     topic: out
-    auto-offset-reset: earliest
-    commit-interval: 5s
 management:
   endpoint:
     shutdown:
@@ -21,16 +16,25 @@ management:
       enabled: true
   info:
     kafka:
-      bootstrap-server: ${consumer.bootstrap-server}
-      client-id: ${consumer.client-id}
-      group-id: ${consumer.group-id}
+      bootstrap-server: ${spring.kafka.consumer.bootstrap-servers}
+      client-id: ${spring.kafka.consumer.client-id}
+      group-id: ${spring.kafka.consumer.group-id}
       topic: ${consumer.topic}
-      auto-offset-reset: ${consumer.auto-offset-reset}
+      auto-offset-reset: ${spring.kafka.consumer.auto-offset-reset}
 spring:
   data:
     mongodb:
       uri: mongodb://juplo:training@localhost:27017
       database: juplo
+  kafka:
+    bootstrap-servers: :9092
+    client-id: DEV
+    consumer:
+      group-id: my-group
+      auto-offset-reset: earliest
+      auto-commit-interval: 5s
+      properties:
+        metadata.max.age.ms: 1000
 logging:
   level:
     root: INFO
diff --git a/src/test/java/de/juplo/kafka/ApplicationIT.java b/src/test/java/de/juplo/kafka/ApplicationIT.java
index dcac79b..4bb4f5b 100644
--- a/src/test/java/de/juplo/kafka/ApplicationIT.java
+++ b/src/test/java/de/juplo/kafka/ApplicationIT.java
@@ -14,7 +14,7 @@ import static de.juplo.kafka.ApplicationIT.TOPIC;
 @SpringBootTest(
     webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
     properties = {
-        "sumup.adder.bootstrap-server=${spring.embedded.kafka.brokers}",
+        "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
         "sumup.adder.topic=" + TOPIC,
         "spring.mongodb.embedded.version=4.4.13" })
 @EmbeddedKafka(topics = TOPIC)
diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
index 8849317..869b5d9 100644
--- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
@@ -14,6 +14,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.*;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.boot.autoconfigure.mongo.MongoProperties;
 import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
 import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
@@ -40,9 +41,9 @@ import static org.awaitility.Awaitility.*;
 @SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
 @TestPropertySource(
     properties = {
-        "sumup.adder.bootstrap-server=${spring.embedded.kafka.brokers}",
+        "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
         "sumup.adder.topic=" + TOPIC,
-        "sumup.adder.commit-interval=500ms",
+        "spring.kafka.consumer.auto-commit-interval=500ms",
         "spring.mongodb.embedded.version=4.4.13" })
 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
 @EnableAutoConfiguration
@@ -59,7 +60,9 @@ abstract class GenericApplicationTests
   @Autowired
   Consumer<ConsumerRecord<K, V>> consumer;
   @Autowired
-  ApplicationProperties properties;
+  ApplicationProperties applicationProperties;
+  @Autowired
+  KafkaProperties kafkaProperties;
   @Autowired
   ExecutorService executor;
   @Autowired
@@ -330,16 +333,16 @@ abstract class GenericApplicationTests
   {
     Properties props;
     props = new Properties();
-    props.put("bootstrap.servers", properties.getBootstrapServer());
+    props.put("bootstrap.servers", kafkaProperties.getBootstrapServers());
     props.put("linger.ms", 100);
     props.put("key.serializer", BytesSerializer.class.getName());
     props.put("value.serializer", BytesSerializer.class.getName());
     testRecordProducer = new KafkaProducer<>(props);
 
     props = new Properties();
-    props.put("bootstrap.servers", properties.getBootstrapServer());
+    props.put("bootstrap.servers", kafkaProperties.getBootstrapServers());
     props.put("client.id", "OFFSET-CONSUMER");
-    props.put("group.id", properties.getGroupId());
+    props.put("group.id", kafkaProperties.getConsumer().getGroupId());
     props.put("key.deserializer", BytesDeserializer.class.getName());
     props.put("value.deserializer", BytesDeserializer.class.getName());
     offsetConsumer = new KafkaConsumer<>(props);
@@ -373,8 +376,8 @@ abstract class GenericApplicationTests
     endlessConsumer =
         new EndlessConsumer<>(
             executor,
-            properties.getClientId(),
-            properties.getTopic(),
+            kafkaProperties.getClientId(),
+            applicationProperties.getTopic(),
             kafkaConsumer,
             rebalanceListener,
             captureOffsetAndExecuteTestHandler);
-- 
2.20.1


From 7adff476ad862d30d668d75212d1ca1c7cf16b03 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Fri, 22 Apr 2022 11:24:55 +0200
Subject: [PATCH 09/16] Springify: the Kafka `Consumer` is created via the
 Spring factory
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* Taken over from `springified-consumer--config` via `git cherry-pick`.
* Conflicts:
** src/main/java/de/juplo/kafka/ApplicationConfiguration.java
** src/test/java/de/juplo/kafka/ApplicationTests.java
* So that Spring Kafka can instantiate the consumer, the fixed part of the
  configuration in particular had to be moved from the config class
  `ApplicationConfiguration` into the YAML file `application.yml`:
** the selection of the `StickyAssignor` as partition-assignment strategy
** the configuration of the deserializers
---
 .../juplo/kafka/ApplicationConfiguration.java | 29 ++++---------
 src/main/resources/application.yml            |  6 ++++
 .../juplo/kafka/GenericApplicationTests.java  | 13 ++++---
 3 files changed, 21 insertions(+), 27 deletions(-)

diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 523707f..bae5d51 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -1,15 +1,14 @@
 package de.juplo.kafka;
 
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.support.serializer.JsonDeserializer;
 
 import java.util.Optional;
-import java.util.Properties;
+import org.springframework.kafka.core.ConsumerFactory;
+
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -53,7 +52,7 @@ public class ApplicationConfiguration
 
   @Bean
   public EndlessConsumer endlessConsumer(
-      KafkaConsumer<String, Message> kafkaConsumer,
+      Consumer<String, Message> kafkaConsumer,
       ExecutorService executor,
       ApplicationRebalanceListener rebalanceListener,
       ApplicationRecordHandler recordHandler,
@@ -77,24 +76,8 @@ public class ApplicationConfiguration
   }
 
   @Bean(destroyMethod = "close")
-  public KafkaConsumer<String, Message> kafkaConsumer(KafkaProperties kafkaProperties)
+  public Consumer<String, Message> kafkaConsumer(ConsumerFactory<String, Message> factory)
   {
-    Properties props = new Properties();
-
-    props.put("bootstrap.servers", kafkaProperties.getBootstrapServers());
-    props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.StickyAssignor");
-    props.put("group.id", kafkaProperties.getConsumer().getGroupId());
-    props.put("client.id", kafkaProperties.getClientId());
-    props.put("auto.offset.reset", kafkaProperties.getConsumer().getAutoOffsetReset());
-    props.put("auto.commit.interval.ms", (int)kafkaProperties.getConsumer().getAutoCommitInterval().toMillis());
-    props.put("metadata.max.age.ms", kafkaProperties.getConsumer().getProperties().get("metadata.max.age.ms"));
-    props.put("key.deserializer", StringDeserializer.class.getName());
-    props.put("value.deserializer", JsonDeserializer.class.getName());
-    props.put(JsonDeserializer.TRUSTED_PACKAGES, "de.juplo.kafka");
-    props.put(JsonDeserializer.TYPE_MAPPINGS,
-        Message.Type.ADD + ":" + MessageAddNumber.class.getName() + "," +
-        Message.Type.CALC + ":" + MessageCalculateSum.class.getName());
-
-    return new KafkaConsumer<>(props);
+    return factory.createConsumer();
   }
 }
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index a899340..92f3a6b 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -33,8 +33,14 @@ spring:
       group-id: my-group
       auto-offset-reset: earliest
       auto-commit-interval: 5s
+      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
       properties:
+        partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor
         metadata.max.age.ms: 1000
+        spring.json.type.mapping: >
+          ADD:de.juplo.kafka.MessageAddNumber,
+          CALC:de.juplo.kafka.MessageCalculateSum
 logging:
   level:
     root: INFO
diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
index 869b5d9..21c3f7f 100644
--- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
@@ -14,6 +14,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.junit.jupiter.api.*;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
 import org.springframework.boot.autoconfigure.mongo.MongoProperties;
 import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
@@ -38,7 +39,11 @@ import static org.assertj.core.api.Assertions.*;
 import static org.awaitility.Awaitility.*;
 
 
-@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
+@SpringJUnitConfig(
+    initializers = ConfigDataApplicationContextInitializer.class,
+    classes = {
+        KafkaAutoConfiguration.class,
+        ApplicationTests.Configuration.class })
 @TestPropertySource(
     properties = {
         "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
@@ -56,13 +61,13 @@ abstract class GenericApplicationTests
 
 
   @Autowired
-  KafkaConsumer<K, V> kafkaConsumer;
+  org.apache.kafka.clients.consumer.Consumer<K, V> kafkaConsumer;
   @Autowired
   Consumer<ConsumerRecord<K, V>> consumer;
   @Autowired
   ApplicationProperties applicationProperties;
-  @Autowired
-  KafkaProperties kafkaProperties;
+  @Autowired
+  KafkaProperties kafkaProperties;
   @Autowired
   ExecutorService executor;
   @Autowired
-- 
2.20.1


From 2eb3c45c9438a20777b0110defa593dd45c64511 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Fri, 9 Sep 2022 12:44:54 +0200
Subject: [PATCH 10/16] The test uses the `@Bean` of `EndlessConsumer`
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* Picked from the branch `deserialization` via `git cherry-pick`.
* Conflicts:
** src/main/java/de/juplo/kafka/ApplicationConfiguration.java
** src/test/java/de/juplo/kafka/ApplicationTests.java
** src/test/java/de/juplo/kafka/GenericApplicationTests.java
---
 .../juplo/kafka/ApplicationConfiguration.java |  7 +-
 .../juplo/kafka/GenericApplicationTests.java  | 75 +++++++------------
 .../de/juplo/kafka/TestRecordHandler.java     | 16 +++-
 3 files changed, 46 insertions(+), 52 deletions(-)

diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index bae5d51..08c827c 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -18,7 +18,7 @@ import java.util.concurrent.Executors;
 public class ApplicationConfiguration
 {
   @Bean
-  public ApplicationRecordHandler recordHandler(
+  public ApplicationRecordHandler applicationRecordHandler(
       AdderResults adderResults,
       KafkaProperties kafkaProperties,
       ApplicationProperties applicationProperties)
@@ -40,8 +40,7 @@ public class ApplicationConfiguration
       ApplicationRecordHandler recordHandler,
       AdderResults adderResults,
       StateRepository stateRepository,
-      KafkaProperties kafkaProperties,
-      ApplicationProperties applicationProperties)
+      KafkaProperties kafkaProperties)
   {
     return new ApplicationRebalanceListener(
         recordHandler,
@@ -55,7 +54,7 @@ public class ApplicationConfiguration
       Consumer<String, Message> kafkaConsumer,
       ExecutorService executor,
       ApplicationRebalanceListener rebalanceListener,
-      ApplicationRecordHandler recordHandler,
+      RecordHandler recordHandler,
       KafkaProperties kafkaProperties,
      ApplicationProperties applicationProperties)
   {
diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
index 21c3f7f..937b40f 100644
--- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
@@ -2,8 +2,6 @@ package de.juplo.kafka;
 
 import com.mongodb.client.MongoClient;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -20,6 +18,7 @@ import org.springframework.boot.autoconfigure.mongo.MongoProperties;
 import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
 import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
 import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Import;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.test.context.TestPropertySource;
@@ -27,7 +26,6 @@ import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
 
 import java.time.Duration;
 import java.util.*;
-import java.util.concurrent.ExecutorService;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -63,28 +61,21 @@ abstract class GenericApplicationTests
   @Autowired
   org.apache.kafka.clients.consumer.Consumer<K, V> kafkaConsumer;
   @Autowired
-  Consumer<ConsumerRecord<K, V>> consumer;
-  @Autowired
-  ApplicationProperties applicationProperties;
-  @Autowired
   KafkaProperties kafkaProperties;
   @Autowired
-  ExecutorService executor;
+  ApplicationProperties applicationProperties;
   @Autowired
   MongoClient mongoClient;
   @Autowired
   MongoProperties mongoProperties;
   @Autowired
-  ConsumerRebalanceListener rebalanceListener;
+  TestRecordHandler<K, V> recordHandler;
   @Autowired
-  RecordHandler<K, V> recordHandler;
+  EndlessConsumer<K, V> endlessConsumer;
 
   KafkaProducer<Bytes, Bytes> testRecordProducer;
   KafkaConsumer<Bytes, Bytes> offsetConsumer;
-  EndlessConsumer<K, V> endlessConsumer;
   Map<TopicPartition, Long> oldOffsets;
-  Map<TopicPartition, Long> seenOffsets;
-  Set<ConsumerRecord<K, V>> receivedRecords;
 
 
   final RecordGenerator recordGenerator;
@@ -108,7 +99,7 @@ abstract class GenericApplicationTests
     await(numberOfGeneratedMessages + " records received")
         .atMost(Duration.ofSeconds(30))
         .pollInterval(Duration.ofSeconds(1))
-        .until(() -> receivedRecords.size() >= numberOfGeneratedMessages);
+        .until(() -> recordHandler.receivedRecords.size() >= numberOfGeneratedMessages);
 
     await("Offsets committed")
         .atMost(Duration.ofSeconds(10))
@@ -116,7 +107,7 @@ abstract class GenericApplicationTests
         .untilAsserted(() ->
         {
          checkSeenOffsetsForProgress();
-          assertSeenOffsetsEqualCommittedOffsets(seenOffsets);
+          assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
         });
 
     assertThatExceptionOfType(IllegalStateException.class)
@@ -140,7 +131,7 @@ abstract class GenericApplicationTests
         .until(() -> !endlessConsumer.running());
 
     checkSeenOffsetsForProgress();
-    assertSeenOffsetsEqualCommittedOffsets(seenOffsets);
+    assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
 
     endlessConsumer.start();
     await("Consumer failed")
@@ -149,8 +140,8 @@ abstract class GenericApplicationTests
         .until(() -> !endlessConsumer.running());
 
     checkSeenOffsetsForProgress();
-    assertSeenOffsetsEqualCommittedOffsets(seenOffsets);
-    assertThat(receivedRecords.size())
+    assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
+    assertThat(recordHandler.receivedRecords.size())
       .describedAs("Received not all sent events")
       .isLessThan(numberOfGeneratedMessages);
 
@@ -177,7 +168,7 @@ abstract class GenericApplicationTests
         .until(() -> !endlessConsumer.running());
 
     checkSeenOffsetsForProgress();
-    assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets);
+    assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets);
 
     endlessConsumer.start();
     await("Consumer failed")
@@ -185,7 +176,7 @@ abstract class GenericApplicationTests
         .pollInterval(Duration.ofSeconds(1))
         .until(() -> !endlessConsumer.running());
 
-    assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets);
+    assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets);
 
     assertThatNoException()
       .describedAs("Consumer should not be running")
@@ -238,7 +229,7 @@ abstract class GenericApplicationTests
     partitions().forEach(tp ->
     {
       Long oldOffset = oldOffsets.get(tp) + 1;
-      Long newOffset = seenOffsets.get(tp) + 1;
+      Long newOffset = recordHandler.seenOffsets.get(tp) + 1;
       if (!oldOffset.equals(newOffset))
       {
         log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
@@ -356,43 +347,30 @@ abstract class GenericApplicationTests
     seekToEnd();
 
     oldOffsets = new HashMap<>();
-    seenOffsets = new HashMap<>();
-    receivedRecords = new HashSet<>();
+    recordHandler.seenOffsets = new HashMap<>();
+    recordHandler.receivedRecords = new HashSet<>();
 
     doForCurrentOffsets((tp, offset) ->
     {
       oldOffsets.put(tp, offset - 1);
-      seenOffsets.put(tp, offset - 1);
+      recordHandler.seenOffsets.put(tp, offset - 1);
     });
 
-    TestRecordHandler<K, V> captureOffsetAndExecuteTestHandler =
-        new TestRecordHandler<K, V>(recordHandler)
-        {
-          @Override
-          public void onNewRecord(ConsumerRecord<K, V> record)
-          {
-            seenOffsets.put(
-                new TopicPartition(record.topic(), record.partition()),
- record.offset()); - receivedRecords.add(record); - } - }; - - endlessConsumer = - new EndlessConsumer<>( - executor, - kafkaProperties.getClientId(), - applicationProperties.getTopic(), - kafkaConsumer, - rebalanceListener, - captureOffsetAndExecuteTestHandler); - endlessConsumer.start(); } @AfterEach public void deinit() { + try + { + endlessConsumer.stop(); + } + catch (Exception e) + { + log.debug("{}", e.toString()); + } + try { testRecordProducer.close(); @@ -409,5 +387,10 @@ abstract class GenericApplicationTests @Import(ApplicationConfiguration.class) public static class Configuration { + @Bean + public RecordHandler recordHandler(RecordHandler applicationRecordHandler) + { + return new TestRecordHandler(applicationRecordHandler); + } } } diff --git a/src/test/java/de/juplo/kafka/TestRecordHandler.java b/src/test/java/de/juplo/kafka/TestRecordHandler.java index b4efdd6..37d3f65 100644 --- a/src/test/java/de/juplo/kafka/TestRecordHandler.java +++ b/src/test/java/de/juplo/kafka/TestRecordHandler.java @@ -2,16 +2,28 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; + +import java.util.Map; +import java.util.Set; @RequiredArgsConstructor -public abstract class TestRecordHandler implements RecordHandler +public class TestRecordHandler implements RecordHandler { private final RecordHandler handler; + Map seenOffsets; + Set> receivedRecords; - public abstract void onNewRecord(ConsumerRecord record); + public void onNewRecord(ConsumerRecord record) + { + seenOffsets.put( + new TopicPartition(record.topic(), record.partition()), + record.offset()); + receivedRecords.add(record); + } @Override public void accept(ConsumerRecord record) -- 2.20.1 From 377840107151d9c270f7e3a91a118dce4aa1295f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 1 Nov 2022 19:44:39 +0100 Subject: [PATCH 11/16] WIP --- .../de/juplo/kafka/AdderBusinessLogic.java | 55 --- src/main/java/de/juplo/kafka/AdderResult.java | 21 - .../java/de/juplo/kafka/AdderResults.java | 47 --- src/main/java/de/juplo/kafka/Application.java | 51 +-- .../juplo/kafka/ApplicationConfiguration.java | 43 +- .../kafka/ApplicationHealthIndicator.java | 32 -- .../de/juplo/kafka/ApplicationProperties.java | 2 +- .../kafka/ApplicationRebalanceListener.java | 70 ---- .../juplo/kafka/ApplicationRecordHandler.java | 93 ---- .../java/de/juplo/kafka/DriverController.java | 89 ---- .../java/de/juplo/kafka/EndlessConsumer.java | 4 +- .../java/de/juplo/kafka/ErrorResponse.java | 11 - .../java/de/juplo/kafka/RecordHandler.java | 10 - .../java/de/juplo/kafka/StateDocument.java | 41 -- .../java/de/juplo/kafka/StateRepository.java | 11 - src/main/resources/application.yml | 10 +- .../juplo/kafka/AdderBusinessLogicTest.java | 117 ------ .../java/de/juplo/kafka/ApplicationTests.java | 172 -------- .../ErrorCannotBeGeneratedCondition.java | 60 --- .../juplo/kafka/GenericApplicationTests.java | 396 ------------------ .../kafka/SkipWhenErrorCannotBeGenerated.java | 15 - .../de/juplo/kafka/TestRecordHandler.java | 34 -- 22 files changed, 23 insertions(+), 1361 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/AdderBusinessLogic.java delete mode 100644 src/main/java/de/juplo/kafka/AdderResult.java delete mode 100644 src/main/java/de/juplo/kafka/AdderResults.java delete mode 100644 src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java delete mode 100644 src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java delete mode 
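
The refactoring in the two diffs above turns TestRecordHandler from an abstract template into a concrete decorator: the test configuration now publishes it as the RecordHandler bean, wrapping the handler that the application defines, so every record is captured for the assertions before it reaches the business logic. A rough, self-contained sketch of that pattern (the String/String type parameters and the manual wiring are assumptions for illustration only; in the tests the classes are generic and wired by Spring):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.HashSet;

public class TestRecordHandlerSketch
{
  public static void main(String[] args)
  {
    // Stand-in for the application's real handler (ApplicationRecordHandler)
    RecordHandler<String, String> applicationHandler =
        record -> System.out.println("business logic sees " + record.value());

    // The decorator, wired exactly like the recordHandler() bean above
    TestRecordHandler<String, String> testHandler =
        new TestRecordHandler<>(applicationHandler);
    testHandler.seenOffsets = new HashMap<>();
    testHandler.receivedRecords = new HashSet<>();

    // Every record passes through the decorator first...
    testHandler.accept(new ConsumerRecord<>("FOO", 0, 0L, "peter", "{\"next\":1}"));

    // ...so a test can assert on everything that was seen
    assert testHandler.seenOffsets.get(new TopicPartition("FOO", 0)) == 0L;
    assert testHandler.receivedRecords.size() == 1;
  }
}
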
100644 src/main/java/de/juplo/kafka/ApplicationRecordHandler.java delete mode 100644 src/main/java/de/juplo/kafka/DriverController.java delete mode 100644 src/main/java/de/juplo/kafka/ErrorResponse.java delete mode 100644 src/main/java/de/juplo/kafka/RecordHandler.java delete mode 100644 src/main/java/de/juplo/kafka/StateDocument.java delete mode 100644 src/main/java/de/juplo/kafka/StateRepository.java delete mode 100644 src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java delete mode 100644 src/test/java/de/juplo/kafka/ApplicationTests.java delete mode 100644 src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java delete mode 100644 src/test/java/de/juplo/kafka/GenericApplicationTests.java delete mode 100644 src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java delete mode 100644 src/test/java/de/juplo/kafka/TestRecordHandler.java diff --git a/src/main/java/de/juplo/kafka/AdderBusinessLogic.java b/src/main/java/de/juplo/kafka/AdderBusinessLogic.java deleted file mode 100644 index d525182..0000000 --- a/src/main/java/de/juplo/kafka/AdderBusinessLogic.java +++ /dev/null @@ -1,55 +0,0 @@ -package de.juplo.kafka; - - -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; - - -public class AdderBusinessLogic -{ - private final Map state; - - - public AdderBusinessLogic() - { - this(new HashMap<>()); - } - - public AdderBusinessLogic(Map state) - { - this.state = state; - } - - - public synchronized Optional getSum(String user) - { - return Optional.ofNullable(state.get(user)).map(result -> result.sum); - } - - public synchronized void addToSum(String user, Integer value) - { - if (value == null || value < 1) - throw new IllegalArgumentException("Not a positive number: " + value); - - long sum = - Optional - .ofNullable(state.get(user)) - .map(result -> result.sum) - .orElse(0l); - state.put(user, new AdderResult(value, sum + value)); - } - - public synchronized AdderResult calculate(String user) - { - if (!state.containsKey(user)) - throw new IllegalStateException("No sumation for " + user + " in progress"); - - return state.remove(user); - } - - protected Map getState() - { - return state; - } -} diff --git a/src/main/java/de/juplo/kafka/AdderResult.java b/src/main/java/de/juplo/kafka/AdderResult.java deleted file mode 100644 index 44b7da8..0000000 --- a/src/main/java/de/juplo/kafka/AdderResult.java +++ /dev/null @@ -1,21 +0,0 @@ -package de.juplo.kafka; - -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.RequiredArgsConstructor; - - -@RequiredArgsConstructor -@Getter -@EqualsAndHashCode -public class AdderResult -{ - final int number; - final long sum; - - @Override - public String toString() - { - return "sum(" + number + ") = " + sum; - } -} diff --git a/src/main/java/de/juplo/kafka/AdderResults.java b/src/main/java/de/juplo/kafka/AdderResults.java deleted file mode 100644 index e7f5602..0000000 --- a/src/main/java/de/juplo/kafka/AdderResults.java +++ /dev/null @@ -1,47 +0,0 @@ -package de.juplo.kafka; - -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - - -public class AdderResults -{ - private final Map>> results = new HashMap<>(); - - - public void addResults(Integer partition, String user, AdderResult result) - { - Map> resultsByUser = this.results.get(partition); - - List results = resultsByUser.get(user); - if (results == null) - { - results = new LinkedList<>(); - resultsByUser.put(user, results); - } - - results.add(result); - } - - protected void 
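
For orientation while reading the deletions: AdderBusinessLogic kept one running sum per user, and calculate() both returned and discarded that sum, so querying it afterwards yields an empty Optional again. A short usage sketch, with the behaviour taken from the deleted code above:

AdderBusinessLogic adder = new AdderBusinessLogic();
adder.addToSum("peter", 1);
adder.addToSum("peter", 2);
adder.addToSum("peter", 3);

adder.getSum("peter");                          // Optional[6]
AdderResult result = adder.calculate("peter");  // result.toString() yields "sum(3) = 6"
adder.getSum("peter");                          // Optional.empty - calculate() removed the state
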
addPartition(Integer partition, Map> results) - { - this.results.put(partition, results); - } - - protected Map> removePartition(Integer partition) - { - return this.results.remove(partition); - } - - public Map>> getState() - { - return results; - } - - public Map> getState(Integer partition) - { - return results.get(partition); - } -} diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 76c2520..b4a960d 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -1,11 +1,14 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.core.ConsumerFactory; import javax.annotation.PreDestroy; import java.util.concurrent.ExecutorService; @@ -17,57 +20,29 @@ import java.util.concurrent.TimeUnit; public class Application implements ApplicationRunner { @Autowired - EndlessConsumer endlessConsumer; + Consumer consumer; @Autowired - ExecutorService executor; + SimpleConsumer simpleConsumer; @Override public void run(ApplicationArguments args) throws Exception { log.info("Starting EndlessConsumer"); - endlessConsumer.start(); + simpleConsumer.start(); } @PreDestroy public void shutdown() { - try - { - log.info("Stopping EndlessConsumer"); - endlessConsumer.stop(); - } - catch (IllegalStateException e) - { - log.info("Was already stopped: {}", e.toString()); - } - catch (Exception e) - { - log.error("Unexpected exception while stopping EndlessConsumer: {}", e); - } + log.info("Signaling the consumer to quit its work"); + consumer.wakeup(); + } - try - { - log.info("Shutting down the ExecutorService."); - executor.shutdown(); - log.info("Waiting 5 seconds for the ExecutorService to terminate..."); - executor.awaitTermination(5, TimeUnit.SECONDS); - } - catch (InterruptedException e) - { - log.error("Exception while waiting for the termination of the ExecutorService: {}", e); - } - finally - { - if (!executor.isTerminated()) - { - log.warn("Forcing shutdown of ExecutorService!"); - executor - .shutdownNow() - .forEach(runnable -> log.warn("Unprocessed task: {}", runnable.getClass().getSimpleName())); - } - log.info("Shutdow of ExecutorService finished"); - } + @Bean(destroyMethod = "close") + public Consumer kafkaConsumer(ConsumerFactory factory) + { + return factory.createConsumer(); } diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 08c827c..23e9bec 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -6,7 +6,6 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import java.util.Optional; import org.springframework.kafka.core.ConsumerFactory; import java.util.concurrent.ExecutorService; @@ -18,54 +17,18 @@ import java.util.concurrent.Executors; public class ApplicationConfiguration { @Bean - public ApplicationRecordHandler applicationRecordHandler( - 
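
The new shutdown() above relies on the fact that wakeup() is the one method of KafkaConsumer that may safely be called from a thread other than the one that polls: it makes a blocking poll() abort with a WakeupException on the consumer thread. A minimal sketch of the pattern outside of Spring (bootstrap server, group-id and topic are placeholder assumptions):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class WakeupSketch
{
  public static void main(String[] args)
  {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "wakeup-sketch");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    // wakeup() may be called from a foreign thread - here: the shutdown hook
    Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));

    try
    {
      consumer.subscribe(List.of("test"));
      while (true)
      {
        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1)))
          System.out.println(record.offset() + ": " + record.value());
      }
    }
    catch (WakeupException e)
    {
      // Regular shutdown: wakeup() was called while poll() was blocking
    }
    finally
    {
      consumer.close();
    }
  }
}
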
AdderResults adderResults, - KafkaProperties kafkaProperties, - ApplicationProperties applicationProperties) - { - return new ApplicationRecordHandler( - adderResults, - Optional.ofNullable(applicationProperties.getThrottle()), - kafkaProperties.getClientId()); - } - - @Bean - public AdderResults adderResults() - { - return new AdderResults(); - } - - @Bean - public ApplicationRebalanceListener rebalanceListener( - ApplicationRecordHandler recordHandler, - AdderResults adderResults, - StateRepository stateRepository, - KafkaProperties kafkaProperties) - { - return new ApplicationRebalanceListener( - recordHandler, - adderResults, - stateRepository, - kafkaProperties.getClientId()); - } - - @Bean - public EndlessConsumer endlessConsumer( + public SimpleConsumer endlessConsumer( Consumer kafkaConsumer, ExecutorService executor, - ApplicationRebalanceListener rebalanceListener, - RecordHandler recordHandler, KafkaProperties kafkaProperties, ApplicationProperties applicationProperties) { return - new EndlessConsumer<>( + new SimpleConsumer( executor, kafkaProperties.getClientId(), applicationProperties.getTopic(), - kafkaConsumer, - rebalanceListener, - recordHandler); + kafkaConsumer); } @Bean diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java deleted file mode 100644 index 03a14c8..0000000 --- a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java +++ /dev/null @@ -1,32 +0,0 @@ -package de.juplo.kafka; - -import lombok.RequiredArgsConstructor; -import org.springframework.boot.actuate.health.Health; -import org.springframework.boot.actuate.health.HealthIndicator; -import org.springframework.stereotype.Component; - - -@Component -@RequiredArgsConstructor -public class ApplicationHealthIndicator implements HealthIndicator -{ - private final EndlessConsumer consumer; - - - @Override - public Health health() - { - try - { - return consumer - .exitStatus() - .map(Health::down) - .orElse(Health.outOfService()) - .build(); - } - catch (IllegalStateException e) - { - return Health.up().build(); - } - } -} diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 005460c..d46a8b3 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -10,7 +10,7 @@ import javax.validation.constraints.NotNull; import java.time.Duration; -@ConfigurationProperties(prefix = "sumup.adder") +@ConfigurationProperties(prefix = "simple.consumer") @Validated @Getter @Setter diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java deleted file mode 100644 index 0bfee67..0000000 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ /dev/null @@ -1,70 +0,0 @@ -package de.juplo.kafka; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.common.TopicPartition; - -import java.util.*; - - -@RequiredArgsConstructor -@Slf4j -public class ApplicationRebalanceListener implements ConsumerRebalanceListener -{ - private final ApplicationRecordHandler recordHandler; - private final AdderResults adderResults; - private final StateRepository stateRepository; - private final String id; - - private final Set partitions = new HashSet<>(); - - @Override - public void 
onPartitionsAssigned(Collection partitions) - { - partitions.forEach(tp -> - { - Integer partition = tp.partition(); - log.info("{} - adding partition: {}", id, partition); - this.partitions.add(partition); - StateDocument document = - stateRepository - .findById(Integer.toString(partition)) - .orElse(new StateDocument(partition)); - recordHandler.addPartition(partition, document.state); - for (String user : document.state.keySet()) - { - log.info( - "{} - Restored state for partition={}|user={}: {}", - id, - partition, - user, - document.state.get(user)); - } - adderResults.addPartition(partition, document.results); - }); - } - - @Override - public void onPartitionsRevoked(Collection partitions) - { - partitions.forEach(tp -> - { - Integer partition = tp.partition(); - log.info("{} - removing partition: {}", id, partition); - this.partitions.remove(partition); - Map state = recordHandler.removePartition(partition); - for (String user : state.keySet()) - { - log.info( - "{} - Saved state for partition={}|user={}: {}", - id, - partition, - user, - state.get(user)); - } - Map> results = adderResults.removePartition(partition); - stateRepository.save(new StateDocument(partition, state, results)); - }); - } -} diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java deleted file mode 100644 index 2829157..0000000 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ /dev/null @@ -1,93 +0,0 @@ -package de.juplo.kafka; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; - -import java.time.Duration; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; - - -@RequiredArgsConstructor -@Slf4j -public class ApplicationRecordHandler implements RecordHandler -{ - private final AdderResults results; - private final Optional throttle; - private final String id; - - private final Map state = new HashMap<>(); - - - public void addNumber( - Integer partition, - String user, - MessageAddNumber message) - { - state.get(partition).addToSum(user, message.getNext()); - } - - public void calculateSum( - Integer partition, - String user, - MessageCalculateSum message) - { - AdderResult result = state.get(partition).calculate(user); - log.info("{} - New result for {}: {}", id, user, result); - results.addResults(partition, user, result); - } - - @Override - public void accept(ConsumerRecord record) - { - Integer partition = record.partition(); - String user = record.key(); - Message message = record.value(); - - switch(message.getType()) - { - case ADD: - addNumber(partition, user, (MessageAddNumber) message); - break; - - case CALC: - calculateSum(partition, user, (MessageCalculateSum) message); - break; - } - - if (throttle.isPresent()) - { - try - { - Thread.sleep(throttle.get().toMillis()); - } - catch (InterruptedException e) - { - log.warn("{} - Intrerrupted while throttling: {}", id, e); - } - } - } - - protected void addPartition(Integer partition, Map state) - { - this.state.put(partition, new AdderBusinessLogic(state)); - } - - protected Map removePartition(Integer partition) - { - return this.state.remove(partition).getState(); - } - - - public Map getState() - { - return state; - } - - public AdderBusinessLogic getState(Integer partition) - { - return state.get(partition); - } -} diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java deleted file 
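
The deleted listener implements the state handoff this series revolves around: when a partition is revoked, its accumulated per-user state is written to MongoDB; when it is (re-)assigned - possibly to another instance - the state is read back. A bare-bones sketch of the idea (the StateStore interface is an assumed stand-in for StateRepository and the Mongo mapping):

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

public class StateHandoffListener implements ConsumerRebalanceListener
{
  interface StateStore
  {
    Map<String, Long> load(int partition);
    void save(int partition, Map<String, Long> state);
  }

  private final Map<Integer, Map<String, Long>> state = new HashMap<>();
  private final StateStore store;

  public StateHandoffListener(StateStore store)
  {
    this.store = store;
  }

  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
  {
    // Restore the state for every newly assigned partition
    partitions.forEach(tp -> state.put(tp.partition(), store.load(tp.partition())));
  }

  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
  {
    // Hand the state over before the partition moves on
    partitions.forEach(tp -> store.save(tp.partition(), state.remove(tp.partition())));
  }
}

The handoff is only consistent if the offsets committed for a partition match the state persisted for it, which is why the full implementation also has to take care of when commits happen.
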
mode 100644 index 26a5bc8..0000000 --- a/src/main/java/de/juplo/kafka/DriverController.java +++ /dev/null @@ -1,89 +0,0 @@ -package de.juplo.kafka; - -import lombok.RequiredArgsConstructor; -import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.*; - -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.ExecutionException; -import java.util.stream.Collectors; - - -@RestController -@RequiredArgsConstructor -public class DriverController -{ - private final EndlessConsumer consumer; - private final ApplicationRecordHandler recordHandler; - private final AdderResults results; - - - @PostMapping("start") - public void start() - { - consumer.start(); - } - - @PostMapping("stop") - public void stop() throws ExecutionException, InterruptedException - { - consumer.stop(); - } - - - @GetMapping("state") - public Map> state() - { - return - recordHandler - .getState() - .entrySet() - .stream() - .collect(Collectors.toMap( - entry -> entry.getKey(), - entry -> entry.getValue().getState())); - } - - @GetMapping("state/{user}") - public ResponseEntity state(@PathVariable String user) - { - for (AdderBusinessLogic adder : recordHandler.getState().values()) - { - Optional sum = adder.getSum(user); - if (sum.isPresent()) - return ResponseEntity.ok(sum.get()); - } - - return ResponseEntity.notFound().build(); - } - - @GetMapping("results") - public Map>> results() - { - return results.getState(); - } - - @GetMapping("results/{user}") - public ResponseEntity> results(@PathVariable String user) - { - for (Map> resultsByUser : this.results.getState().values()) - { - List results = resultsByUser.get(user); - if (results != null) - return ResponseEntity.ok(results); - } - - return ResponseEntity.notFound().build(); - } - - - @ExceptionHandler - @ResponseStatus(HttpStatus.BAD_REQUEST) - public ErrorResponse illegalStateException(IllegalStateException e) - { - return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value()); - } -} diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 00678c4..ba8eb27 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -6,6 +6,7 @@ import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.errors.WakeupException; +import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; import java.time.Duration; @@ -17,9 +18,10 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +@Component @Slf4j @RequiredArgsConstructor -public class EndlessConsumer implements Runnable +public class EndlessConsumer implements Runnable { private final ExecutorService executor; private final String id; diff --git a/src/main/java/de/juplo/kafka/ErrorResponse.java b/src/main/java/de/juplo/kafka/ErrorResponse.java deleted file mode 100644 index 5ca206d..0000000 --- a/src/main/java/de/juplo/kafka/ErrorResponse.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.juplo.kafka; - -import lombok.Value; - - -@Value -public class ErrorResponse -{ - private final String error; - private final Integer status; -} diff --git a/src/main/java/de/juplo/kafka/RecordHandler.java b/src/main/java/de/juplo/kafka/RecordHandler.java deleted file mode 100644 index 
327ac9f..0000000 --- a/src/main/java/de/juplo/kafka/RecordHandler.java +++ /dev/null @@ -1,10 +0,0 @@ -package de.juplo.kafka; - -import org.apache.kafka.clients.consumer.ConsumerRecord; - -import java.util.function.Consumer; - - -public interface RecordHandler extends Consumer> -{ -} diff --git a/src/main/java/de/juplo/kafka/StateDocument.java b/src/main/java/de/juplo/kafka/StateDocument.java deleted file mode 100644 index ae8eb51..0000000 --- a/src/main/java/de/juplo/kafka/StateDocument.java +++ /dev/null @@ -1,41 +0,0 @@ -package de.juplo.kafka; - -import lombok.ToString; -import org.springframework.data.annotation.Id; -import org.springframework.data.mongodb.core.mapping.Document; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - - -@Document(collection = "state") -@ToString -public class StateDocument -{ - @Id - public String id; - public Map state; - public Map> results; - - public StateDocument() - { - } - - public StateDocument(Integer partition) - { - this.id = Integer.toString(partition); - this.state = new HashMap<>(); - this.results = new HashMap<>(); - } - - public StateDocument( - Integer partition, - Map state, - Map> results) - { - this.id = Integer.toString(partition); - this.state = state; - this.results = results; - } -} diff --git a/src/main/java/de/juplo/kafka/StateRepository.java b/src/main/java/de/juplo/kafka/StateRepository.java deleted file mode 100644 index 3129535..0000000 --- a/src/main/java/de/juplo/kafka/StateRepository.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.juplo.kafka; - -import org.springframework.data.mongodb.repository.MongoRepository; - -import java.util.Optional; - - -public interface StateRepository extends MongoRepository -{ - public Optional findById(String partition); -} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 92f3a6b..c2cb792 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,6 +1,6 @@ -sumup: - adder: - topic: out +simple: + consumer: + topic: test management: endpoint: shutdown: @@ -22,10 +22,6 @@ info: topic: ${consumer.topic} auto-offset-reset: ${spring.kafka.consumer.auto-offset-reset} spring: - data: - mongodb: - uri: mongodb://juplo:training@localhost:27017 - database: juplo kafka: bootstrap-servers: :9092 client-id: DEV diff --git a/src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java b/src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java deleted file mode 100644 index 8e49263..0000000 --- a/src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java +++ /dev/null @@ -1,117 +0,0 @@ -package de.juplo.kafka; - -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; - -import java.util.Arrays; -import java.util.stream.IntStream; -import java.util.stream.Stream; - -import static org.assertj.core.api.Assertions.*; - - -public class AdderBusinessLogicTest -{ - @Test - @DisplayName("An empty Optional should be returned, for a non-existing sum") - public void testGetSumReturnsEmptyOptionalForNonExistingSum() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - assertThat(adder.getSum("foo")).isEmpty(); - } - - @Test - @DisplayName("A non-empty Optional should be returned, for an existing sum") - public void testGetSumReturnsNonEmptyOptionalForExistingSum() - { - 
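
StateDocument and StateRepository, also deleted here, persisted the adder's data as one MongoDB document per partition, keyed by the partition number. A small usage sketch (assuming an injected stateRepository and the classes as defined above):

StateDocument document =
    stateRepository
        .findById("3")                 // document-ids are partition numbers
        .orElse(new StateDocument(3));

document.state.put("peter", new AdderResult(6, 21));
stateRepository.save(document);
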
AdderBusinessLogic adder = new AdderBusinessLogic(); - adder.addToSum("foo", 6); - assertThat(adder.getSum("foo")).isNotEmpty(); - } - - @Test - @DisplayName("A sum can be calculated, if it does exist") - public void testCalculatePossibleIfSumExists() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - adder.addToSum("foo", 6); - assertThatNoException().isThrownBy(() -> adder.calculate("foo")); - } - - @Test - @DisplayName("An existing sum is removed, if ended") - public void testCalculateRemovesSumIfSumExists() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - adder.addToSum("foo", 6); - adder.calculate("foo"); - assertThat(adder.getSum("foo")).isEmpty(); - } - - @Test - @DisplayName("An existing sum returns a non-null value, if calculated") - public void testCalculateReturnsNonNullValueIfSumExists() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - adder.addToSum("foo", 6); - assertThat(adder.calculate("foo")).isNotNull(); - } - - @Test - @DisplayName("Ending a non-existing sum, causes an IllegalStateException") - public void testCalculateCausesExceptionIfNotExists() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - assertThatIllegalStateException().isThrownBy(() -> adder.calculate("foo")); - } - - @Test - @DisplayName("Adding a null-value to a sum causes an IllegalArgumentException") - public void testAddToSumWithNullValueCausesException() - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", null)); - } - - @ParameterizedTest(name = "{index}: Adding {0}") - @DisplayName("Adding a non-positive value to a sum causes an IllegalArgumentException") - @ValueSource(ints = { 0, -1, -6, -66, Integer.MIN_VALUE }) - public void testAddToSumWithNonPositiveValueCausesException(int value) - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", value)); - } - - @ParameterizedTest(name = "{index}: Adding {0}") - @DisplayName("Can add a positive value to a sum") - @ValueSource(ints = { 1, 3, 6, 66, 7, 9 }) - public void testAddToSumWithPositiveValuePossible(int value) - { - AdderBusinessLogic adder = new AdderBusinessLogic(); - assertThatNoException().isThrownBy(() -> adder.addToSum("foo", value)); - } - - @ParameterizedTest(name = "{index}: Summing up {0}") - @DisplayName("Adds up numbers correctly") - @MethodSource("numbersProvider") - public void testAddToSumAddsUpNumbersCorrectlyIfSumExists(int... 
numbers) - { - long expectedResult = Arrays.stream(numbers).sum(); - AdderBusinessLogic adder = new AdderBusinessLogic(); - Arrays.stream(numbers).forEach(number -> adder.addToSum("foo", number)); - AdderResult result = adder.calculate("foo"); - assertThat(result.number).isEqualTo(numbers[numbers.length-1]); - assertThat(result.sum).isEqualTo(expectedResult); - } - - static Stream numbersProvider() { - return Stream.of( - Arguments.of((Object) IntStream.rangeClosed(1,9).toArray()), - Arguments.of((Object) IntStream.rangeClosed(1,19).toArray()), - Arguments.of((Object) IntStream.rangeClosed(1,66).toArray())); - } -} diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java deleted file mode 100644 index bd9f449..0000000 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ /dev/null @@ -1,172 +0,0 @@ -package de.juplo.kafka; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.utils.Bytes; -import org.springframework.beans.factory.annotation.Autowired; - -import java.util.*; -import java.util.function.Consumer; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import static org.assertj.core.api.Assertions.assertThat; - - -@Slf4j -public class ApplicationTests extends GenericApplicationTests -{ - @Autowired - StateRepository stateRepository; - - - public ApplicationTests() - { - super(new ApplicationTestRecrodGenerator()); - ((ApplicationTestRecrodGenerator)recordGenerator).tests = this; - } - - - static class ApplicationTestRecrodGenerator implements RecordGenerator - { - ApplicationTests tests; - - final int[] numbers = {1, 77, 33, 2, 66, 666, 11}; - final String[] dieWilden13 = - IntStream - .range(1, 14) - .mapToObj(i -> "seeräuber-" + i) - .toArray(i -> new String[i]); - final StringSerializer stringSerializer = new StringSerializer(); - final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "{}")); - - int counter = 0; - - Map> state; - - @Override - public int generate( - boolean poisonPills, - boolean logicErrors, - Consumer> messageSender) - { - counter = 0; - state = - Arrays - .stream(dieWilden13) - .collect(Collectors.toMap( - seeräuber -> seeräuber, - seeräuber -> new LinkedList())); - - int number[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }; - int message[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 }; - int next = 0; - - for (int pass = 0; pass < 333; pass++) - { - for (int i = 0; i<13; i++) - { - String seeräuber = dieWilden13[i]; - Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber)); - - if (message[i] > number[i]) - { - send( - key, - calculateMessage, - Message.Type.CALC, - poisonPill(poisonPills, pass, counter), - logicError(logicErrors, pass, counter), - messageSender); - state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2)); - // Pick next number to calculate - number[i] = numbers[next++%numbers.length]; - message[i] = 1; - log.debug("Seeräuber {} will die Summe für {} berechnen", seeräuber, number[i]); - } - - send( - key, - new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":" + message[i]++ + "}")), - Message.Type.ADD, - poisonPill(poisonPills, pass, counter), - logicError(logicErrors, pass, counter), - messageSender); - } - } - - return counter; - } - - boolean poisonPill (boolean poisonPills, int pass, int counter) - { - return poisonPills && pass > 300 && counter%99 
== 0; - } - - boolean logicError(boolean logicErrors, int pass, int counter) - { - return logicErrors && pass > 300 && counter%77 == 0; - } - - void send( - Bytes key, - Bytes value, - Message.Type type, - boolean poisonPill, - boolean logicError, - Consumer> messageSender) - { - counter++; - - if (logicError) - { - value = new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":-1}")); - } - if (poisonPill) - { - value = new Bytes("BOOM!".getBytes()); - } - - ProducerRecord record = new ProducerRecord<>(TOPIC, key, value); - record.headers().add("__TypeId__", type.toString().getBytes()); - messageSender.accept(record); - } - - @Override - public void assertBusinessLogic() - { - for (int i=0; i - { - String user = entry.getKey(); - List resultsForUser = entry.getValue(); - - for (int j=0; j < resultsForUser.size(); j++) - { - if (!(j < state.get(user).size())) - { - break; - } - - assertThat(resultsForUser.get(j)) - .as("Unexpected results calculation %d of user %s", j, user) - .isEqualTo(state.get(user).get(j)); - } - - assertThat(state.get(user)) - .as("More results calculated for user %s as expected", user) - .containsAll(resultsForUser); - }); - } - } - } -} diff --git a/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java b/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java deleted file mode 100644 index 606218f..0000000 --- a/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java +++ /dev/null @@ -1,60 +0,0 @@ -package de.juplo.kafka; - -import org.junit.jupiter.api.extension.ConditionEvaluationResult; -import org.junit.jupiter.api.extension.ExecutionCondition; -import org.junit.jupiter.api.extension.ExtensionContext; -import org.junit.platform.commons.util.AnnotationUtils; - -import java.util.LinkedList; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; - - -public class ErrorCannotBeGeneratedCondition implements ExecutionCondition -{ - @Override - public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) - { - final Optional optional = - AnnotationUtils.findAnnotation( - context.getElement(), - SkipWhenErrorCannotBeGenerated.class); - - if (context.getTestInstance().isEmpty()) - return ConditionEvaluationResult.enabled("Test-instance ist not available"); - - if (optional.isPresent()) - { - SkipWhenErrorCannotBeGenerated skipWhenErrorCannotBeGenerated = optional.get(); - GenericApplicationTests instance = (GenericApplicationTests)context.getTestInstance().get(); - List missingRequiredErrors = new LinkedList<>(); - - if (skipWhenErrorCannotBeGenerated.poisonPill() && !instance.recordGenerator.canGeneratePoisonPill()) - missingRequiredErrors.add("Poison-Pill"); - - if (skipWhenErrorCannotBeGenerated.logicError() && !instance.recordGenerator.canGenerateLogicError()) - missingRequiredErrors.add("Logic-Error"); - - StringBuilder builder = new StringBuilder(); - builder.append(context.getTestClass().get().getSimpleName()); - - if (missingRequiredErrors.isEmpty()) - { - builder.append(" can generate all required types of errors"); - return ConditionEvaluationResult.enabled(builder.toString()); - } - - builder.append(" cannot generate the required error(s): "); - builder.append( - missingRequiredErrors - .stream() - .collect(Collectors.joining(", "))); - - return ConditionEvaluationResult.disabled(builder.toString()); - } - - return ConditionEvaluationResult.enabled( - "Not annotated with " + SkipWhenErrorCannotBeGenerated.class.getSimpleName()); - } -} diff --git 
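
A note on the expected results that the deleted generator records: the messages for a user count up from 1 to number[i], so the expected sum is taken from the closed form 1 + 2 + ... + n = n * (n + 1) / 2 instead of being accumulated message by message. For example:

int n = 6;
long expected = n * (n + 1) / 2;  // 21 = 1 + 2 + 3 + 4 + 5 + 6
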
a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java deleted file mode 100644 index 937b40f..0000000 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ /dev/null @@ -1,396 +0,0 @@ -package de.juplo.kafka; - -import com.mongodb.client.MongoClient; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.RecordDeserializationException; -import org.apache.kafka.common.serialization.*; -import org.apache.kafka.common.utils.Bytes; -import org.junit.jupiter.api.*; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.EnableAutoConfiguration; -import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; -import org.springframework.boot.autoconfigure.kafka.KafkaProperties; -import org.springframework.boot.autoconfigure.mongo.MongoProperties; -import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo; -import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Import; -import org.springframework.kafka.test.context.EmbeddedKafka; -import org.springframework.test.context.TestPropertySource; -import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; - -import java.time.Duration; -import java.util.*; -import java.util.function.BiConsumer; -import java.util.function.Consumer; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import static de.juplo.kafka.GenericApplicationTests.PARTITIONS; -import static de.juplo.kafka.GenericApplicationTests.TOPIC; -import static org.assertj.core.api.Assertions.*; -import static org.awaitility.Awaitility.*; - - -@SpringJUnitConfig( - initializers = ConfigDataApplicationContextInitializer.class, - classes = { - KafkaAutoConfiguration.class, - ApplicationTests.Configuration.class }) -@TestPropertySource( - properties = { - "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", - "sumup.adder.topic=" + TOPIC, - "spring.kafka.consumer.auto-commit-interval=500ms", - "spring.mongodb.embedded.version=4.4.13" }) -@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) -@EnableAutoConfiguration -@AutoConfigureDataMongo -@Slf4j -abstract class GenericApplicationTests -{ - public static final String TOPIC = "FOO"; - public static final int PARTITIONS = 10; - - - @Autowired - org.apache.kafka.clients.consumer.Consumer kafkaConsumer; - @Autowired - KafkaProperties kafkaProperties; - @Autowired - ApplicationProperties applicationProperties; - @Autowired - MongoClient mongoClient; - @Autowired - MongoProperties mongoProperties; - @Autowired - TestRecordHandler recordHandler; - @Autowired - EndlessConsumer endlessConsumer; - - KafkaProducer testRecordProducer; - KafkaConsumer offsetConsumer; - Map oldOffsets; - - - final RecordGenerator recordGenerator; - final Consumer> messageSender; - - public GenericApplicationTests(RecordGenerator recordGenerator) - { - this.recordGenerator = recordGenerator; - this.messageSender = (record) -> sendMessage(record); - } - - - /** Tests methods */ - - @Test - void commitsCurrentOffsetsOnSuccess() 
throws Exception - { - int numberOfGeneratedMessages = - recordGenerator.generate(false, false, messageSender); - - await(numberOfGeneratedMessages + " records received") - .atMost(Duration.ofSeconds(30)) - .pollInterval(Duration.ofSeconds(1)) - .until(() -> recordHandler.receivedRecords.size() >= numberOfGeneratedMessages); - - await("Offsets committed") - .atMost(Duration.ofSeconds(10)) - .pollInterval(Duration.ofSeconds(1)) - .untilAsserted(() -> - { - checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); - }); - - assertThatExceptionOfType(IllegalStateException.class) - .isThrownBy(() -> endlessConsumer.exitStatus()) - .describedAs("Consumer should still be running"); - - endlessConsumer.stop(); - recordGenerator.assertBusinessLogic(); - } - - @Test - @SkipWhenErrorCannotBeGenerated(poisonPill = true) - void commitsOffsetOfErrorForReprocessingOnDeserializationError() - { - int numberOfGeneratedMessages = - recordGenerator.generate(true, false, messageSender); - - await("Consumer failed") - .atMost(Duration.ofSeconds(30)) - .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); - - checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); - - endlessConsumer.start(); - await("Consumer failed") - .atMost(Duration.ofSeconds(30)) - .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); - - checkSeenOffsetsForProgress(); - assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); - assertThat(recordHandler.receivedRecords.size()) - .describedAs("Received not all sent events") - .isLessThan(numberOfGeneratedMessages); - - assertThatNoException() - .describedAs("Consumer should not be running") - .isThrownBy(() -> endlessConsumer.exitStatus()); - assertThat(endlessConsumer.exitStatus()) - .describedAs("Consumer should have exited abnormally") - .containsInstanceOf(RecordDeserializationException.class); - - recordGenerator.assertBusinessLogic(); - } - - @Test - @SkipWhenErrorCannotBeGenerated(logicError = true) - void doesNotCommitOffsetsOnLogicError() - { - int numberOfGeneratedMessages = - recordGenerator.generate(false, true, messageSender); - - await("Consumer failed") - .atMost(Duration.ofSeconds(30)) - .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); - - checkSeenOffsetsForProgress(); - assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets); - - endlessConsumer.start(); - await("Consumer failed") - .atMost(Duration.ofSeconds(30)) - .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); - - assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets); - - assertThatNoException() - .describedAs("Consumer should not be running") - .isThrownBy(() -> endlessConsumer.exitStatus()); - assertThat(endlessConsumer.exitStatus()) - .describedAs("Consumer should have exited abnormally") - .containsInstanceOf(RuntimeException.class); - - recordGenerator.assertBusinessLogic(); - } - - - /** Helper methods for the verification of expectations */ - - void assertSeenOffsetsEqualCommittedOffsets(Map offsetsToCheck) - { - doForCurrentOffsets((tp, offset) -> - { - Long expected = offsetsToCheck.get(tp) + 1; - log.debug("Checking, if the offset {} for {} is exactly {}", offset, tp, expected); - assertThat(offset) - .describedAs("Committed offset corresponds to the offset of the consumer") - .isEqualTo(expected); - }); - } - - void assertSeenOffsetsAreBehindCommittedOffsets(Map 
offsetsToCheck) - { - List isOffsetBehindSeen = new LinkedList<>(); - - doForCurrentOffsets((tp, offset) -> - { - Long expected = offsetsToCheck.get(tp) + 1; - log.debug("Checking, if the offset {} for {} is at most {}", offset, tp, expected); - assertThat(offset) - .describedAs("Committed offset must be at most equal to the offset of the consumer") - .isLessThanOrEqualTo(expected); - isOffsetBehindSeen.add(offset < expected); - }); - - assertThat(isOffsetBehindSeen.stream().reduce(false, (result, next) -> result | next)) - .describedAs("Committed offsets are behind seen offsets") - .isTrue(); - } - - void checkSeenOffsetsForProgress() - { - // Be sure, that some messages were consumed...! - Set withProgress = new HashSet<>(); - partitions().forEach(tp -> - { - Long oldOffset = oldOffsets.get(tp) + 1; - Long newOffset = recordHandler.seenOffsets.get(tp) + 1; - if (!oldOffset.equals(newOffset)) - { - log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset); - withProgress.add(tp); - } - }); - assertThat(withProgress) - .describedAs("Some offsets must have changed, compared to the old offset-positions") - .isNotEmpty(); - } - - - /** Helper methods for setting up and running the tests */ - - void seekToEnd() - { - offsetConsumer.assign(partitions()); - offsetConsumer.seekToEnd(partitions()); - partitions().forEach(tp -> - { - // seekToEnd() works lazily: it only takes effect on poll()/position() - Long offset = offsetConsumer.position(tp); - log.info("New position for {}: {}", tp, offset); - }); - // The new positions must be commited! - offsetConsumer.commitSync(); - offsetConsumer.unsubscribe(); - } - - void doForCurrentOffsets(BiConsumer consumer) - { - offsetConsumer.assign(partitions()); - partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp))); - offsetConsumer.unsubscribe(); - } - - List partitions() - { - return - IntStream - .range(0, PARTITIONS) - .mapToObj(partition -> new TopicPartition(TOPIC, partition)) - .collect(Collectors.toList()); - } - - - public interface RecordGenerator - { - int generate( - boolean poisonPills, - boolean logicErrors, - Consumer> messageSender); - - default boolean canGeneratePoisonPill() - { - return true; - } - - default boolean canGenerateLogicError() - { - return true; - } - - default void assertBusinessLogic() - { - log.debug("No business-logic to assert"); - } - } - - void sendMessage(ProducerRecord record) - { - testRecordProducer.send(record, (metadata, e) -> - { - if (metadata != null) - { - log.debug( - "{}|{} - {}={}", - metadata.partition(), - metadata.offset(), - record.key(), - record.value()); - } - else - { - log.warn( - "Exception for {}={}: {}", - record.key(), - record.value(), - e.toString()); - } - }); - } - - - @BeforeEach - public void init() - { - Properties props; - props = new Properties(); - props.put("bootstrap.servers", kafkaProperties.getBootstrapServers()); - props.put("linger.ms", 100); - props.put("key.serializer", BytesSerializer.class.getName()); - props.put("value.serializer", BytesSerializer.class.getName()); - testRecordProducer = new KafkaProducer<>(props); - - props = new Properties(); - props.put("bootstrap.servers", kafkaProperties.getBootstrapServers()); - props.put("client.id", "OFFSET-CONSUMER"); - props.put("group.id", kafkaProperties.getConsumer().getGroupId()); - props.put("key.deserializer", BytesDeserializer.class.getName()); - props.put("value.deserializer", BytesDeserializer.class.getName()); - offsetConsumer = new KafkaConsumer<>(props); - - 
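
The + 1 in these helper methods reflects Kafka's commit convention: a committed offset names the next record the group wants to read, not the last record it has processed. A consumer that has fully handled offset 41 of a partition therefore commits 42 - sketched here with a KafkaConsumer and the usual imports from org.apache.kafka.clients.consumer assumed:

// After processing the record at offset 41 of partition 0 of topic FOO:
consumer.commitSync(Map.of(
    new TopicPartition("FOO", 0),
    new OffsetAndMetadata(42L)));

This also matches the seekToEnd() helper above: position() already returns the offset of the next record, so the current positions can be committed as-is.
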
mongoClient.getDatabase(mongoProperties.getDatabase()).drop(); - seekToEnd(); - - oldOffsets = new HashMap<>(); - recordHandler.seenOffsets = new HashMap<>(); - recordHandler.receivedRecords = new HashSet<>(); - - doForCurrentOffsets((tp, offset) -> - { - oldOffsets.put(tp, offset - 1); - recordHandler.seenOffsets.put(tp, offset - 1); - }); - - endlessConsumer.start(); - } - - @AfterEach - public void deinit() - { - try - { - endlessConsumer.stop(); - } - catch (Exception e) - { - log.debug("{}", e.toString()); - } - - try - { - testRecordProducer.close(); - offsetConsumer.close(); - } - catch (Exception e) - { - log.info("Exception while stopping the consumer: {}", e.toString()); - } - } - - - @TestConfiguration - @Import(ApplicationConfiguration.class) - public static class Configuration - { - @Bean - public RecordHandler recordHandler(RecordHandler applicationRecordHandler) - { - return new TestRecordHandler(applicationRecordHandler); - } - } -} diff --git a/src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java b/src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java deleted file mode 100644 index 6d15e9e..0000000 --- a/src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java +++ /dev/null @@ -1,15 +0,0 @@ -package de.juplo.kafka; - -import org.junit.jupiter.api.extension.ExtendWith; - -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; - - -@Retention(RetentionPolicy.RUNTIME) -@ExtendWith(ErrorCannotBeGeneratedCondition.class) -public @interface SkipWhenErrorCannotBeGenerated -{ - boolean poisonPill() default false; - boolean logicError() default false; -} diff --git a/src/test/java/de/juplo/kafka/TestRecordHandler.java b/src/test/java/de/juplo/kafka/TestRecordHandler.java deleted file mode 100644 index 37d3f65..0000000 --- a/src/test/java/de/juplo/kafka/TestRecordHandler.java +++ /dev/null @@ -1,34 +0,0 @@ -package de.juplo.kafka; - -import lombok.RequiredArgsConstructor; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.TopicPartition; - -import java.util.Map; -import java.util.Set; - - -@RequiredArgsConstructor -public class TestRecordHandler implements RecordHandler -{ - private final RecordHandler handler; - - Map seenOffsets; - Set> receivedRecords; - - - public void onNewRecord(ConsumerRecord record) - { - seenOffsets.put( - new TopicPartition(record.topic(), record.partition()), - record.offset()); - receivedRecords.add(record); - } - - @Override - public void accept(ConsumerRecord record) - { - this.onNewRecord(record); - handler.accept(record); - } -} -- 2.20.1 From ffd5ad8116f8269ae828a7732cf2bd862f7ba095 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 2 Nov 2022 18:36:14 +0100 Subject: [PATCH 12/16] WIP --- src/main/java/de/juplo/kafka/Application.java | 2 - .../java/de/juplo/kafka/SimpleConsumer.java | 80 +++++++++++++++++++ 2 files changed, 80 insertions(+), 2 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/SimpleConsumer.java diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index b4a960d..94224e1 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -11,8 +11,6 @@ import org.springframework.context.annotation.Bean; import org.springframework.kafka.core.ConsumerFactory; import javax.annotation.PreDestroy; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; @SpringBootApplication diff --git 
a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java new file mode 100644 index 0000000..0d371f4 --- /dev/null +++ b/src/main/java/de/juplo/kafka/SimpleConsumer.java @@ -0,0 +1,80 @@ +package de.juplo.kafka; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.errors.WakeupException; + +import java.time.Duration; +import java.util.Arrays; +import java.util.concurrent.ExecutorService; + + +@Slf4j +@RequiredArgsConstructor +public class SimpleConsumer implements Runnable +{ + private final ExecutorService executor; + private final String id; + private final String topic; + private final Consumer consumer; + + private volatile boolean running = false; + private long consumed = 0; + + + @Override + public void run() + { + try + { + log.info("{} - Subscribing to topic test", id); + consumer.subscribe(Arrays.asList("test")); + running = true; + + while (true) + { + ConsumerRecords records = + consumer.poll(Duration.ofSeconds(1)); + + log.info("{} - Received {} messages", id, records.count()); + for (ConsumerRecord record : records) + { + consumed++; + log.info( + "{} - {}: {}/{} - {}={}", + id, + record.offset(), + record.topic(), + record.partition(), + record.key(), + record.value() + ); + } + } + } + catch(WakeupException e) + { + log.info("{} - Consumer was signaled to finish its work", id); + } + catch(Exception e) + { + log.error("{} - Unexpected error: {}, unsubscribing!", id, e.toString()); + consumer.unsubscribe(); + } + finally + { + running = false; + log.info("{} - Closing the KafkaConsumer", id); + consumer.close(); + log.info("{}: Consumed {} messages in total, exiting!", id, consumed); + } + } + + public void start() + { + executor.submit(this); + } +} -- 2.20.1 From d58ed9de440e56ecf906b26803591290b1c4f60d Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 2 Nov 2022 18:42:34 +0100 Subject: [PATCH 13/16] WIP --- src/main/java/de/juplo/kafka/Application.java | 17 +- .../java/de/juplo/kafka/EndlessConsumer.java | 214 ------------------ .../java/de/juplo/kafka/SimpleConsumer.java | 5 - 3 files changed, 13 insertions(+), 223 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/EndlessConsumer.java diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 94224e1..ab357c7 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -11,30 +11,39 @@ import org.springframework.context.annotation.Bean; import org.springframework.kafka.core.ConsumerFactory; import javax.annotation.PreDestroy; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; @SpringBootApplication @Slf4j public class Application implements ApplicationRunner { + @Autowired + ExecutorService executorService; @Autowired Consumer consumer; @Autowired SimpleConsumer simpleConsumer; + Future consumerJob; @Override public void run(ApplicationArguments args) throws Exception { - log.info("Starting EndlessConsumer"); - simpleConsumer.start(); + log.info("Starting SimpleConsumer"); + consumerJob = executorService.submit(simpleConsumer); } @PreDestroy - public void shutdown() + public void shutdown() throws ExecutionException, InterruptedException { - 
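
SimpleConsumer above is the stripped-down target of this series: subscribe, poll in a loop, and treat the WakeupException as the regular end of its work. Note that in this WIP state run() still subscribes to the hard-coded topic "test" - matching simple.consumer.topic in application.yml - instead of the injected topic field. How it is meant to be driven, sketched as plain Java inside a main(String[]) throws Exception (the props and the ten-second runtime are assumptions; the following patch wires the same sequence into the Spring lifecycle):

ExecutorService executor = Executors.newSingleThreadExecutor();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
SimpleConsumer simpleConsumer = new SimpleConsumer(executor, "DEV", "test", consumer);

Future<?> job = executor.submit(simpleConsumer);  // run() starts polling
Thread.sleep(10000);                              // ...for a while
consumer.wakeup();                                // makes poll() throw the WakeupException
job.get();                                        // wait until run() has closed the consumer
executor.shutdown();
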
log.info("Signaling the consumer to quit its work"); + log.info("Signaling SimpleConsumer to quit its work"); consumer.wakeup(); + log.info("Waiting for SimpleConsumer to finish its work"); + consumerJob.get(); + log.info("SimpleConsumer finished its work"); } @Bean(destroyMethod = "close") diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java deleted file mode 100644 index ba8eb27..0000000 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ /dev/null @@ -1,214 +0,0 @@ -package de.juplo.kafka; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.*; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.RecordDeserializationException; -import org.apache.kafka.common.errors.WakeupException; -import org.springframework.stereotype.Component; - -import javax.annotation.PreDestroy; -import java.time.Duration; -import java.util.*; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - - -@Component -@Slf4j -@RequiredArgsConstructor -public class EndlessConsumer implements Runnable -{ - private final ExecutorService executor; - private final String id; - private final String topic; - private final Consumer consumer; - private final ConsumerRebalanceListener rebalanceListener; - private final RecordHandler recordHandler; - - private final Lock lock = new ReentrantLock(); - private final Condition condition = lock.newCondition(); - private boolean running = false; - private Exception exception; - private long consumed = 0; - - - - @Override - public void run() - { - try - { - log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic), rebalanceListener); - - while (true) - { - ConsumerRecords records = - consumer.poll(Duration.ofSeconds(1)); - - // Do something with the data... - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) - { - log.info( - "{} - {}: {}/{} - {}={}", - id, - record.offset(), - record.topic(), - record.partition(), - record.key(), - record.value() - ); - - recordHandler.accept(record); - - consumed++; - } - } - } - catch(WakeupException e) - { - log.info("{} - RIIING! 
Request to stop consumption - commiting current offsets!", id); - consumer.commitSync(); - shutdown(); - } - catch(RecordDeserializationException e) - { - TopicPartition tp = e.topicPartition(); - long offset = e.offset(); - log.error( - "{} - Could not deserialize message on topic {} with offset={}: {}", - id, - tp, - offset, - e.getCause().toString()); - - consumer.commitSync(); - shutdown(e); - } - catch(Exception e) - { - log.error("{} - Unexpected error: {}", id, e.toString(), e); - shutdown(e); - } - finally - { - log.info("{} - Consumer-Thread exiting", id); - } - } - - private void shutdown() - { - shutdown(null); - } - - private void shutdown(Exception e) - { - lock.lock(); - try - { - try - { - log.info("{} - Unsubscribing from topic {}", id, topic); - consumer.unsubscribe(); - } - catch (Exception ue) - { - log.error( - "{} - Error while unsubscribing from topic {}: {}", - id, - topic, - ue.toString()); - } - finally - { - running = false; - exception = e; - condition.signal(); - } - } - finally - { - lock.unlock(); - } - } - - public void start() - { - lock.lock(); - try - { - if (running) - throw new IllegalStateException("Consumer instance " + id + " is already running!"); - - log.info("{} - Starting - consumed {} messages before", id, consumed); - running = true; - exception = null; - executor.submit(this); - } - finally - { - lock.unlock(); - } - } - - public synchronized void stop() throws InterruptedException - { - lock.lock(); - try - { - if (!running) - throw new IllegalStateException("Consumer instance " + id + " is not running!"); - - log.info("{} - Stopping", id); - consumer.wakeup(); - condition.await(); - log.info("{} - Stopped - consumed {} messages so far", id, consumed); - } - finally - { - lock.unlock(); - } - } - - @PreDestroy - public void destroy() throws ExecutionException, InterruptedException - { - log.info("{} - Destroy!", id); - log.info("{}: Consumed {} messages in total, exiting!", id, consumed); - } - - public boolean running() - { - lock.lock(); - try - { - return running; - } - finally - { - lock.unlock(); - } - } - - public Optional exitStatus() - { - lock.lock(); - try - { - if (running) - throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!"); - - return Optional.ofNullable(exception); - } - finally - { - lock.unlock(); - } - } -} diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java index 0d371f4..040e24b 100644 --- a/src/main/java/de/juplo/kafka/SimpleConsumer.java +++ b/src/main/java/de/juplo/kafka/SimpleConsumer.java @@ -72,9 +72,4 @@ public class SimpleConsumer implements Runnable log.info("{}: Consumed {} messages in total, exiting!", id, consumed); } } - - public void start() - { - executor.submit(this); - } } -- 2.20.1 From 537873ff5ce5ca87aba7be46fd85f27181e3c4b3 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 2 Nov 2022 18:44:27 +0100 Subject: [PATCH 14/16] WIP --- src/main/java/de/juplo/kafka/Application.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index ab357c7..862c5f2 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -46,12 +46,6 @@ public class Application implements ApplicationRunner log.info("SimpleConsumer finished its work"); } - @Bean(destroyMethod = "close") - public Consumer kafkaConsumer(ConsumerFactory factory) - { - return 
factory.createConsumer(); - } - public static void main(String[] args) { -- 2.20.1 From f324379723b2f3d681230f571f82bfbb0d53cb75 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 2 Nov 2022 18:50:19 +0100 Subject: [PATCH 15/16] WIP --- README.sh | 80 ++++--------------- docker-compose.yml | 195 ++++++++++++++------------------------------- pom.xml | 10 +-- 3 files changed, 76 insertions(+), 209 deletions(-) diff --git a/README.sh b/README.sh index 07e36d7..1394020 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/sumup-adder-springified:1.0-SNAPSHOT +IMAGE=juplo/spring-consumer:1.0-SNAPSHOT if [ "$1" = "cleanup" ] then @@ -9,17 +9,16 @@ then exit fi -docker-compose rm -svf adder-1 adder-2 -docker-compose rm -svf mongo -docker-compose up -d zookeeper kafka-1 kafka-2 kafka-3 cli mongo express +trap 'kill $(jobs -p) 2>/dev/null' EXIT + +docker-compose up -d kafka-0 kafka-1 kafka-2 kafka-3 cli if [[ $(docker image ls -q $IMAGE) == "" || "$1" = "build" ]] then - docker-compose rm -svf adder-1 adder-2 - mvn -D skipTests clean install || exit + mvn clean install || exit else echo "Using image existing images:" docker image ls $IMAGE @@ -28,63 +27,12 @@ fi echo "Waiting for the Kafka-Cluster to become ready..." docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1 docker-compose up setup -docker-compose up -d gateway requests-1 requests-2 - -while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for gateway..."; sleep 1; done -while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for requests-1..."; sleep 1; done -while ! [[ $(http 0:8082/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for requests-2..."; sleep 1; done - -docker-compose up -d peter klaus - -docker-compose up -d adder-1 -while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done -while [[ "$(http :8091/results | jq -r .)" == "{}" ]]; do echo "Waiting for some results to show up on adder-1..."; sleep 1; done -http -v --pretty none -S :8091/results -echo - -sleep 3 -echo "Resultate für adder-1" -http -v --pretty none -S :8091/results -echo - -echo "Resultate für peter von adder-1" -http :8091/results/peter | jq .[].sum | uniq -echo "Resultate für klaus von adder-1" -http :8091/results/klaus | jq .[].sum | uniq - - -docker-compose up -d adder-2 -while ! 
[[ $(http 0:8092/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-2..."; sleep 1; done -while [[ "$(http :8092/results | jq -r .)" == "{}" ]]; do echo "Waiting for some results to show up on adder-2..."; sleep 1; done -http -v --pretty none -S :8092/results -echo - -sleep 3 -echo "Resultate für adder-2" -http -v --pretty none -S :8092/results -echo - -echo "Resultate für peter von adder-1" -http :8091/results/peter | jq .[].sum | uniq -echo "Resultate für klaus von adder-1" -http :8091/results/klaus | jq .[].sum | uniq - -echo "Resultate für peter von adder-2" -http :8092/results/peter | jq .[].sum | uniq -echo "Resultate für klaus von adder-2" -http :8092/results/klaus | jq .[].sum | uniq - -docker-compose stop adder-1 -until [ $(http --check-status :8092/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-2..."; sleep 1; done -until [ $(http --check-status :8092/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-2..."; sleep 1; done - -echo "Resultate für adder-2" -http -v --pretty none -S :8092/results -echo - -echo "Resultate für peter von adder-2" -http :8092/results/peter | jq .[].sum | uniq -echo "Resultate für klaus von adder-2" -http :8092/results/klaus | jq .[].sum | uniq - -docker-compose kill -s 9 peter klaus +docker-compose up -d producer + +mvn spring-boot:run & +sleep 10 +kill $(jobs -p) +mvn spring-boot:run & +sleep 10 +docker-compose stop producer +kill $(jobs -p) diff --git a/docker-compose.yml b/docker-compose.yml index 16fec5b..6d25cc1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,39 +1,56 @@ version: '3.2' services: - zookeeper: - image: confluentinc/cp-zookeeper:7.1.3 + kafka-0: + image: bitnami/kafka:3.3.1 environment: - ZOOKEEPER_CLIENT_PORT: 2181 - ports: - - 2181:2181 + KAFKA_ENABLE_KRAFT: 'yes' + KAFKA_KRAFT_CLUSTER_ID: r7dMBY60T16TrNCGeXniLw + KAFKA_CFG_PROCESS_ROLES: controller + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_CFG_LISTENERS: CONTROLLER://:9092 + KAFKA_CFG_ADVERTISED_LISTENERS: " " + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT + KAFKA_BROKER_ID: 0 + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka-0:9092 + ALLOW_PLAINTEXT_LISTENER: 'yes' + KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "false" kafka-1: - image: confluentinc/cp-kafka:7.1.3 + image: bitnami/kafka:3.3.1 environment: + KAFKA_ENABLE_KRAFT: 'yes' + KAFKA_KRAFT_CLUSTER_ID: r7dMBY60T16TrNCGeXniLw + KAFKA_CFG_PROCESS_ROLES: broker + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_CFG_LISTENERS: BROKER://:9092, LOCALHOST://:9081 + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT, CONTROLLER:PLAINTEXT, LOCALHOST:PLAINTEXT + KAFKA_CFG_ADVERTISED_LISTENERS: BROKER://kafka-1:9092, LOCALHOST://localhost:9081 KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9081 - KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-1:9092, LOCALHOST://localhost:9081 - KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 - KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka-0:9092 + ALLOW_PLAINTEXT_LISTENER: 'yes' + KAFKA_CFG_INTER_BROKER_LISTENER_NAME: BROKER + KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 + KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "false" ports: - 9081:9081 - depends_on: - - zookeeper kafka-2: - image: 
confluentinc/cp-kafka:7.1.3 + image: bitnami/kafka:3.3.1 environment: + KAFKA_ENABLE_KRAFT: 'yes' + KAFKA_KRAFT_CLUSTER_ID: r7dMBY60T16TrNCGeXniLw + KAFKA_CFG_PROCESS_ROLES: broker + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_CFG_LISTENERS: BROKER://:9092, LOCALHOST://:9082 + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT, CONTROLLER:PLAINTEXT, LOCALHOST:PLAINTEXT + KAFKA_CFG_ADVERTISED_LISTENERS: BROKER://kafka-2:9092, LOCALHOST://localhost:9082 KAFKA_BROKER_ID: 2 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9082 - KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-2:9092, LOCALHOST://localhost:9082 - KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 - KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka-0:9092 + ALLOW_PLAINTEXT_LISTENER: 'yes' + KAFKA_CFG_INTER_BROKER_LISTENER_NAME: BROKER + KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 + KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "false" ports: - 9092:9082 - 9082:9082 @@ -41,133 +58,39 @@ services: default: aliases: - kafka - depends_on: - - zookeeper kafka-3: - image: confluentinc/cp-kafka:7.1.3 + image: bitnami/kafka:3.3.1 environment: + KAFKA_ENABLE_KRAFT: 'yes' + KAFKA_KRAFT_CLUSTER_ID: r7dMBY60T16TrNCGeXniLw + KAFKA_CFG_PROCESS_ROLES: broker + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_CFG_LISTENERS: BROKER://:9092, LOCALHOST://:9083 + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT, CONTROLLER:PLAINTEXT, LOCALHOST:PLAINTEXT + KAFKA_CFG_ADVERTISED_LISTENERS: BROKER://kafka-3:9092, LOCALHOST://localhost:9083 KAFKA_BROKER_ID: 3 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9083 - KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-3:9092, LOCALHOST://localhost:9083 - KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 - KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka-0:9092 + ALLOW_PLAINTEXT_LISTENER: 'yes' + KAFKA_CFG_INTER_BROKER_LISTENER_NAME: BROKER + KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 + KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "false" ports: - 9083:9083 - depends_on: - - zookeeper - - mongo: - image: mongo:4.4.13 - ports: - - 27017:27017 - environment: - MONGO_INITDB_ROOT_USERNAME: juplo - MONGO_INITDB_ROOT_PASSWORD: training - - express: - image: mongo-express - ports: - - 8090:8081 - environment: - ME_CONFIG_MONGODB_ADMINUSERNAME: juplo - ME_CONFIG_MONGODB_ADMINPASSWORD: training - ME_CONFIG_MONGODB_URL: mongodb://juplo:training@mongo:27017/ - depends_on: - - mongo setup: image: juplo/toolbox command: > bash -c " - kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic in - kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic out - kafka-topics --bootstrap-server kafka:9092 --create --topic in --partitions 2 --replication-factor 3 --config min.insync.replicas=2 - kafka-topics --bootstrap-server kafka:9092 --create --topic out --partitions 2 --replication-factor 3 --config min.insync.replicas=2 - kafka-topics --bootstrap-server kafka:9092 --describe --topic in - kafka-topics --bootstrap-server kafka:9092 --describe --topic out + kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test + kafka-topics --bootstrap-server kafka:9092 --create --topic test 
--partitions 2 --replication-factor 3 --config min.insync.replicas=2
+        kafka-topics --bootstrap-server kafka:9092 --describe --topic test
       "
 
   cli:
     image: juplo/toolbox
     command: sleep infinity
 
-  gateway:
-    image: juplo/sumup-gateway--springified:1.0-SNAPSHOT
-    ports:
-      - 8080:8080
-    environment:
-      server.port: 8080
-      spring.kafka.bootstrap-servers: kafka:9092
-      spring.kafka.client-id: gateway
-      sumup.gateway.topic: in
-
-  requests-1:
-    image: juplo/sumup-requests-json:1.0-SNAPSHOT
-    ports:
-      - 8081:8080
-    environment:
-      server.port: 8080
-      sumup.requests.bootstrap-server: kafka:9092
-      sumup.requests.client-id: requests-1
-
-  requests-2:
-    image: juplo/sumup-requests-json:1.0-SNAPSHOT
-    ports:
-      - 8082:8080
-    environment:
-      server.port: 8080
-      sumup.requests.bootstrap-server: kafka:9092
-      sumup.requests.client-id: requests-2
-
-  adder-1:
-    image: juplo/sumup-adder-springified:1.0-SNAPSHOT
-    ports:
-      - 8091:8080
-    environment:
-      server.port: 8080
-      spring.kafka.bootstrap-servers: kafka:9092
-      spring.kafak.client-id: adder-1
-      spring.kafka.auto-commit-interval: 1s
-      sumup.adder.throttle: 3ms
-      spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
-      spring.data.mongodb.database: juplo
-      logging.level.org.apache.kafka.clients.consumer: DEBUG
-
-  adder-2:
-    image: juplo/sumup-adder-springified:1.0-SNAPSHOT
-    ports:
-      - 8092:8080
-    environment:
-      server.port: 8080
-      spring.kafka.bootstrap-servers: kafka:9092
-      spring.kafak.client-id: adder-2
-      spring.kafka.auto-commit-interval: 1s
-      sumup.adder.throttle: 3ms
-      spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
-      spring.data.mongodb.database: juplo
-      logging.level.org.apache.kafka.clients.consumer: DEBUG
-
-  peter:
-    image: juplo/toolbox
-    command: >
-      bash -c "
-        while [[ true ]];
-        do
-          echo 666 | http -v gateway:8080/peter;
-          sleep 1;
-        done
-      "
-  klaus:
-    image: juplo/toolbox
-    command: >
-      bash -c "
-        while [[ true ]];
-        do
-          echo 666 | http -v gateway:8080/klaus;
-          sleep 1;
-        done
-      "
+  producer:
+    image: juplo/simple-producer:1.0-SNAPSHOT
+    command: producer
diff --git a/pom.xml b/pom.xml
index a252d1c..9935ed8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,10 +12,10 @@
 
   <groupId>de.juplo.kafka</groupId>
-  <artifactId>sumup-adder-springified</artifactId>
+  <artifactId>spring-consumer</artifactId>
   <version>1.0-SNAPSHOT</version>
-  <name>SumUp Adder</name>
-  <description>Calculates the sum for the send messages. This version consumes JSON-messages.</description>
+  <name>Spring Consumer</name>
+  <description>Super Simple Consumer-Group that is configured by Spring Kafka</description>
 
   <properties>
     <java.version>11</java.version>
   </properties>
@@ -26,10 +26,6 @@
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.springframework.boot</groupId>
-      <artifactId>spring-boot-starter-data-mongodb</artifactId>
-    </dependency>
     <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-validation</artifactId>
-- 
2.20.1
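
Note on PATCH 15: the cluster now runs in KRaft mode - kafka-0 acts as the sole
controller, kafka-1 through kafka-3 join as plain brokers, and the setup service
(re-)creates the topic `test` with 2 partitions and a replication-factor of 3.
Whether the topology actually came up as intended can be verified from the host
with a few lines of AdminClient code - a minimal sketch, not part of the patch
series: the bootstrap address relies on the 9092:9082 port-mapping of kafka-2,
the class name ClusterCheck is made up for the example, and allTopicNames()
assumes kafka-clients 3.1 or newer (older clients use all() instead):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

import java.util.List;
import java.util.Properties;


public class ClusterCheck
{
  public static void main(String[] args) throws Exception
  {
    Properties props = new Properties();
    // Reaches kafka-2 through the host-port 9092 mapped in docker-compose.yml
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    try (AdminClient admin = AdminClient.create(props))
    {
      TopicDescription test = admin
          .describeTopics(List.of("test"))
          .allTopicNames()
          .get()
          .get("test");

      // Expected for topic test: 2 partitions with 3 replicas each
      test.partitions().forEach(p ->
          System.out.printf(
              "partition %d: leader=%s, replicas=%d%n",
              p.partition(), p.leader(), p.replicas().size()));
    }
  }
}

If the output shows two partitions with three replicas each, the KRaft cluster
and the topic-setup behave just like the ZooKeeper-based setup they replace.
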
From 255052b2411353652f5ec879c2f6ce7fe6c54969 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Wed, 2 Nov 2022 18:53:06 +0100
Subject: [PATCH 16/16] WIP:StringDeserializer

---
 src/main/resources/application.yml | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index c2cb792..ae7aae8 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -30,13 +30,10 @@ spring:
       auto-offset-reset: earliest
       auto-commit-interval: 5s
       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
-      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
+      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
       properties:
         partition.assignment.strategy: org.apache.kafka.clients.consumer.StickyAssignor
         metadata.max.age.ms: 1000
-        spring.json.type.mapping: >
-          ADD:de.juplo.kafka.MessageAddNumber,
-          CALC:de.juplo.kafka.MessageCalculateSum
 logging:
   level:
     root: INFO
-- 
2.20.1
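
Note on PATCH 16: a plain StringDeserializer only exists in the kafka-clients
package org.apache.kafka.common.serialization; the Spring package
org.springframework.kafka.support.serializer ships JSON- and error-handling
deserializers, but no StringDeserializer, so the value-deserializer has to name
the kafka-clients class. The consumer that Spring Boot wires up from this
application.yml is then equivalent to the following hand-built client - a
minimal sketch for illustration: the bootstrap address, the group-id
spring-consumer and the class name PlainStringConsumer are assumptions, while
the remaining settings mirror the configuration shown above:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.List;
import java.util.Properties;


public class PlainStringConsumer
{
  public static void main(String[] args)
  {
    Properties props = new Properties();
    // Assumed values - not part of the shown application.yml
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "spring-consumer");
    // Mirrors the spring.kafka.consumer.* settings from application.yml
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        "org.apache.kafka.clients.consumer.StickyAssignor");
    props.put(ConsumerConfig.METADATA_MAX_AGE_CONFIG, "1000");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props))
    {
      // Topic test is created by the setup service in docker-compose.yml
      consumer.subscribe(List.of("test"));
      for (int i = 0; i < 10; i++)
      {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        records.forEach(record ->
            System.out.println(record.key() + "=" + record.value()));
      }
    }
  }
}

Run while the producer service from docker-compose.yml is active, this prints
the produced key=value pairs from both partitions of test, just like the
Spring-configured variant started via mvn spring-boot:run in README.sh.
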