From: Kai Moritz Date: Fri, 9 Sep 2022 09:46:25 +0000 (+0200) Subject: Backport von Verbesserungen / Erweiterungen der Tests: X-Git-Tag: wip-merge-deserialization--sumup-adder--ohne-stored-offsets^2~1 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=ccc0bbf53e09e7154e96958957e78f94a9d11ead;p=demos%2Fkafka%2Ftraining Backport von Verbesserungen / Erweiterungen der Tests: * Integration-Test `ApplicationIT`, der prüft, ob die Spring-Boot Anwendung ohne Fehler startet und dies über den Endpoint auch meldet. * Inzwischen hinzugefügte `.editorconfig` übernommen. * Fachspezifisches Interface `RecordHandler` statt `java.util.Consumer`. * Kleinere Korrekturen / Verbesserungen an `GenericApplicationTests` übernommen. --- diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..633c98a --- /dev/null +++ b/.editorconfig @@ -0,0 +1,13 @@ +root = true + +[*] +indent_style = space +indent_size = tab +tab_width = 2 +charset = utf-8 +end_of_line = lf +trim_trailing_whitespace = true +insert_final_newline = false + +[*.properties] +charset = latin1 \ No newline at end of file diff --git a/pom.xml b/pom.xml index 6fd5d5f..e664a07 100644 --- a/pom.xml +++ b/pom.xml @@ -101,6 +101,9 @@ + + maven-failsafe-plugin + diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 766740b..6bde5ff 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -11,7 +11,6 @@ import org.springframework.context.annotation.Configuration; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.function.Consumer; @Configuration @@ -19,7 +18,7 @@ import java.util.function.Consumer; public class ApplicationConfiguration { @Bean - public Consumer> consumer() + public RecordHandler recordHandler() { return (record) -> { @@ -31,7 +30,7 @@ public class ApplicationConfiguration public EndlessConsumer endlessConsumer( KafkaConsumer kafkaConsumer, ExecutorService executor, - Consumer> handler, + RecordHandler recordHandler, ApplicationProperties properties) { return @@ -40,7 +39,7 @@ public class ApplicationConfiguration properties.getClientId(), properties.getTopic(), kafkaConsumer, - handler); + recordHandler); } @Bean diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 8802df9..788a4a7 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -2,7 +2,10 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.errors.WakeupException; @@ -25,7 +28,7 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl private final String id; private final String topic; private final Consumer consumer; - private final java.util.function.Consumer> handler; + private final RecordHandler handler; private final Lock lock = new ReentrantLock(); private final Condition condition = lock.newCondition(); diff --git a/src/main/java/de/juplo/kafka/RecordHandler.java b/src/main/java/de/juplo/kafka/RecordHandler.java new file mode 100644 index 0000000..327ac9f --- /dev/null +++ b/src/main/java/de/juplo/kafka/RecordHandler.java @@ -0,0 +1,10 @@ +package de.juplo.kafka; + +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import java.util.function.Consumer; + + +public interface RecordHandler extends Consumer> +{ +} diff --git a/src/test/java/de/juplo/kafka/ApplicationIT.java b/src/test/java/de/juplo/kafka/ApplicationIT.java new file mode 100644 index 0000000..67b9d75 --- /dev/null +++ b/src/test/java/de/juplo/kafka/ApplicationIT.java @@ -0,0 +1,42 @@ +package de.juplo.kafka; + +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.boot.test.web.server.LocalServerPort; +import org.springframework.kafka.test.context.EmbeddedKafka; + +import static de.juplo.kafka.ApplicationIT.TOPIC; + + +@SpringBootTest( + webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, + properties = { + "consumer.bootstrap-server=${spring.embedded.kafka.brokers}", + "consumer.topic=" + TOPIC }) +@EmbeddedKafka(topics = TOPIC) +@AutoConfigureDataMongo +public class ApplicationIT +{ + public static final String TOPIC = "FOO"; + + @LocalServerPort + private int port; + + @Autowired + private TestRestTemplate restTemplate; + + + + @Test + public void testApplicationStartup() + { + restTemplate.getForObject( + "http://localhost:" + port + "/actuator/health", + String.class + ) + .contains("UP"); + } +} diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 1e73040..1781b1d 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -1,16 +1,13 @@ package de.juplo.kafka; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.LongSerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Primary; import org.springframework.test.context.ContextConfiguration; -import java.util.Set; import java.util.function.Consumer; @@ -73,9 +70,8 @@ public class ApplicationTests extends GenericApplicationTests @TestConfiguration public static class Configuration { - @Primary @Bean - public Consumer> consumer() + public RecordHandler recordHandler() { return (record) -> { diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 649cdba..9465ce6 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -1,6 +1,7 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; @@ -47,13 +48,17 @@ abstract class GenericApplicationTests @Autowired - KafkaConsumer kafkaConsumer; + org.apache.kafka.clients.consumer.Consumer kafkaConsumer; @Autowired Consumer> consumer; @Autowired - ApplicationProperties properties; + ApplicationProperties applicationProperties; @Autowired ExecutorService executor; + @Autowired + ConsumerRebalanceListener rebalanceListener; + @Autowired + RecordHandler recordHandler; KafkaProducer testRecordProducer; KafkaConsumer offsetConsumer; @@ -76,7 +81,7 @@ abstract class GenericApplicationTests /** Tests methods */ @Test - void commitsCurrentOffsetsOnSuccess() + void commitsCurrentOffsetsOnSuccess() throws Exception { int numberOfGeneratedMessages = recordGenerator.generate(false, false, messageSender); @@ -99,6 +104,7 @@ abstract class GenericApplicationTests .isThrownBy(() -> endlessConsumer.exitStatus()) .describedAs("Consumer should still be running"); + endlessConsumer.stop(); recordGenerator.assertBusinessLogic(); } @@ -196,7 +202,7 @@ abstract class GenericApplicationTests Long expected = offsetsToCheck.get(tp) + 1; log.debug("Checking, if the offset {} for {} is at most {}", offset, tp, expected); assertThat(offset) - .describedAs("Committed offset corresponds to the offset of the consumer") + .describedAs("Committed offset must be at most equal to the offset of the consumer") .isLessThanOrEqualTo(expected); isOffsetBehindSeen.add(offset < expected); }); @@ -313,16 +319,16 @@ abstract class GenericApplicationTests { Properties props; props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); + props.put("bootstrap.servers", applicationProperties.getBootstrapServer()); props.put("linger.ms", 100); props.put("key.serializer", BytesSerializer.class.getName()); props.put("value.serializer", BytesSerializer.class.getName()); testRecordProducer = new KafkaProducer<>(props); props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); + props.put("bootstrap.servers", applicationProperties.getBootstrapServer()); props.put("client.id", "OFFSET-CONSUMER"); - props.put("group.id", properties.getGroupId()); + props.put("group.id", applicationProperties.getGroupId()); props.put("key.deserializer", BytesDeserializer.class.getName()); props.put("value.deserializer", BytesDeserializer.class.getName()); offsetConsumer = new KafkaConsumer<>(props); @@ -339,21 +345,25 @@ abstract class GenericApplicationTests seenOffsets.put(tp, offset - 1); }); - Consumer> captureOffsetAndExecuteTestHandler = - record -> + TestRecordHandler captureOffsetAndExecuteTestHandler = + new TestRecordHandler(recordHandler) { - seenOffsets.put( - new TopicPartition(record.topic(), record.partition()), - record.offset()); - receivedRecords.add(record); - consumer.accept(record); + @Override + public void onNewRecord(ConsumerRecord record) + { + seenOffsets.put( + new TopicPartition(record.topic(), record.partition()), + record.offset()); + receivedRecords.add(record); + consumer.accept(record); + } }; endlessConsumer = new EndlessConsumer<>( executor, - properties.getClientId(), - properties.getTopic(), + applicationProperties.getClientId(), + applicationProperties.getTopic(), kafkaConsumer, captureOffsetAndExecuteTestHandler); @@ -365,7 +375,6 @@ abstract class GenericApplicationTests { try { - endlessConsumer.stop(); testRecordProducer.close(); offsetConsumer.close(); } diff --git a/src/test/java/de/juplo/kafka/TestRecordHandler.java b/src/test/java/de/juplo/kafka/TestRecordHandler.java new file mode 100644 index 0000000..b4efdd6 --- /dev/null +++ b/src/test/java/de/juplo/kafka/TestRecordHandler.java @@ -0,0 +1,22 @@ +package de.juplo.kafka; + +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.consumer.ConsumerRecord; + + +@RequiredArgsConstructor +public abstract class TestRecordHandler implements RecordHandler +{ + private final RecordHandler handler; + + + public abstract void onNewRecord(ConsumerRecord record); + + + @Override + public void accept(ConsumerRecord record) + { + this.onNewRecord(record); + handler.accept(record); + } +}