X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationTests.java;h=fc5d4c9e51d21348a9271fad7877bbd07b9ed0fe;hb=b4b08b184fbe583b45c85b9b9c5622e4c1ac5de5;hp=ca72e3c552fe10bdd9cea727da1f2dd199449a70;hpb=60bc4a251dc9bab71d5ab5f12870147fec253ac9;p=demos%2Fkafka%2Ftraining diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index ca72e3c..fc5d4c9 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -6,6 +6,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.serialization.*; import org.apache.kafka.common.utils.Bytes; import org.junit.jupiter.api.*; @@ -20,14 +21,11 @@ import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; -import java.time.Clock; import java.time.Duration; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.function.BiConsumer; -import java.util.function.Consumer; -import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -62,7 +60,9 @@ class ApplicationTests @Autowired KafkaProducer kafkaProducer; @Autowired - KafkaConsumer kafkaConsumer; + KafkaConsumer kafkaConsumer; + @Autowired + KafkaConsumer offsetConsumer; @Autowired PartitionStatisticsRepository partitionStatisticsRepository; @Autowired @@ -71,12 +71,15 @@ class ApplicationTests ExecutorService executor; @Autowired PartitionStatisticsRepository repository; + @Autowired + KeyCountingRebalanceListener keyCountingRebalanceListener; + @Autowired + KeyCountingRecordHandler keyCountingRecordHandler; - Consumer> testHandler; - EndlessConsumer endlessConsumer; + EndlessConsumer endlessConsumer; Map oldOffsets; Map newOffsets; - Set> receivedRecords; + Set> receivedRecords; /** Tests methods */ @@ -84,14 +87,20 @@ class ApplicationTests @Test void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException { - send100Messages(i -> new Bytes(valueSerializer.serialize(TOPIC, i))); + send100Messages((partition, key, counter) -> + { + Bytes value = new Bytes(valueSerializer.serialize(TOPIC, counter)); + return new ProducerRecord<>(TOPIC, partition, key, value); + }); await("100 records received") .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) .until(() -> receivedRecords.size() >= 100); await("Offsets committed") .atMost(Duration.ofSeconds(10)) + .pollInterval(Duration.ofSeconds(1)) .untilAsserted(() -> { checkSeenOffsetsForProgress(); @@ -103,6 +112,45 @@ class ApplicationTests .describedAs("Consumer should still be running"); } + @Test + void commitsOffsetOfErrorForReprocessingOnDeserializationError() + { + send100Messages((partition, key, counter) -> + { + Bytes value = counter == 77 + ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!")) + : new Bytes(valueSerializer.serialize(TOPIC, counter)); + return new ProducerRecord<>(TOPIC, partition, key, value); + }); + + await("Consumer failed") + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) + .until(() -> !endlessConsumer.running()); + + checkSeenOffsetsForProgress(); + compareToCommitedOffsets(newOffsets); + + endlessConsumer.start(); + await("Consumer failed") + .atMost(Duration.ofSeconds(30)) + .pollInterval(Duration.ofSeconds(1)) + .until(() -> !endlessConsumer.running()); + + checkSeenOffsetsForProgress(); + compareToCommitedOffsets(newOffsets); + assertThat(receivedRecords.size()) + .describedAs("Received not all sent events") + .isLessThan(100); + + assertThatNoException() + .describedAs("Consumer should not be running") + .isThrownBy(() -> endlessConsumer.exitStatus()); + assertThat(endlessConsumer.exitStatus()) + .describedAs("Consumer should have exited abnormally") + .containsInstanceOf(RecordDeserializationException.class); + } + /** Helper methods for the verification of expectations */ @@ -124,8 +172,8 @@ class ApplicationTests Set withProgress = new HashSet<>(); partitions().forEach(tp -> { - Long oldOffset = oldOffsets.get(tp); - Long newOffset = newOffsets.get(tp); + Long oldOffset = oldOffsets.get(tp) + 1; + Long newOffset = newOffsets.get(tp) + 1; if (!oldOffset.equals(newOffset)) { log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset); @@ -140,6 +188,24 @@ class ApplicationTests /** Helper methods for setting up and running the tests */ + void seekToEnd() + { + offsetConsumer.assign(partitions()); + partitions().forEach(tp -> + { + Long offset = offsetConsumer.position(tp); + log.info("New position for {}: {}", tp, offset); + Integer partition = tp.partition(); + StatisticsDocument document = + partitionStatisticsRepository + .findById(partition.toString()) + .orElse(new StatisticsDocument(partition)); + document.offset = offset; + partitionStatisticsRepository.save(document); + }); + offsetConsumer.unsubscribe(); + } + void doForCurrentOffsets(BiConsumer consumer) { partitions().forEach(tp -> @@ -148,7 +214,7 @@ class ApplicationTests Optional offset = partitionStatisticsRepository.findById(partition).map(document -> document.offset); consumer.accept(tp, offset.orElse(0l)); }); - } + } List partitions() { @@ -160,7 +226,12 @@ class ApplicationTests } - void send100Messages(Function messageGenerator) + public interface RecordGenerator + { + public ProducerRecord generate(int partition, String key, long counter); + } + + void send100Messages(RecordGenerator recordGenerator) { long i = 0; @@ -168,14 +239,8 @@ class ApplicationTests { for (int key = 0; key < 10; key++) { - Bytes value = messageGenerator.apply(++i); - ProducerRecord record = - new ProducerRecord<>( - TOPIC, - partition, - Integer.toString(key%2), - value); + recordGenerator.generate(partition, Integer.toString(partition*10+key%2), ++i); kafkaProducer.send(record, (metadata, e) -> { @@ -205,7 +270,7 @@ class ApplicationTests @BeforeEach public void init() { - testHandler = record -> {} ; + seekToEnd(); oldOffsets = new HashMap<>(); newOffsets = new HashMap<>(); @@ -217,25 +282,25 @@ class ApplicationTests newOffsets.put(tp, offset - 1); }); - Consumer> captureOffsetAndExecuteTestHandler = - record -> - { - newOffsets.put( - new TopicPartition(record.topic(), record.partition()), - record.offset()); - receivedRecords.add(record); - testHandler.accept(record); + TestRecordHandler captureOffsetAndExecuteTestHandler = + new TestRecordHandler(keyCountingRecordHandler) { + @Override + public void onNewRecord(ConsumerRecord record) + { + newOffsets.put( + new TopicPartition(record.topic(), record.partition()), + record.offset()); + receivedRecords.add(record); + } }; endlessConsumer = new EndlessConsumer<>( executor, - repository, properties.getClientId(), properties.getTopic(), - Clock.systemDefaultZone(), - properties.getCommitInterval(), kafkaConsumer, + keyCountingRebalanceListener, captureOffsetAndExecuteTestHandler); endlessConsumer.start(); @@ -276,5 +341,19 @@ class ApplicationTests return new KafkaProducer<>(props); } + + @Bean + KafkaConsumer offsetConsumer(ApplicationProperties properties) + { + Properties props = new Properties(); + props.put("bootstrap.servers", properties.getBootstrapServer()); + props.put("client.id", "OFFSET-CONSUMER"); + props.put("enable.auto.commit", false); + props.put("auto.offset.reset", "latest"); + props.put("key.deserializer", BytesDeserializer.class.getName()); + props.put("value.deserializer", BytesDeserializer.class.getName()); + + return new KafkaConsumer<>(props); + } } }