From c446512ec3bfa29e5e8482074cb6daf7e2ee1b2f Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Tue, 26 Jul 2022 15:37:43 +0200
Subject: [PATCH 01/16] Reworked the test case
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* Removed the dependency of the test results on the execution order.
* The dependency existed because the offset positions survived each test
  run as state.
* Hence, no further test could be executed once a poison pill that the
  implemented consumer trips over had been written into the topic.
* To work around this, the offset positions are now moved to the end of
  the partitions after each test. That is, if a test writes a poison pill
  that the implemented consumer cannot read past, the offsets are advanced
  beyond that poison pill before the next test is executed.
* This uncovered a bug / weakness in the test logic: in the test for the
  successful case, a commit was only performed because no commit had been
  performed before, so the default value for the commit interval was
  always exceeded.
* To address this, a configuration option for setting the parameter
  `auto.commit.interval` was introduced, so that the test can make sure
  that an automatic commit is definitely triggered within the observed
  period.
* Also: less confusing output of the offset progress.
---
 .../juplo/kafka/ApplicationConfiguration.java |  1 +
 .../de/juplo/kafka/ApplicationProperties.java |  3 ++
 src/main/resources/application.yml            |  1 +
 .../java/de/juplo/kafka/ApplicationTests.java | 29 ++++++++++++++-----
 4 files changed, 27 insertions(+), 7 deletions(-)

diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 4054e93..766740b 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -58,6 +58,7 @@ public class ApplicationConfiguration
     props.put("group.id", properties.getGroupId());
     props.put("client.id", properties.getClientId());
     props.put("auto.offset.reset", properties.getAutoOffsetReset());
+    props.put("auto.commit.interval.ms", (int)properties.getCommitInterval().toMillis());
     props.put("metadata.max.age.ms", "1000");
     props.put("key.deserializer", StringDeserializer.class.getName());
     props.put("value.deserializer", LongDeserializer.class.getName());
diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java
index fa731c5..14e928f 100644
--- a/src/main/java/de/juplo/kafka/ApplicationProperties.java
+++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java
@@ -7,6 +7,7 @@
 import org.springframework.validation.annotation.Validated;
 import javax.validation.constraints.NotEmpty;
 import javax.validation.constraints.NotNull;
+import java.time.Duration;

 @ConfigurationProperties(prefix = "consumer")
@@ -30,4 +31,6 @@ public class ApplicationProperties
   @NotNull
   @NotEmpty
   private String autoOffsetReset;
+  @NotNull
+  private Duration commitInterval;
 }
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 9f3cb81..f8bfe7e 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -4,6 +4,7 @@ consumer:
   client-id: DEV
   topic: test
   auto-offset-reset: earliest
+  commit-interval: 5s
 management:
   endpoint:
     shutdown:
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index 05eebd0..26a34e4 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -25,7 +25,6 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -40,7 +39,8 @@ import static org.awaitility.Awaitility.*;
 @TestPropertySource(
     properties = {
         "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
-        "consumer.topic=" + TOPIC })
+        "consumer.topic=" + TOPIC,
+        "consumer.commit-interval=100ms" })
 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
 @Slf4j
 class ApplicationTests
@@ -74,7 +74,6 @@ class ApplicationTests
   /** Tests methods */

   @Test
-  @Order(1) // << The poistion pill is not skipped. Hence, this test must run first
   void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException
   {
     send100Messages((partition, key, counter) ->
@@ -101,8 +100,7 @@ class ApplicationTests
   }

   @Test
-  @Order(2)
-  void commitsOffsetOfErrorForReprocessingOnError()
+  void commitsOffsetOfErrorForReprocessingOnDeserializationError()
   {
     send100Messages((partition, key, counter) ->
     {
@@ -159,8 +157,8 @@ class ApplicationTests
     Set withProgress = new HashSet<>();
     partitions().forEach(tp ->
     {
-      Long oldOffset = oldOffsets.get(tp);
-      Long newOffset = newOffsets.get(tp);
+      Long oldOffset = oldOffsets.get(tp) + 1;
+      Long newOffset = newOffsets.get(tp) + 1;
       if (!oldOffset.equals(newOffset))
       {
         log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
@@ -175,6 +173,21 @@ class ApplicationTests
   /** Helper methods for setting up and running the tests */

+  void seekToEnd()
+  {
+    offsetConsumer.assign(partitions());
+    offsetConsumer.seekToEnd(partitions());
+    partitions().forEach(tp ->
+    {
+      // seekToEnd() works lazily: it only takes effect on poll()/position()
+      Long offset = offsetConsumer.position(tp);
+      log.info("New position for {}: {}", tp, offset);
+    });
+    // The new positions must be commited!
+    offsetConsumer.commitSync();
+    offsetConsumer.unsubscribe();
+  }
+
   void doForCurrentOffsets(BiConsumer consumer)
   {
     offsetConsumer.assign(partitions());
@@ -238,6 +251,8 @@
   {
     testHandler = record -> {} ;

+    seekToEnd();
+
     oldOffsets = new HashMap<>();
     newOffsets = new HashMap<>();
     receivedRecords = new HashSet<>();
-- 
2.20.1

From be1b513f8bd7646f9ceb3a7ba90952641e3af125 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Tue, 26 Jul 2022 16:03:10 +0200
Subject: [PATCH 02/16] Made the behaviour of the test case more controllable
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* Unless configured otherwise, the Awaitility calls poll the state under
  inspection as often as the CPU allows - that is, without any delay
  between the checks.
* This can temporarily overload the machine to a degree that the expected
  state never occurs within the awaited time!
* Observed, for example, when - as here - the commit interval is set to
  1 second while Awaitility's polling still runs unthrottled.
* To rule out this source of spurious failures, a poll interval of
  1 second is now set consistently for Awaitility.
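As an aside, the throttled-polling pattern applied throughout this patch
looks like this in isolation - a minimal sketch assuming Awaitility 4.x
(org.awaitility) on the test classpath; the class name, the awaited
condition, and the threshold of 100 are illustrative only, not taken from
the patch:

  import static org.awaitility.Awaitility.await;

  import java.time.Duration;
  import java.util.concurrent.atomic.AtomicInteger;

  class ThrottledPollingSketch
  {
    void waitForRecords(AtomicInteger receivedRecords)
    {
      await("records received")
          .atMost(Duration.ofSeconds(30))      // overall timeout for the expectation
          .pollInterval(Duration.ofSeconds(1)) // re-check once per second instead of as fast as the CPU allows
          .until(() -> receivedRecords.get() >= 100);
    }
  }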
---
 src/test/java/de/juplo/kafka/ApplicationTests.java | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index 26a34e4..3bac537 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -40,7 +40,7 @@ import static org.awaitility.Awaitility.*;
     properties = {
         "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
         "consumer.topic=" + TOPIC,
-        "consumer.commit-interval=100ms" })
+        "consumer.commit-interval=1s" })
 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
 @Slf4j
 class ApplicationTests
@@ -84,10 +84,12 @@ class ApplicationTests

     await("100 records received")
         .atMost(Duration.ofSeconds(30))
+        .pollInterval(Duration.ofSeconds(1))
         .until(() -> receivedRecords.size() >= 100);

     await("Offsets committed")
         .atMost(Duration.ofSeconds(10))
+        .pollInterval(Duration.ofSeconds(1))
         .untilAsserted(() ->
         {
           checkSeenOffsetsForProgress();
@@ -112,6 +114,7 @@ class ApplicationTests

     await("Consumer failed")
         .atMost(Duration.ofSeconds(30))
+        .pollInterval(Duration.ofSeconds(1))
         .until(() -> !endlessConsumer.running());

     checkSeenOffsetsForProgress();
@@ -120,6 +123,7 @@ class ApplicationTests
     endlessConsumer.start();
     await("Consumer failed")
         .atMost(Duration.ofSeconds(30))
+        .pollInterval(Duration.ofSeconds(1))
         .until(() -> !endlessConsumer.running());

     checkSeenOffsetsForProgress();
-- 
2.20.1

From b7a418d6c90c25187d6a00ba769aec895e5b7396 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Tue, 26 Jul 2022 16:21:17 +0200
Subject: [PATCH 03/16] Test checks invalid and unknown messages
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

---
 .../java/de/juplo/kafka/ApplicationTests.java | 90 +++++++++++++++----
 1 file changed, 74 insertions(+), 16 deletions(-)

diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index 24d3a9e..b5644b6 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -76,7 +76,7 @@ class ApplicationTests
   /** Tests methods */

   @Test
-  void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException
+  void commitsCurrentOffsetsOnSuccess()
   {
     send100Messages((partition, key, counter) ->
     {
@@ -89,11 +89,11 @@ class ApplicationTests
         type = "message";
       }
       else {
-        value = serializeGreeting(key, counter);
+        value = serializeGreeting(key);
         type = "greeting";
       }

-      return toRecord(partition, key, value, type);
+      return toRecord(partition, key, value, Optional.of(type));
     });

     await("100 records received")
@@ -116,7 +116,64 @@ class ApplicationTests
   }

   @Test
-  void commitsOffsetOfErrorForReprocessingOnDeserializationError()
+  void commitsOffsetOfErrorForReprocessingOnDeserializationErrorInvalidMessage()
+  {
+    send100Messages((partition, key, counter) ->
+    {
+      Bytes value;
+      String type;
+
+      if (counter == 77)
+      {
+        value = serializeFooMessage(key, counter);
+        type = null;
+      }
+      else
+      {
+        if (counter%3 != 0)
+        {
+          value = serializeClientMessage(key, counter);
+          type = "message";
+        }
+        else {
+          value = serializeGreeting(key);
+          type = "greeting";
+        }
+      }
+
+      return toRecord(partition, key, value, Optional.ofNullable(type));
+    });
+
+    await("Consumer failed")
+        .atMost(Duration.ofSeconds(30))
+        .pollInterval(Duration.ofSeconds(1))
+        .until(() -> !endlessConsumer.running());
+
+    checkSeenOffsetsForProgress();
+    compareToCommitedOffsets(newOffsets);
+
+    endlessConsumer.start();
+    await("Consumer failed")
+        .atMost(Duration.ofSeconds(30))
+        .pollInterval(Duration.ofSeconds(1))
+        .until(() -> !endlessConsumer.running());
+
+    checkSeenOffsetsForProgress();
+    compareToCommitedOffsets(newOffsets);
+    assertThat(receivedRecords.size())
+        .describedAs("Received not all sent events")
+        .isLessThan(100);
+
+    assertThatNoException()
+        .describedAs("Consumer should not be running")
+        .isThrownBy(() -> endlessConsumer.exitStatus());
+    assertThat(endlessConsumer.exitStatus())
+        .describedAs("Consumer should have exited abnormally")
+        .containsInstanceOf(RecordDeserializationException.class);
+  }
+
+  @Test
+  void commitsOffsetOfErrorForReprocessingOnDeserializationErrorOnUnknownMessage()
   {
     send100Messages((partition, key, counter) ->
     {
         type = "message";
       }
       else {
-        value = serializeGreeting(key, counter);
+        value = serializeGreeting(key);
         type = "greeting";
       }
     }

-    return toRecord(partition, key, value, type);
+    return toRecord(partition, key, value, Optional.of(type));
     });

     await("Consumer failed")

   public interface RecordGenerator
   {
-    public ProducerRecord generate(int partition, String key, long counter);
+    public ProducerRecord generate(int partition, String key, int counter);
   }

   void send100Messages(RecordGenerator recordGenerator)
   {
-    long i = 0;
+    int i = 0;

     for (int partition = 0; partition < 10; partition++)
     {

-  ProducerRecord toRecord(int partition, String key, Bytes value, String type)
-  {
+  ProducerRecord toRecord(int partition, String key, Bytes value, Optional type)
+  {
     ProducerRecord record =
         new ProducerRecord<>(TOPIC, partition, key, value);
-    record.headers().add("__TypeId__", type.getBytes());
+
+    type.ifPresent(typeId -> record.headers().add("__TypeId__", typeId.getBytes()));
     return record;
   }

-  Bytes serializeClientMessage(String key, Long value)
+  Bytes serializeClientMessage(String key, int value)
   {
-    TestClientMessage message = new TestClientMessage(key, value.toString());
+    TestClientMessage message = new TestClientMessage(key, Integer.toString(value));
     return new Bytes(valueSerializer.serialize(TOPIC, message));
   }

-  Bytes serializeGreeting(String key, Long value)
+  Bytes serializeGreeting(String key)
   {
     TestGreeting message = new TestGreeting(key, LocalDateTime.now());
     return new Bytes(valueSerializer.serialize(TOPIC, message));
   }

-  Bytes serializeFooMessage(String key, Long value)
+  Bytes serializeFooMessage(String key, int value)
   {
-    TestFooMessage message = new TestFooMessage(key, value);
+    TestFooMessage message = new TestFooMessage(key, (long)value);
     return new Bytes(valueSerializer.serialize(TOPIC, message));
   }
-- 
2.20.1

From d6285c99580cf17465182b68c68ae9a3674ae7a7 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Fri, 12 Aug 2022 22:31:24 +0200
Subject: [PATCH 04/16] Fixed the Compose setup and README.sh for this example
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* Previously, the setup still had a producer configured that wrote
  messages of type `String`, so the consumer met its demise _immediately_.
* The README script did not wait for the consumer to report that it had
  started properly before it was restarted after the supposed consumption
  of the poison pill.
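As an aside, the wait-for-readiness loop that the fixed README.sh performs
with `http` and `sleep` can be sketched in Java as well - a minimal
illustration using java.net.http from JDK 11; the URL, the class name, and
the substring check are assumptions for the sketch, not part of the patch:

  import java.net.URI;
  import java.net.http.HttpClient;
  import java.net.http.HttpRequest;
  import java.net.http.HttpResponse;

  class WaitForConsumer
  {
    public static void main(String[] args) throws Exception
    {
      HttpClient client = HttpClient.newHttpClient();
      HttpRequest request = HttpRequest.newBuilder()
          .uri(URI.create("http://localhost:8081/actuator/health"))
          .build();

      // Poll the actuator health endpoint until it reports UP, mirroring:
      // while ! [[ $(http 0:8081/actuator/health) =~ "UP" ]]; do ...; done
      while (true)
      {
        try
        {
          HttpResponse<String> response =
              client.send(request, HttpResponse.BodyHandlers.ofString());
          if (response.statusCode() == 200 && response.body().contains("UP"))
            break;
        }
        catch (Exception e)
        {
          // endpoint not reachable yet - keep waiting
        }
        System.out.println("Waiting for consumer...");
        Thread.sleep(1000);
      }
    }
  }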
---
 README.sh          | 6 +++---
 docker-compose.yml | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/README.sh b/README.sh
index 72f0c60..fe237dc 100755
--- a/README.sh
+++ b/README.sh
@@ -25,7 +25,9 @@ fi
 echo "Waiting for the Kafka-Cluster to become ready..."
 docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
 docker-compose up setup
-docker-compose up -d producer consumer
+docker-compose up -d producer
+docker-compose up consumer &
+while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer..."; sleep 1; done
 sleep 5
 docker-compose exec -T cli bash << 'EOF'
 echo "Writing poison pill into topic test..."
 # tag::poisonpill[]
 echo 'BOOM!' | kafkacat -P -b kafka:9092 -t test
 # end::poisonpill[]
 EOF
@@ -42,5 +44,3 @@ while [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Consum
 http -v :8081/actuator/health
 http -v post :8081/actuator/shutdown
 docker-compose stop producer
-docker-compose ps
-docker-compose logs --tail=100 consumer
diff --git a/docker-compose.yml b/docker-compose.yml
index 159f9cb..81b98ac 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -37,7 +37,7 @@ services:
     command: sleep infinity

   producer:
-    image: juplo/endless-producer:1.0-SNAPSHOT
+    image: juplo/endless-long-producer:1.0-SNAPSHOT
     ports:
       - 8080:8080
     environment:
-- 
2.20.1

From 6c1d4abdf0a96e3aef8452b3e2c10be14f378700 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sat, 13 Aug 2022 12:37:27 +0200
Subject: [PATCH 05/16] Souped up the demonstration in README.sh

---
 README.sh          | 38 ++++++++++++++++++++++++++++--------
 docker-compose.yml | 42 ++++++++++++++++++++++++++++++++++++++----
 2 files changed, 68 insertions(+), 12 deletions(-)

diff --git a/README.sh b/README.sh
index fe237dc..2a1e5d8 100755
--- a/README.sh
+++ b/README.sh
@@ -25,22 +25,44 @@ fi
 echo "Waiting for the Kafka-Cluster to become ready..."
 docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
 docker-compose up setup
-docker-compose up -d producer
-docker-compose up consumer &
-while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer..."; sleep 1; done
+docker-compose up -d
+
+while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-1..."; sleep 1; done
+while ! [[ $(http 0:8082/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-2..."; sleep 1; done
+while ! [[ $(http 0:8083/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-3..."; sleep 1; done
+while ! [[ $(http 0:8084/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-4..."; sleep 1; done
+while ! [[ $(http 0:8085/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-5..."; sleep 1; done
+
 sleep 5
+
 docker-compose exec -T cli bash << 'EOF'
 echo "Writing poison pill into topic test..."
 # tag::poisonpill[]
 echo 'BOOM!' | kafkacat -P -b kafka:9092 -t test
 # end::poisonpill[]
 EOF
+
-while [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Consumer is still running..."; sleep 1; done
+while [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "consumer-1 is still running..."; sleep 1; done
+while [[ $(http 0:8082/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "consumer-2 is still running..."; sleep 1; done
+while [[ $(http 0:8083/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "consumer-3 is still running..."; sleep 1; done
+while [[ $(http 0:8084/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "consumer-4 is still running..."; sleep 1; done
+while [[ $(http 0:8085/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "consumer-5 is still running..."; sleep 1; done
+
 http -v :8081/actuator/health
-echo "Restarting consumer"
+echo "Restarting consumer-1"
 http -v post :8081/start
-while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer..."; sleep 1; done
-while [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Consumer is still running..."; sleep 1; done
+
+echo "Waiting for consumer-1 to come up"
+while ! [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for consumer-1..."; sleep 1; done
+http -v :8081/actuator/health
+
+echo "Waiting for consumer-1 to crash"
+while [[ $(http 0:8081/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "consumer-1 is still running..."; sleep 1; done
 http -v :8081/actuator/health
-http -v post :8081/actuator/shutdown
+
 docker-compose stop producer
+docker-compose logs --tail=10 consumer-1
+docker-compose logs --tail=10 consumer-2
+docker-compose logs --tail=10 consumer-3
+docker-compose logs --tail=10 consumer-4
+docker-compose logs --tail=10 consumer-5
diff --git a/docker-compose.yml b/docker-compose.yml
index 81b98ac..d36e851 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -48,13 +48,47 @@ services:
       producer.throttle-ms: 200


-  consumer:
+  consumer-1:
     image: juplo/endless-consumer:1.0-SNAPSHOT
     ports:
       - 8081:8080
     environment:
       server.port: 8080
       consumer.bootstrap-server: kafka:9092
-      consumer.client-id: my-group
-      consumer.client-id: consumer
-      consumer.topic: test
+      consumer.client-id: consumer-1
+
+  consumer-2:
+    image: juplo/endless-consumer:1.0-SNAPSHOT
+    ports:
+      - 8082:8080
+    environment:
+      server.port: 8080
+      consumer.bootstrap-server: kafka:9092
+      consumer.client-id: consumer-2
+
+  consumer-3:
+    image: juplo/endless-consumer:1.0-SNAPSHOT
+    ports:
+      - 8083:8080
+    environment:
+      server.port: 8080
+      consumer.bootstrap-server: kafka:9092
+      consumer.client-id: consumer-3
+
+  consumer-4:
+    image: juplo/endless-consumer:1.0-SNAPSHOT
+    ports:
+      - 8084:8080
+    environment:
+      server.port: 8080
+      consumer.bootstrap-server: kafka:9092
+      consumer.client-id: consumer-4
+
+  consumer-5:
+    image: juplo/endless-consumer:1.0-SNAPSHOT
+    ports:
+      - 8085:8080
+    environment:
+      server.port: 8080
+      consumer.bootstrap-server: kafka:9092
+      consumer.client-id: consumer-5
-- 
2.20.1

From a126175808eeec80355deb8eb3f4ef7e85e84780 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sun, 14 Aug 2022 09:54:45 +0200
Subject: [PATCH 06/16] Introduced typeable base class `GenericApplicationTests`
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

---
 pom.xml                                       |   4 +
 ...Tests.java => GenericApplicationTest.java} | 165 ++++++++----------
 2 files changed, 73 insertions(+), 96 deletions(-)
 rename src/test/java/de/juplo/kafka/{ApplicationTests.java => GenericApplicationTest.java} (67%)

diff --git a/pom.xml b/pom.xml
index 1f5caab..6fd5d5f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -16,6 +16,10 @@
   1.0-SNAPSHOT
   Endless Consumer: a Simple Consumer-Group that reads and prints the topic and counts the received messages for each key by topic
+
+    11
+
+
     org.springframework.boot
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTest.java
similarity index 67%
rename from src/test/java/de/juplo/kafka/ApplicationTests.java
rename to src/test/java/de/juplo/kafka/GenericApplicationTest.java
index 3bac537..6b2b635 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/GenericApplicationTest.java
@@ -13,7 +13,6 @@
 import org.junit.jupiter.api.*;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
 import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Import;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.test.context.TestPropertySource;
@@ -21,21 +20,19 @@ import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

 import java.time.Duration;
 import java.util.*;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;

-import static de.juplo.kafka.ApplicationTests.PARTITIONS;
-import static de.juplo.kafka.ApplicationTests.TOPIC;
+import static de.juplo.kafka.GenericApplicationTest.PARTITIONS;
+import static de.juplo.kafka.GenericApplicationTest.TOPIC;
 import static org.assertj.core.api.Assertions.*;
 import static org.awaitility.Awaitility.*;


 @SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
-@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
 @TestPropertySource(
     properties = {
         "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
@@ -43,44 +40,48 @@ import static org.awaitility.Awaitility.*;
         "consumer.commit-interval=1s" })
 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
 @Slf4j
-class ApplicationTests
+abstract class GenericApplicationTest
 {
   public static final String TOPIC = "FOO";
   public static final int PARTITIONS = 10;

-  StringSerializer stringSerializer = new StringSerializer();
-
-  @Autowired
-  Serializer valueSerializer;
-  @Autowired
-  KafkaProducer kafkaProducer;
   @Autowired
-  KafkaConsumer kafkaConsumer;
-  @Autowired
-  KafkaConsumer offsetConsumer;
+  KafkaConsumer kafkaConsumer;
   @Autowired
   ApplicationProperties properties;
   @Autowired
   ExecutorService executor;

-  Consumer> testHandler;
-  EndlessConsumer endlessConsumer;
+  KafkaProducer testRecordProducer;
+  KafkaConsumer offsetConsumer;
+  Consumer> testHandler;
+  EndlessConsumer endlessConsumer;
   Map oldOffsets;
   Map newOffsets;
-  Set> receivedRecords;
+  Set> receivedRecords;
+
+
+  final Serializer keySerializer;
+  final RecordGenerator recordGenerator;
+  final Consumer> messageSender;
+
+  public GenericApplicationTest(
+      Serializer keySerializer,
+      RecordGenerator recordGenerator)
+  {
+    this.keySerializer = keySerializer;
+    this.recordGenerator = recordGenerator;
+    this.messageSender = (record) -> sendMessage(record);
+  }


   /** Tests methods */

   @Test
-  void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException
+  void commitsCurrentOffsetsOnSuccess()
   {
-    send100Messages((partition, key, counter) ->
-    {
-      Bytes value = new Bytes(valueSerializer.serialize(TOPIC, counter));
-      return new ProducerRecord<>(TOPIC, partition, key, value);
-    });
+    recordGenerator.generate(100, Set.of(), messageSender);

     await("100 records received")
         .atMost(Duration.ofSeconds(30))
@@ -104,13 +105,7 @@
   @Test
   void commitsOffsetOfErrorForReprocessingOnDeserializationError()
   {
-    send100Messages((partition, key, counter) ->
-    {
-      Bytes value = counter == 77
-          ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
-          : new Bytes(valueSerializer.serialize(TOPIC, counter));
-      return new ProducerRecord<>(TOPIC, partition, key, value);
-    });
+    recordGenerator.generate(100, Set.of(77), messageSender);

     await("Consumer failed")
         .atMost(Duration.ofSeconds(30))
@@ -209,50 +204,58 @@
   }


-  public interface RecordGenerator
+  public interface RecordGenerator
   {
-    public ProducerRecord generate(int partition, String key, long counter);
+    void generate(
+        int numberOfMessagesToGenerate,
+        Set poistionPills,
+        Consumer> messageSender);
   }

-  void send100Messages(RecordGenerator recordGenerator)
+  void sendMessage(ProducerRecord record)
   {
-    long i = 0;
-
-    for (int partition = 0; partition < 10; partition++)
+    testRecordProducer.send(record, (metadata, e) ->
     {
-      for (int key = 0; key < 10; key++)
+      if (metadata != null)
       {
-        ProducerRecord record =
-            recordGenerator.generate(partition, Integer.toString(partition*10+key%2), ++i);
-
-        kafkaProducer.send(record, (metadata, e) ->
-        {
-          if (metadata != null)
-          {
-            log.debug(
-                "{}|{} - {}={}",
-                metadata.partition(),
-                metadata.offset(),
-                record.key(),
-                record.value());
-          }
-          else
-          {
-            log.warn(
-                "Exception for {}={}: {}",
-                record.key(),
-                record.value(),
-                e.toString());
-          }
-        });
+        log.debug(
+            "{}|{} - {}={}",
+            metadata.partition(),
+            metadata.offset(),
+            record.key(),
+            record.value());
       }
-    }
+      else
+      {
+        log.warn(
+            "Exception for {}={}: {}",
+            record.key(),
+            record.value(),
+            e.toString());
+      }
+    });
   }

   @BeforeEach
   public void init()
   {
+    Properties props;
+    props = new Properties();
+    props.put("bootstrap.servers", properties.getBootstrapServer());
+    props.put("linger.ms", 100);
+    props.put("key.serializer", keySerializer.getClass().getName());
+    props.put("value.serializer", BytesSerializer.class.getName());
+    testRecordProducer = new KafkaProducer<>(props);
+
+    props = new Properties();
+    props.put("bootstrap.servers", properties.getBootstrapServer());
+    props.put("client.id", "OFFSET-CONSUMER");
+    props.put("group.id", properties.getGroupId());
+    props.put("key.deserializer", BytesDeserializer.class.getName());
+    props.put("value.deserializer", BytesDeserializer.class.getName());
+    offsetConsumer = new KafkaConsumer<>(props);
+
     testHandler = record -> {} ;

     seekToEnd();
@@ -267,7 +270,7 @@
       newOffsets.put(tp, offset - 1);
     });

-    Consumer> captureOffsetAndExecuteTestHandler =
+    Consumer> captureOffsetAndExecuteTestHandler =
         record ->
         {
           newOffsets.put(
@@ -294,6 +297,8 @@
     try
     {
       endlessConsumer.stop();
+      testRecordProducer.close();
+      offsetConsumer.close();
     }
     catch (Exception e)
     {
@@ -304,37 +309,5 @@

   @TestConfiguration
   @Import(ApplicationConfiguration.class)
-  public static class Configuration
-  {
-    @Bean
-    Serializer serializer()
-    {
-      return new LongSerializer();
-    }
-
-    @Bean
-    KafkaProducer kafkaProducer(ApplicationProperties properties)
-    {
-      Properties props = new Properties();
-      props.put("bootstrap.servers", properties.getBootstrapServer());
-      props.put("linger.ms", 100);
-      props.put("key.serializer", StringSerializer.class.getName());
-      props.put("value.serializer", BytesSerializer.class.getName());
-
-      return new KafkaProducer<>(props);
-    }
-
-    @Bean
-    KafkaConsumer offsetConsumer(ApplicationProperties properties)
-    {
-      Properties props = new Properties();
-      props.put("bootstrap.servers", properties.getBootstrapServer());
-      props.put("client.id", "OFFSET-CONSUMER");
-      props.put("group.id", properties.getGroupId());
-      props.put("key.deserializer", BytesDeserializer.class.getName());
-      props.put("value.deserializer", BytesDeserializer.class.getName());
-
-      return new KafkaConsumer<>(props);
-    }
-  }
+  public static class Configuration {}
 }
-- 
2.20.1

From 2bf77d19d90e7356e1a7c6e13202971fd1b9897b Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sun, 14 Aug 2022 10:54:27 +0200
Subject: [PATCH 07/16] Reimplemented `ApplicationTest` on top of the typeable base

---
 .../java/de/juplo/kafka/ApplicationTest.java | 57 +++++++++++++++++++
 1 file changed, 57 insertions(+)
 create mode 100644 src/test/java/de/juplo/kafka/ApplicationTest.java

diff --git a/src/test/java/de/juplo/kafka/ApplicationTest.java b/src/test/java/de/juplo/kafka/ApplicationTest.java
new file mode 100644
index 0000000..d3ff3b1
--- /dev/null
+++ b/src/test/java/de/juplo/kafka/ApplicationTest.java
@@ -0,0 +1,57 @@
+package de.juplo.kafka;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+
+import java.util.Set;
+import java.util.function.Consumer;
+
+
+public class ApplicationTest extends GenericApplicationTest
+{
+  public ApplicationTest()
+  {
+    super(
+        new StringSerializer(),
+        new RecordGenerator<> ()
+        {
+          final StringSerializer stringSerializer = new StringSerializer();
+          final LongSerializer longSerializer = new LongSerializer();
+
+
+          @Override
+          public void generate(
+              int numberOfMessagesToGenerate,
+              Set poisonPills,
+              Consumer> messageSender)
+          {
+            int i = 0;
+
+            for (int partition = 0; partition < 10; partition++)
+            {
+              for (int key = 0; key < 10; key++)
+              {
+                if (++i > numberOfMessagesToGenerate)
+                  return;
+
+                Bytes value =
+                    poisonPills.contains(i)
+                        ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
+                        : new Bytes(longSerializer.serialize(TOPIC, (long)i));
+
+                ProducerRecord record =
+                    new ProducerRecord<>(
+                        TOPIC,
+                        partition,
+                        Integer.toString(partition*10+key%2),
+                        value);
+
+                messageSender.accept(record);
+              }
+            }
+          }
+        });
+  }
+}
-- 
2.20.1

From 80f616369c011db99eddf42c6ee91e66fd1dfd07 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sun, 14 Aug 2022 11:32:10 +0200
Subject: [PATCH 08/16] Typing in `GenericApplicationTest` only where really necessary
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* Typing is now only used where it is unavoidable, because the typed
  implementation is addressed there.
* The test messages are sent as `Bytes` for both key and message.
* This way, the `RecordGenerator` no longer needs to be typed.
* In return, the typed implementation of the test case then has to convert
  key and message into a `Bytes` payload with a suitable serializer.
---
 .../java/de/juplo/kafka/ApplicationTest.java |  9 ++++-----
 .../juplo/kafka/GenericApplicationTest.java  | 20 ++++++++-----------
 2 files changed, 12 insertions(+), 17 deletions(-)

diff --git a/src/test/java/de/juplo/kafka/ApplicationTest.java b/src/test/java/de/juplo/kafka/ApplicationTest.java
index d3ff3b1..81165ab 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTest.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTest.java
@@ -14,8 +14,7 @@ public class ApplicationTest extends GenericApplicationTest
   public ApplicationTest()
   {
     super(
-        new StringSerializer(),
-        new RecordGenerator<> ()
+        new RecordGenerator()
         {
           final StringSerializer stringSerializer = new StringSerializer();
           final LongSerializer longSerializer = new LongSerializer();
@@ -25,7 +24,7 @@ public class ApplicationTest extends GenericApplicationTest
           public void generate(
               int numberOfMessagesToGenerate,
               Set poisonPills,
-              Consumer> messageSender)
+              Consumer> messageSender)
           {
             int i = 0;
@@ -41,11 +40,11 @@ public class ApplicationTest extends GenericApplicationTest
                     ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
                     : new Bytes(longSerializer.serialize(TOPIC, (long)i));

-                ProducerRecord record =
+                ProducerRecord record =
                     new ProducerRecord<>(
                         TOPIC,
                         partition,
-                        Integer.toString(partition*10+key%2),
+                        new Bytes(stringSerializer.serialize(TOPIC,Integer.toString(partition*10+key%2))),
                         value);

                 messageSender.accept(record);
diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTest.java b/src/test/java/de/juplo/kafka/GenericApplicationTest.java
index 6b2b635..68f150f 100644
--- a/src/test/java/de/juplo/kafka/GenericApplicationTest.java
+++ b/src/test/java/de/juplo/kafka/GenericApplicationTest.java
@@ -53,7 +53,7 @@ abstract class GenericApplicationTest
   @Autowired
   ExecutorService executor;

-  KafkaProducer testRecordProducer;
+  KafkaProducer testRecordProducer;
   KafkaConsumer offsetConsumer;
   Consumer> testHandler;
   EndlessConsumer endlessConsumer;
@@ -62,15 +62,11 @@ abstract class GenericApplicationTest
   Set> receivedRecords;

-  final Serializer keySerializer;
-  final RecordGenerator recordGenerator;
-  final Consumer> messageSender;
+  final RecordGenerator recordGenerator;
+  final Consumer> messageSender;

-  public GenericApplicationTest(
-      Serializer keySerializer,
-      RecordGenerator recordGenerator)
+  public GenericApplicationTest(RecordGenerator recordGenerator)
   {
-    this.keySerializer = keySerializer;
     this.recordGenerator = recordGenerator;
     this.messageSender = (record) -> sendMessage(record);
   }
@@ -204,15 +200,15 @@
   }

-  public interface RecordGenerator
+  public interface RecordGenerator
   {
     void generate(
         int numberOfMessagesToGenerate,
         Set poistionPills,
-        Consumer> messageSender);
+        Consumer> messageSender);
   }

-  void sendMessage(ProducerRecord record)
+  void sendMessage(ProducerRecord record)
   {
     testRecordProducer.send(record, (metadata, e) ->
     {
@@ -244,7 +240,7 @@
     props = new Properties();
     props.put("bootstrap.servers", properties.getBootstrapServer());
     props.put("linger.ms", 100);
-    props.put("key.serializer", keySerializer.getClass().getName());
+    props.put("key.serializer", BytesSerializer.class.getName());
     props.put("value.serializer", BytesSerializer.class.getName());
     testRecordProducer = new KafkaProducer<>(props);
-- 
2.20.1

From da3b3c96a862aa1408933b312ca965dad1bbe482 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sun, 14 Aug 2022 12:59:20 +0200
Subject: [PATCH 09/16] `GenericApplicationTest` skips tests if the error is not available
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* For tests that require a specific error type, an annotation asks the
  `RecordGenerator` whether that error type can be generated.
* If the error type is not available, the test is skipped.
---
 .../ErrorCannotBeGeneratedCondition.java      | 60 +++++++++++++++++++
 .../juplo/kafka/GenericApplicationTest.java   | 12 ++++
 .../kafka/SkipWhenErrorCannotBeGenerated.java | 15 +++++
 3 files changed, 87 insertions(+)
 create mode 100644 src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java
 create mode 100644 src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java

diff --git a/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java b/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java
new file mode 100644
index 0000000..6d772ce
--- /dev/null
+++ b/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java
@@ -0,0 +1,60 @@
+package de.juplo.kafka;
+
+import org.junit.jupiter.api.extension.ConditionEvaluationResult;
+import org.junit.jupiter.api.extension.ExecutionCondition;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.platform.commons.util.AnnotationUtils;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+
+public class ErrorCannotBeGeneratedCondition implements ExecutionCondition
+{
+  @Override
+  public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context)
+  {
+    final Optional optional =
+        AnnotationUtils.findAnnotation(
+            context.getElement(),
+            SkipWhenErrorCannotBeGenerated.class);
+
+    if (context.getTestInstance().isEmpty())
+      return ConditionEvaluationResult.enabled("Test-instance ist not available");
+
+    if (optional.isPresent())
+    {
+      SkipWhenErrorCannotBeGenerated skipWhenErrorCannotBeGenerated = optional.get();
+      GenericApplicationTest instance = (GenericApplicationTest)context.getTestInstance().get();
+      List missingRequiredErrors = new LinkedList<>();
+
+      if (skipWhenErrorCannotBeGenerated.poisonPill() && !instance.recordGenerator.canGeneratePoisionPill())
+        missingRequiredErrors.add("Poison-Pill");
+
+      if (skipWhenErrorCannotBeGenerated.logicError() && !instance.recordGenerator.canGenerateLogicError())
+        missingRequiredErrors.add("Logic-Error");
+
+      StringBuilder builder = new StringBuilder();
+      builder.append(context.getTestClass().get().getSimpleName());
+
+      if (missingRequiredErrors.isEmpty())
+      {
+        builder.append(" can generate all required types of errors");
+        return ConditionEvaluationResult.enabled(builder.toString());
+      }
+
+      builder.append(" cannot generate the required error(s): ");
+      builder.append(
+          missingRequiredErrors
+              .stream()
+              .collect(Collectors.joining(", ")));
+
+      return ConditionEvaluationResult.disabled(builder.toString());
+    }
+
+    return ConditionEvaluationResult.enabled(
+        "Not annotated with " + SkipWhenErrorCannotBeGenerated.class.getSimpleName());
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTest.java b/src/test/java/de/juplo/kafka/GenericApplicationTest.java
index a6d6aa1..8a57a96 100644
--- a/src/test/java/de/juplo/kafka/GenericApplicationTest.java
+++ b/src/test/java/de/juplo/kafka/GenericApplicationTest.java
@@ -100,6 +100,7 @@ abstract class GenericApplicationTest
   }

   @Test
+  @SkipWhenErrorCannotBeGenerated(poisonPill = true)
   void commitsOffsetOfErrorForReprocessingOnDeserializationError()
   {
     recordGenerator.generate(100, Set.of(77), Set.of(), messageSender);
@@ -133,6 +134,7 @@ abstract class GenericApplicationTest
   }

   @Test
+  @SkipWhenErrorCannotBeGenerated(logicError = true)
   void doesNotCommitOffsetsOnLogicError()
   {
     recordGenerator.generate(100, Set.of(), Set.of(77), messageSender);
@@ -241,6 +243,16 @@ abstract class GenericApplicationTest
         Set poisonPills,
         Set logicErrors,
         Consumer> messageSender);
+
+    default boolean canGeneratePoisionPill()
+    {
+      return true;
+    }
+
+    default boolean canGenerateLogicError()
+    {
+      return true;
+    }
   }

   void sendMessage(ProducerRecord record)
diff --git a/src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java b/src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java
new file mode 100644
index 0000000..6d15e9e
--- /dev/null
+++ b/src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java
@@ -0,0 +1,15 @@
+package de.juplo.kafka;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+
+@Retention(RetentionPolicy.RUNTIME)
+@ExtendWith(ErrorCannotBeGeneratedCondition.class)
+public @interface SkipWhenErrorCannotBeGenerated
+{
+  boolean poisonPill() default false;
+  boolean logicError() default false;
+}
-- 
2.20.1

From c6a33a3c27563018d99a56fe4069f20de64e9f4c Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sun, 14 Aug 2022 13:26:08 +0200
Subject: [PATCH 10/16] Corrected the names of the test classes

---
 .../kafka/{ApplicationTest.java => ApplicationTests.java} | 6 +++---
 .../de/juplo/kafka/ErrorCannotBeGeneratedCondition.java   | 2 +-
 ...cApplicationTest.java => GenericApplicationTests.java} | 8 ++++----
 3 files changed, 8 insertions(+), 8 deletions(-)
 rename src/test/java/de/juplo/kafka/{ApplicationTest.java => ApplicationTests.java} (93%)
 rename src/test/java/de/juplo/kafka/{GenericApplicationTest.java => GenericApplicationTests.java} (97%)

diff --git a/src/test/java/de/juplo/kafka/ApplicationTest.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
similarity index 93%
rename from src/test/java/de/juplo/kafka/ApplicationTest.java
rename to src/test/java/de/juplo/kafka/ApplicationTests.java
index ed93a21..51d579e 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTest.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -14,10 +14,10 @@
 import java.util.Set;
 import java.util.function.Consumer;


-@ContextConfiguration(classes = ApplicationTest.Configuration.class)
-public class ApplicationTest extends GenericApplicationTest
+@ContextConfiguration(classes = ApplicationTests.Configuration.class)
+public class ApplicationTests extends GenericApplicationTests
 {
-  public ApplicationTest()
+  public ApplicationTests()
   {
     super(
         new RecordGenerator()
diff --git a/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java b/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java
index 6d772ce..99af3b2 100644
--- a/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java
+++ b/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java
@@ -27,7 +27,7 @@ public class ErrorCannotBeGeneratedCondition implements ExecutionCondition
     if (optional.isPresent())
     {
       SkipWhenErrorCannotBeGenerated skipWhenErrorCannotBeGenerated = optional.get();
-      GenericApplicationTest instance = (GenericApplicationTest)context.getTestInstance().get();
+      GenericApplicationTests instance = (GenericApplicationTests)context.getTestInstance().get();
       List missingRequiredErrors = new LinkedList<>();

       if (skipWhenErrorCannotBeGenerated.poisonPill() && !instance.recordGenerator.canGeneratePoisionPill())
diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTest.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
similarity index 97%
rename from src/test/java/de/juplo/kafka/GenericApplicationTest.java
rename to src/test/java/de/juplo/kafka/GenericApplicationTests.java
index 8a57a96..8b9a3ff 100644
--- a/src/test/java/de/juplo/kafka/GenericApplicationTest.java
+++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
@@ -26,8 +26,8 @@
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;

-import static de.juplo.kafka.GenericApplicationTest.PARTITIONS;
-import static de.juplo.kafka.GenericApplicationTest.TOPIC;
+import static de.juplo.kafka.GenericApplicationTests.PARTITIONS;
+import static de.juplo.kafka.GenericApplicationTests.TOPIC;
 import static org.assertj.core.api.Assertions.*;
 import static org.awaitility.Awaitility.*;
@@ -40,7 +40,7 @@ import static org.awaitility.Awaitility.*;
         "consumer.commit-interval=1s" })
 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
 @Slf4j
-abstract class GenericApplicationTest
+abstract class GenericApplicationTests
 {
   public static final String TOPIC = "FOO";
   public static final int PARTITIONS = 10;
@@ -66,7 +66,7 @@
   final RecordGenerator recordGenerator;
   final Consumer> messageSender;

-  public GenericApplicationTest(RecordGenerator recordGenerator)
+  public GenericApplicationTests(RecordGenerator recordGenerator)
   {
     this.recordGenerator = recordGenerator;
     this.messageSender = (record) -> sendMessage(record);
-- 
2.20.1

From 27768041f2c2f4b1cbb8c45c9a5d665490050f76 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sun, 14 Aug 2022 15:26:26 +0200
Subject: [PATCH 11/16] The number of generated test messages is determined by the `RecordGenerator`

---
 src/test/java/de/juplo/kafka/ApplicationTests.java        | 3 +--
 src/test/java/de/juplo/kafka/GenericApplicationTests.java | 1 -
 2 files changed, 1 insertion(+), 3 deletions(-)

diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index 51d579e..1272124 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -28,7 +28,6 @@ public class ApplicationTests extends GenericApplicationTests

           @Override
           public void generate(
-              int numberOfMessagesToGenerate,
               Set poisonPills,
               Set logicErrors,
               Consumer> messageSender)
@@ -39,7 +38,7 @@
             {
               for (int key = 0; key < 10; key++)
               {
-                if (++i > numberOfMessagesToGenerate)
+                if (++i > 100)
                   return;
diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
index 8b9a3ff..1aacb94 100644
--- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
@@ -239,7 +239,6 @@ abstract class GenericApplicationTests
   public interface RecordGenerator
   {
     void generate(
-        int numberOfMessagesToGenerate,
         Set poisonPills,
         Set logicErrors,
         Consumer> messageSender);
-- 
2.20.1

From 1288af99aeb350661f8b0a60762cba8e1b0f6a24 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sun, 14 Aug 2022 15:35:28 +0200
Subject: [PATCH 12/16] Simplified/reworked the signature and handling of the `RecordGenerator`
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* The `RecordGenerator` may now decide for itself how many messages it
  generates and where and how many poison pills or logic errors are
  generated, if the test requests them.
* In return, the `RecordGenerator` now returns the number of actually
  generated messages, so that the tests can react correctly.
---
 .../java/de/juplo/kafka/ApplicationTests.java | 26 +++++++++++--------
 .../ErrorCannotBeGeneratedCondition.java      |  2 +-
 .../juplo/kafka/GenericApplicationTests.java  | 25 ++++++++++--------
 3 files changed, 30 insertions(+), 23 deletions(-)

diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index 1272124..8369a7b 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -27,9 +27,9 @@ public class ApplicationTests extends GenericApplicationTests

           @Override
-          public void generate(
-              Set poisonPills,
-              Set logicErrors,
+          public int generate(
+              boolean poisonPills,
+              boolean logicErrors,
               Consumer> messageSender)
           {
             int i = 0;
@@ -38,17 +38,19 @@
             {
               for (int key = 0; key < 10; key++)
               {
-                if (++i > 100)
-                  return;
+                i++;

                 Bytes value = new Bytes(longSerializer.serialize(TOPIC, (long)i));
-                if (logicErrors.contains(i))
+                if (i == 77)
                 {
-                  value = new Bytes(longSerializer.serialize(TOPIC, Long.MIN_VALUE));
-                }
-                if (poisonPills.contains(i))
-                {
-                  value = new Bytes(stringSerializer.serialize(TOPIC, "BOOM (Poison-Pill)!"));
+                  if (logicErrors)
+                  {
+                    value = new Bytes(longSerializer.serialize(TOPIC, Long.MIN_VALUE));
+                  }
+                  if (poisonPills)
+                  {
+                    value = new Bytes(stringSerializer.serialize(TOPIC, "BOOM (Poison-Pill)!"));
+                  }
                 }

                 ProducerRecord record =
@@ -61,6 +63,8 @@
                 messageSender.accept(record);
               }
             }
+
+            return i;
           }
         });
  }
diff --git a/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java b/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java
index 99af3b2..606218f 100644
--- a/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java
+++ b/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java
@@ -30,7 +30,7 @@
       GenericApplicationTests instance = (GenericApplicationTests)context.getTestInstance().get();
       List missingRequiredErrors = new LinkedList<>();

-      if (skipWhenErrorCannotBeGenerated.poisonPill() && !instance.recordGenerator.canGeneratePoisionPill())
+      if (skipWhenErrorCannotBeGenerated.poisonPill() && !instance.recordGenerator.canGeneratePoisonPill())
         missingRequiredErrors.add("Poison-Pill");

       if (skipWhenErrorCannotBeGenerated.logicError() && !instance.recordGenerator.canGenerateLogicError())
diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
index 1aacb94..9175e52 100644
--- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
@@ -78,12 +78,13 @@ abstract class GenericApplicationTests
   @Test
   void commitsCurrentOffsetsOnSuccess()
   {
-    recordGenerator.generate(100, Set.of(), Set.of(), messageSender);
+    int numberOfGeneratedMessages =
+        recordGenerator.generate(false, false, messageSender);

-    await("100 records received")
+    await(numberOfGeneratedMessages + " records received")
        .atMost(Duration.ofSeconds(30))
        .pollInterval(Duration.ofSeconds(1))
-       .until(() -> receivedRecords.size() >= 100);
+       .until(() -> receivedRecords.size() >= numberOfGeneratedMessages);

     await("Offsets committed")
        .atMost(Duration.ofSeconds(10))
@@ -103,7 +104,8 @@
   @SkipWhenErrorCannotBeGenerated(poisonPill = true)
   void commitsOffsetOfErrorForReprocessingOnDeserializationError()
   {
-    recordGenerator.generate(100, Set.of(77), Set.of(), messageSender);
+    int numberOfGeneratedMessages =
+        recordGenerator.generate(true, false, messageSender);

     await("Consumer failed")
        .atMost(Duration.ofSeconds(30))
@@ -123,7 +125,7 @@
     compareToCommitedOffsets(newOffsets);
     assertThat(receivedRecords.size())
        .describedAs("Received not all sent events")
-       .isLessThan(100);
+       .isLessThan(numberOfGeneratedMessages);

     assertThatNoException()
        .describedAs("Consumer should not be running")
@@ -137,7 +139,8 @@
   @SkipWhenErrorCannotBeGenerated(logicError = true)
   void doesNotCommitOffsetsOnLogicError()
   {
-    recordGenerator.generate(100, Set.of(), Set.of(77), messageSender);
+    int numberOfGeneratedMessages =
+        recordGenerator.generate(false, true, messageSender);

     await("Consumer failed")
        .atMost(Duration.ofSeconds(30))
@@ -157,7 +160,7 @@
     compareToCommitedOffsets(oldOffsets);
     assertThat(receivedRecords.size())
        .describedAs("Received not all sent events")
-       .isLessThan(100);
+       .isLessThan(numberOfGeneratedMessages);

     assertThatNoException()
        .describedAs("Consumer should not be running")
@@ -238,12 +241,12 @@ abstract class GenericApplicationTests

   public interface RecordGenerator
   {
-    void generate(
-        Set poisonPills,
-        Set logicErrors,
+    int generate(
+        boolean poisonPills,
+        boolean logicErrors,
         Consumer> messageSender);

-    default boolean canGeneratePoisionPill()
+    default boolean canGeneratePoisonPill()
     {
       return true;
     }
-- 
2.20.1

From 4b19a0061b88863c015424088f429b6998557dc8 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sun, 14 Aug 2022 15:40:39 +0200
Subject: [PATCH 13/16] Added and wired up a method for checking the business logic in `RecordGenerator`
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

---
 .../java/de/juplo/kafka/GenericApplicationTests.java | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
index 9175e52..ebad5a8 100644
--- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
@@ -98,6 +98,8 @@ abstract class GenericApplicationTests
     assertThatExceptionOfType(IllegalStateException.class)
        .isThrownBy(() -> endlessConsumer.exitStatus())
        .describedAs("Consumer should still be running");
+
+    recordGenerator.assertBusinessLogic();
   }

   @Test
@@ -133,6 +135,8 @@
     assertThat(endlessConsumer.exitStatus())
        .describedAs("Consumer should have exited abnormally")
        .containsInstanceOf(RecordDeserializationException.class);
+
+    recordGenerator.assertBusinessLogic();
   }

   @Test
@@ -168,6 +172,8 @@
     assertThat(endlessConsumer.exitStatus())
        .describedAs("Consumer should have exited abnormally")
        .containsInstanceOf(RuntimeException.class);
+
+    recordGenerator.assertBusinessLogic();
   }
@@ -255,6 +261,11 @@
     {
       return true;
     }
+
+    default void assertBusinessLogic()
+    {
+      log.debug("No business-logic to assert");
+    }
   }

   void sendMessage(ProducerRecord record)
-- 
2.20.1

From 4bff99449827596fd62a16165da42225d5956804 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Fri, 19 Aug 2022 11:10:52 +0200
Subject: [PATCH 14/16] RED: Uncovered errors in the test logic
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* Some assertions in the test for the offset position after a logic error
  were incorrect.
* This had gone unnoticed so far because the test was not strict enough:
  it sent so few messages that the faulty assertions never showed up,
  since no commit ever happened before the error was triggered.
* TODO: The test is probably still highly dependent on the execution speed
  of the test system. It would be better to throttle the processing
  artificially, so that the timing assumptions about the asynchronously
  running operations do not have to be tuned to the test system.
---
 src/test/java/de/juplo/kafka/ApplicationTests.java        | 4 ++--
 src/test/java/de/juplo/kafka/GenericApplicationTests.java | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index 8369a7b..1e73040 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -36,12 +36,12 @@ public class ApplicationTests extends GenericApplicationTests

             for (int partition = 0; partition < 10; partition++)
             {
-              for (int key = 0; key < 10; key++)
+              for (int key = 0; key < 10000; key++)
               {
                 i++;

                 Bytes value = new Bytes(longSerializer.serialize(TOPIC, (long)i));
-                if (i == 77)
+                if (i == 99977)
                 {
                   if (logicErrors)
                   {
diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
index ebad5a8..b0abf37 100644
--- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
@@ -37,7 +37,7 @@ import static org.awaitility.Awaitility.*;
     properties = {
         "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
         "consumer.topic=" + TOPIC,
-        "consumer.commit-interval=1s" })
+        "consumer.commit-interval=500ms" })
 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
 @Slf4j
 abstract class GenericApplicationTests
-- 
2.20.1

From 9ae781a6e047a7b857aaf7fd79d134eb7b48b267 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Thu, 18 Aug 2022 23:36:22 +0200
Subject: [PATCH 15/16] GREEN: Corrected the errors in the test logic
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* The assertion that, after a repeated attempt to consume the logic error,
  no more messages were consumed than were generated for the test is not
  valid: on a logic error, precisely _no_ commit of the most recently read
  messages happens, because that would advance the offset for partitions
  for which not all messages had been read before the error occurred,
  unless explicit seeks are performed for those partitions.
* Die Assertion, dass die Offset-Position nach einem Fehler der Offset- Position _vor_ der Ausführung der Fachlogik entspricht ist falsch, da durchaus Commits durchgeführt werden können, bevor der Fehler auftritt. Daher wird jetzt explizit geprüft, dass ** Die Offset-Position für keine Partition größer ist, als der Offset der dort zuletzt gesehenen Nachricht. ** UND mindestens eine Partition existiert, deren Offset _kleiner_ ist, als der Offset der zuletzt gesehenen Nachricht. --- .../juplo/kafka/GenericApplicationTests.java | 47 ++++++++++++------- 1 file changed, 31 insertions(+), 16 deletions(-) diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index b0abf37..649cdba 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -59,7 +59,7 @@ abstract class GenericApplicationTests KafkaConsumer offsetConsumer; EndlessConsumer endlessConsumer; Map oldOffsets; - Map newOffsets; + Map seenOffsets; Set> receivedRecords; @@ -92,7 +92,7 @@ abstract class GenericApplicationTests .untilAsserted(() -> { checkSeenOffsetsForProgress(); - compareToCommitedOffsets(newOffsets); + assertSeenOffsetsEqualCommittedOffsets(seenOffsets); }); assertThatExceptionOfType(IllegalStateException.class) @@ -115,7 +115,7 @@ abstract class GenericApplicationTests .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); - compareToCommitedOffsets(newOffsets); + assertSeenOffsetsEqualCommittedOffsets(seenOffsets); endlessConsumer.start(); await("Consumer failed") @@ -124,7 +124,7 @@ abstract class GenericApplicationTests .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); - compareToCommitedOffsets(newOffsets); + assertSeenOffsetsEqualCommittedOffsets(seenOffsets); assertThat(receivedRecords.size()) .describedAs("Received not all sent events") .isLessThan(numberOfGeneratedMessages); @@ -152,7 +152,7 @@ abstract class GenericApplicationTests .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); - compareToCommitedOffsets(oldOffsets); + assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets); endlessConsumer.start(); await("Consumer failed") @@ -160,11 +160,7 @@ abstract class GenericApplicationTests .pollInterval(Duration.ofSeconds(1)) .until(() -> !endlessConsumer.running()); - checkSeenOffsetsForProgress(); - compareToCommitedOffsets(oldOffsets); - assertThat(receivedRecords.size()) - .describedAs("Received not all sent events") - .isLessThan(numberOfGeneratedMessages); + assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets); assertThatNoException() .describedAs("Consumer should not be running") @@ -179,18 +175,37 @@ abstract class GenericApplicationTests /** Helper methods for the verification of expectations */ - void compareToCommitedOffsets(Map offsetsToCheck) + void assertSeenOffsetsEqualCommittedOffsets(Map offsetsToCheck) { doForCurrentOffsets((tp, offset) -> { Long expected = offsetsToCheck.get(tp) + 1; - log.debug("Checking, if the offset for {} is {}", tp, expected); + log.debug("Checking, if the offset {} for {} is exactly {}", offset, tp, expected); assertThat(offset) .describedAs("Committed offset corresponds to the offset of the consumer") .isEqualTo(expected); }); } + void assertSeenOffsetsAreBehindCommittedOffsets(Map offsetsToCheck) + { + List isOffsetBehindSeen = new LinkedList<>(); + + doForCurrentOffsets((tp, offset) -> + { + Long expected = offsetsToCheck.get(tp) + 1; + 
log.debug("Checking, if the offset {} for {} is at most {}", offset, tp, expected); + assertThat(offset) + .describedAs("Committed offset corresponds to the offset of the consumer") + .isLessThanOrEqualTo(expected); + isOffsetBehindSeen.add(offset < expected); + }); + + assertThat(isOffsetBehindSeen.stream().reduce(false, (result, next) -> result | next)) + .describedAs("Committed offsets are behind seen offsets") + .isTrue(); + } + void checkSeenOffsetsForProgress() { // Be sure, that some messages were consumed...! @@ -198,7 +213,7 @@ abstract class GenericApplicationTests partitions().forEach(tp -> { Long oldOffset = oldOffsets.get(tp) + 1; - Long newOffset = newOffsets.get(tp) + 1; + Long newOffset = seenOffsets.get(tp) + 1; if (!oldOffset.equals(newOffset)) { log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset); @@ -315,19 +330,19 @@ abstract class GenericApplicationTests seekToEnd(); oldOffsets = new HashMap<>(); - newOffsets = new HashMap<>(); + seenOffsets = new HashMap<>(); receivedRecords = new HashSet<>(); doForCurrentOffsets((tp, offset) -> { oldOffsets.put(tp, offset - 1); - newOffsets.put(tp, offset - 1); + seenOffsets.put(tp, offset - 1); }); Consumer> captureOffsetAndExecuteTestHandler = record -> { - newOffsets.put( + seenOffsets.put( new TopicPartition(record.topic(), record.partition()), record.offset()); receivedRecords.add(record); -- 2.20.1 From ccc0bbf53e09e7154e96958957e78f94a9d11ead Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 9 Sep 2022 11:46:25 +0200 Subject: [PATCH 16/16] Backport von Verbesserungen / Erweiterungen der Tests: MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Integration-Test `ApplicationIT`, der prüft, ob die Spring-Boot Anwendung ohne Fehler startet und dies über den Endpoint auch meldet. * Inzwischen hinzugefügte `.editorconfig` übernommen. * Fachspezifisches Interface `RecordHandler` statt `java.util.Consumer`. * Kleinere Korrekturen / Verbesserungen an `GenericApplicationTests` übernommen. 
---
 .editorconfig                                      | 13 ++++
 pom.xml                                            |  3 ++
 .../juplo/kafka/ApplicationConfiguration.java      |  7 ++-
 .../java/de/juplo/kafka/EndlessConsumer.java       |  7 ++-
 .../java/de/juplo/kafka/RecordHandler.java         | 10 +++++
 .../java/de/juplo/kafka/ApplicationIT.java         | 42 ++++++++++++++++++
 .../java/de/juplo/kafka/ApplicationTests.java      |  6 +--
 .../juplo/kafka/GenericApplicationTests.java       | 43 +++++++++++--------
 .../de/juplo/kafka/TestRecordHandler.java          | 22 ++++++++++
 9 files changed, 125 insertions(+), 28 deletions(-)
 create mode 100644 .editorconfig
 create mode 100644 src/main/java/de/juplo/kafka/RecordHandler.java
 create mode 100644 src/test/java/de/juplo/kafka/ApplicationIT.java
 create mode 100644 src/test/java/de/juplo/kafka/TestRecordHandler.java

diff --git a/.editorconfig b/.editorconfig
new file mode 100644
index 0000000..633c98a
--- /dev/null
+++ b/.editorconfig
@@ -0,0 +1,13 @@
+root = true
+
+[*]
+indent_style = space
+indent_size = tab
+tab_width = 2
+charset = utf-8
+end_of_line = lf
+trim_trailing_whitespace = true
+insert_final_newline = false
+
+[*.properties]
+charset = latin1
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 6fd5d5f..e664a07 100644
--- a/pom.xml
+++ b/pom.xml
@@ -101,6 +101,9 @@
+        <plugin>
+          <artifactId>maven-failsafe-plugin</artifactId>
+        </plugin>
diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 766740b..6bde5ff 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -11,7 +11,6 @@ import org.springframework.context.annotation.Configuration;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.function.Consumer;
 
 
 @Configuration
@@ -19,7 +18,7 @@ public class ApplicationConfiguration
 {
   @Bean
-  public Consumer<ConsumerRecord<String, Long>> consumer()
+  public RecordHandler<String, Long> recordHandler()
   {
     return (record) ->
     {
@@ -31,7 +30,7 @@ public class ApplicationConfiguration
   public EndlessConsumer<String, Long> endlessConsumer(
       KafkaConsumer<String, Long> kafkaConsumer,
       ExecutorService executor,
-      Consumer<ConsumerRecord<String, Long>> handler,
+      RecordHandler<String, Long> recordHandler,
       ApplicationProperties properties)
   {
     return
@@ -40,7 +39,7 @@ public class ApplicationConfiguration
         properties.getClientId(),
         properties.getTopic(),
         kafkaConsumer,
-        handler);
+        recordHandler);
   }
 
   @Bean
diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
index 8802df9..788a4a7 100644
--- a/src/main/java/de/juplo/kafka/EndlessConsumer.java
+++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java
@@ -2,7 +2,10 @@ package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.errors.WakeupException;
@@ -25,7 +28,7 @@ public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnable
   private final String id;
   private final String topic;
   private final Consumer<K, V> consumer;
-  private final java.util.function.Consumer<ConsumerRecord<K, V>> handler;
+  private final RecordHandler<K, V> handler;
 
   private final Lock lock = new ReentrantLock();
   private final Condition condition = lock.newCondition();
diff --git a/src/main/java/de/juplo/kafka/RecordHandler.java b/src/main/java/de/juplo/kafka/RecordHandler.java
new file mode 100644
index 0000000..327ac9f
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/RecordHandler.java
@@ -0,0 +1,10 @@
+package de.juplo.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.function.Consumer;
+
+
+public interface RecordHandler<K, V> extends Consumer<ConsumerRecord<K, V>>
+{
+}
diff --git a/src/test/java/de/juplo/kafka/ApplicationIT.java b/src/test/java/de/juplo/kafka/ApplicationIT.java
new file mode 100644
index 0000000..67b9d75
--- /dev/null
+++ b/src/test/java/de/juplo/kafka/ApplicationIT.java
@@ -0,0 +1,42 @@
+package de.juplo.kafka;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.web.client.TestRestTemplate;
+import org.springframework.boot.test.web.server.LocalServerPort;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+
+import static de.juplo.kafka.ApplicationIT.TOPIC;
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+@SpringBootTest(
+    webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
+    properties = {
+        "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
+        "consumer.topic=" + TOPIC })
+@EmbeddedKafka(topics = TOPIC)
+@AutoConfigureDataMongo
+public class ApplicationIT
+{
+  public static final String TOPIC = "FOO";
+
+  @LocalServerPort
+  private int port;
+
+  @Autowired
+  private TestRestTemplate restTemplate;
+
+
+  @Test
+  public void testApplicationStartup()
+  {
+    // Assert on the response body, so that a failed startup actually fails the test
+    String health = restTemplate.getForObject(
+        "http://localhost:" + port + "/actuator/health",
+        String.class);
+    assertThat(health).contains("UP");
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index 1e73040..1781b1d 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -1,16 +1,13 @@
 package de.juplo.kafka;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Primary;
 import org.springframework.test.context.ContextConfiguration;
 
-import java.util.Set;
 import java.util.function.Consumer;
 
 
@@ -73,9 +70,8 @@ public class ApplicationTests extends GenericApplicationTests<String, Long>
   @TestConfiguration
   public static class Configuration
   {
-    @Primary
     @Bean
-    public Consumer<ConsumerRecord<String, Long>> consumer()
+    public RecordHandler<String, Long> recordHandler()
     {
       return (record) ->
       {
diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
index 649cdba..9465ce6 100644
--- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
@@ -1,6 +1,7 @@
 package de.juplo.kafka;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -47,13 +48,17 @@ abstract class GenericApplicationTests<K, V>
 
 
   @Autowired
-  KafkaConsumer<K, V> kafkaConsumer;
+  org.apache.kafka.clients.consumer.Consumer<K, V> kafkaConsumer;
   @Autowired
   Consumer<ConsumerRecord<K, V>> consumer;
   @Autowired
-  ApplicationProperties properties;
+  ApplicationProperties applicationProperties;
   @Autowired
   ExecutorService executor;
+  @Autowired
+  ConsumerRebalanceListener rebalanceListener;
+  @Autowired
+  RecordHandler<K, V> recordHandler;
 
   KafkaProducer<Bytes, Bytes> testRecordProducer;
   KafkaConsumer<Bytes, Bytes> offsetConsumer;
@@ -76,7 +81,7 @@ abstract class GenericApplicationTests<K, V>
   /** Tests methods */
 
   @Test
-  void commitsCurrentOffsetsOnSuccess()
+  void commitsCurrentOffsetsOnSuccess() throws Exception
   {
     int numberOfGeneratedMessages =
         recordGenerator.generate(false, false, messageSender);
@@ -99,6 +104,7 @@ abstract class GenericApplicationTests<K, V>
         .isThrownBy(() -> endlessConsumer.exitStatus())
         .describedAs("Consumer should still be running");
 
+    endlessConsumer.stop();
     recordGenerator.assertBusinessLogic();
   }
 
@@ -196,7 +202,7 @@ abstract class GenericApplicationTests<K, V>
       Long expected = offsetsToCheck.get(tp) + 1;
       log.debug("Checking, if the offset {} for {} is at most {}", offset, tp, expected);
       assertThat(offset)
-          .describedAs("Committed offset corresponds to the offset of the consumer")
+          .describedAs("Committed offset must be at most equal to the offset of the consumer")
           .isLessThanOrEqualTo(expected);
       isOffsetBehindSeen.add(offset < expected);
     });
@@ -313,16 +319,16 @@ abstract class GenericApplicationTests<K, V>
   {
     Properties props;
     props = new Properties();
-    props.put("bootstrap.servers", properties.getBootstrapServer());
+    props.put("bootstrap.servers", applicationProperties.getBootstrapServer());
     props.put("linger.ms", 100);
     props.put("key.serializer", BytesSerializer.class.getName());
     props.put("value.serializer", BytesSerializer.class.getName());
     testRecordProducer = new KafkaProducer<>(props);
 
     props = new Properties();
-    props.put("bootstrap.servers", properties.getBootstrapServer());
+    props.put("bootstrap.servers", applicationProperties.getBootstrapServer());
     props.put("client.id", "OFFSET-CONSUMER");
-    props.put("group.id", properties.getGroupId());
+    props.put("group.id", applicationProperties.getGroupId());
     props.put("key.deserializer", BytesDeserializer.class.getName());
     props.put("value.deserializer", BytesDeserializer.class.getName());
     offsetConsumer = new KafkaConsumer<>(props);
@@ -339,21 +345,25 @@ abstract class GenericApplicationTests<K, V>
       seenOffsets.put(tp, offset - 1);
     });
 
-    Consumer<ConsumerRecord<K, V>> captureOffsetAndExecuteTestHandler =
-        record ->
+    TestRecordHandler<K, V> captureOffsetAndExecuteTestHandler =
+        new TestRecordHandler<K, V>(recordHandler)
         {
-          seenOffsets.put(
-              new TopicPartition(record.topic(), record.partition()),
-              record.offset());
-          receivedRecords.add(record);
-          consumer.accept(record);
+          @Override
+          public void onNewRecord(ConsumerRecord<K, V> record)
+          {
+            seenOffsets.put(
+                new TopicPartition(record.topic(), record.partition()),
+                record.offset());
+            receivedRecords.add(record);
+            consumer.accept(record);
+          }
        };
 
     endlessConsumer =
         new EndlessConsumer<>(
             executor,
-            properties.getClientId(),
-            properties.getTopic(),
+            applicationProperties.getClientId(),
+            applicationProperties.getTopic(),
             kafkaConsumer,
             captureOffsetAndExecuteTestHandler);
 
@@ -365,7 +375,6 @@ abstract class GenericApplicationTests<K, V>
   {
     try
     {
-      endlessConsumer.stop();
       testRecordProducer.close();
       offsetConsumer.close();
     }
diff --git a/src/test/java/de/juplo/kafka/TestRecordHandler.java b/src/test/java/de/juplo/kafka/TestRecordHandler.java
new file mode 100644
index 0000000..b4efdd6
--- /dev/null
+++ b/src/test/java/de/juplo/kafka/TestRecordHandler.java
@@ -0,0 +1,22 @@
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+
+@RequiredArgsConstructor
+public abstract class TestRecordHandler<K, V> implements RecordHandler<K, V>
+{
+  private final RecordHandler<K, V> handler;
+
+
+  public abstract void onNewRecord(ConsumerRecord<K, V> record);
+
+
+  @Override
+  public void accept(ConsumerRecord<K, V> record)
+  {
+    this.onNewRecord(record);
+    handler.accept(record);
+  }
+}
-- 
2.20.1
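A closing note on the TODO from patch 14 (throttling the processing
artificially, so that the timing assumptions of the tests no longer
depend on the speed of the test system): with the RecordHandler
interface from patch 16 in place, such a throttle could be built as a
simple delegating handler. The following is a hypothetical sketch, not
part of the series; the class name and the fixed delay are invented:

  package de.juplo.kafka;

  import java.time.Duration;
  import org.apache.kafka.clients.consumer.ConsumerRecord;

  // Hypothetical sketch: delays every record by a fixed amount before
  // delegating to the real handler, so that commit/offset timing
  // assumptions in the tests become independent of the test system.
  public class ThrottlingRecordHandler<K, V> implements RecordHandler<K, V>
  {
    private final RecordHandler<K, V> handler;
    private final Duration delay;

    public ThrottlingRecordHandler(RecordHandler<K, V> handler, Duration delay)
    {
      this.handler = handler;
      this.delay = delay;
    }

    @Override
    public void accept(ConsumerRecord<K, V> record)
    {
      try
      {
        Thread.sleep(delay.toMillis()); // artificial throttle
      }
      catch (InterruptedException e)
      {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      }
      handler.accept(record);
    }
  }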