From a126175808eeec80355deb8eb3f4ef7e85e84780 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 14 Aug 2022 09:54:45 +0200 Subject: [PATCH] =?utf8?q?Typisierbare=20Basis-Klasse=20`GenericApplicatio?= =?utf8?q?nTests`=20eingef=C3=BChrt?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- pom.xml | 4 + ...Tests.java => GenericApplicationTest.java} | 165 ++++++++---------- 2 files changed, 73 insertions(+), 96 deletions(-) rename src/test/java/de/juplo/kafka/{ApplicationTests.java => GenericApplicationTest.java} (67%) diff --git a/pom.xml b/pom.xml index 1f5caab..6fd5d5f 100644 --- a/pom.xml +++ b/pom.xml @@ -16,6 +16,10 @@ 1.0-SNAPSHOT Endless Consumer: a Simple Consumer-Group that reads and prints the topic and counts the received messages for each key by topic + + 11 + + org.springframework.boot diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTest.java similarity index 67% rename from src/test/java/de/juplo/kafka/ApplicationTests.java rename to src/test/java/de/juplo/kafka/GenericApplicationTest.java index 3bac537..6b2b635 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTest.java @@ -13,7 +13,6 @@ import org.junit.jupiter.api.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer; import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.context.TestPropertySource; @@ -21,21 +20,19 @@ import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; import java.time.Duration; import java.util.*; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.IntStream; -import static de.juplo.kafka.ApplicationTests.PARTITIONS; -import static de.juplo.kafka.ApplicationTests.TOPIC; +import static de.juplo.kafka.GenericApplicationTest.PARTITIONS; +import static de.juplo.kafka.GenericApplicationTest.TOPIC; import static org.assertj.core.api.Assertions.*; import static org.awaitility.Awaitility.*; @SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class) -@TestMethodOrder(MethodOrderer.OrderAnnotation.class) @TestPropertySource( properties = { "consumer.bootstrap-server=${spring.embedded.kafka.brokers}", @@ -43,44 +40,48 @@ import static org.awaitility.Awaitility.*; "consumer.commit-interval=1s" }) @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) @Slf4j -class ApplicationTests +abstract class GenericApplicationTest { public static final String TOPIC = "FOO"; public static final int PARTITIONS = 10; - StringSerializer stringSerializer = new StringSerializer(); - - @Autowired - Serializer valueSerializer; - @Autowired - KafkaProducer kafkaProducer; @Autowired - KafkaConsumer kafkaConsumer; - @Autowired - KafkaConsumer offsetConsumer; + KafkaConsumer kafkaConsumer; @Autowired ApplicationProperties properties; @Autowired ExecutorService executor; - Consumer> testHandler; - EndlessConsumer endlessConsumer; + KafkaProducer testRecordProducer; + KafkaConsumer offsetConsumer; + Consumer> testHandler; + EndlessConsumer endlessConsumer; Map oldOffsets; Map newOffsets; - Set> receivedRecords; + Set> receivedRecords; + + + final Serializer keySerializer; + final RecordGenerator recordGenerator; + final Consumer> messageSender; + + public GenericApplicationTest( + Serializer keySerializer, + RecordGenerator recordGenerator) + { + this.keySerializer = keySerializer; + this.recordGenerator = recordGenerator; + this.messageSender = (record) -> sendMessage(record); + } /** Tests methods */ @Test - void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException + void commitsCurrentOffsetsOnSuccess() { - send100Messages((partition, key, counter) -> - { - Bytes value = new Bytes(valueSerializer.serialize(TOPIC, counter)); - return new ProducerRecord<>(TOPIC, partition, key, value); - }); + recordGenerator.generate(100, Set.of(), messageSender); await("100 records received") .atMost(Duration.ofSeconds(30)) @@ -104,13 +105,7 @@ class ApplicationTests @Test void commitsOffsetOfErrorForReprocessingOnDeserializationError() { - send100Messages((partition, key, counter) -> - { - Bytes value = counter == 77 - ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!")) - : new Bytes(valueSerializer.serialize(TOPIC, counter)); - return new ProducerRecord<>(TOPIC, partition, key, value); - }); + recordGenerator.generate(100, Set.of(77), messageSender); await("Consumer failed") .atMost(Duration.ofSeconds(30)) @@ -209,50 +204,58 @@ class ApplicationTests } - public interface RecordGenerator + public interface RecordGenerator { - public ProducerRecord generate(int partition, String key, long counter); + void generate( + int numberOfMessagesToGenerate, + Set poistionPills, + Consumer> messageSender); } - void send100Messages(RecordGenerator recordGenerator) + void sendMessage(ProducerRecord record) { - long i = 0; - - for (int partition = 0; partition < 10; partition++) + testRecordProducer.send(record, (metadata, e) -> { - for (int key = 0; key < 10; key++) + if (metadata != null) { - ProducerRecord record = - recordGenerator.generate(partition, Integer.toString(partition*10+key%2), ++i); - - kafkaProducer.send(record, (metadata, e) -> - { - if (metadata != null) - { - log.debug( - "{}|{} - {}={}", - metadata.partition(), - metadata.offset(), - record.key(), - record.value()); - } - else - { - log.warn( - "Exception for {}={}: {}", - record.key(), - record.value(), - e.toString()); - } - }); + log.debug( + "{}|{} - {}={}", + metadata.partition(), + metadata.offset(), + record.key(), + record.value()); } - } + else + { + log.warn( + "Exception for {}={}: {}", + record.key(), + record.value(), + e.toString()); + } + }); } @BeforeEach public void init() { + Properties props; + props = new Properties(); + props.put("bootstrap.servers", properties.getBootstrapServer()); + props.put("linger.ms", 100); + props.put("key.serializer", keySerializer.getClass().getName()); + props.put("value.serializer", BytesSerializer.class.getName()); + testRecordProducer = new KafkaProducer<>(props); + + props = new Properties(); + props.put("bootstrap.servers", properties.getBootstrapServer()); + props.put("client.id", "OFFSET-CONSUMER"); + props.put("group.id", properties.getGroupId()); + props.put("key.deserializer", BytesDeserializer.class.getName()); + props.put("value.deserializer", BytesDeserializer.class.getName()); + offsetConsumer = new KafkaConsumer<>(props); + testHandler = record -> {} ; seekToEnd(); @@ -267,7 +270,7 @@ class ApplicationTests newOffsets.put(tp, offset - 1); }); - Consumer> captureOffsetAndExecuteTestHandler = + Consumer> captureOffsetAndExecuteTestHandler = record -> { newOffsets.put( @@ -294,6 +297,8 @@ class ApplicationTests try { endlessConsumer.stop(); + testRecordProducer.close(); + offsetConsumer.close(); } catch (Exception e) { @@ -304,37 +309,5 @@ class ApplicationTests @TestConfiguration @Import(ApplicationConfiguration.class) - public static class Configuration - { - @Bean - Serializer serializer() - { - return new LongSerializer(); - } - - @Bean - KafkaProducer kafkaProducer(ApplicationProperties properties) - { - Properties props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); - props.put("linger.ms", 100); - props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", BytesSerializer.class.getName()); - - return new KafkaProducer<>(props); - } - - @Bean - KafkaConsumer offsetConsumer(ApplicationProperties properties) - { - Properties props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); - props.put("client.id", "OFFSET-CONSUMER"); - props.put("group.id", properties.getGroupId()); - props.put("key.deserializer", BytesDeserializer.class.getName()); - props.put("value.deserializer", BytesDeserializer.class.getName()); - - return new KafkaConsumer<>(props); - } - } + public static class Configuration {} } -- 2.20.1