From eba11d4859d1e2b936bcd9e8075986b5179b32ea Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sun, 18 Sep 2022 06:31:25 +0200
Subject: [PATCH] Adder shuts down on errors; shutdown logic simplified
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* On an exception, the adder now shuts down completely.
* _Background:_ This way, the state is easier to recognise in the
  exercise.
* The logic that determines whether a commit is carried out during
  shutdown has been simplified as far as possible.
* Since the test could not be adapted due to a lack of time, it has
  been removed for now.
---
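Note for reviewers (this area between the '---' above and the diffstat is
ignored when the patch is applied): the simplified lifecycle is spread over
several hunks in EndlessConsumer.java below, so here is a self-contained
sketch of the resulting control flow. The class ShutdownSketch, its main()
and the stubbed poll-loop are illustrative only and not part of the commit;
KafkaConsumer, logging and the Spring application-context are left out.

    import java.util.Optional;

    class ShutdownSketch
    {
      private boolean running = false;
      private Exception exception = null;

      void run(Runnable pollLoop)
      {
        running = true;
        try
        {
          pollLoop.run();     // poll() and handle records, forever
        }
        catch (Exception e)
        {
          this.exception = e; // remember the cause for exitStatus()
        }
        finally
        {
          running = false;    // no Lock/Condition needed any more
        }
      }

      Optional<Exception> exitStatus()
      {
        if (running)
          throw new IllegalStateException("No exit-status available!");
        return Optional.ofNullable(exception);
      }

      public static void main(String[] args)
      {
        ShutdownSketch sketch = new ShutdownSketch();
        sketch.run(() -> { throw new RuntimeException("BOOM!"); });
        System.out.println(sketch.exitStatus());
        // -> Optional[java.lang.RuntimeException: BOOM!]
      }
    }

In the real EndlessConsumer, the finally-block additionally calls
consumer.close() and applicationContext.close(): after any error the whole
application exits together with the consumer thread, which makes the
resulting state easy to recognise from the outside.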
 docker-compose.yml                            |   7 +-
 .../juplo/kafka/ApplicationConfiguration.java |   3 +
 .../java/de/juplo/kafka/DriverController.java |  15 -
 .../java/de/juplo/kafka/EndlessConsumer.java  | 131 +-----
 .../java/de/juplo/kafka/ApplicationTests.java | 172 --------
 .../ErrorCannotBeGeneratedCondition.java      |  60 ---
 .../juplo/kafka/GenericApplicationTests.java  | 405 ------------------
 .../kafka/SkipWhenErrorCannotBeGenerated.java |  15 -
 .../de/juplo/kafka/TestRecordHandler.java     |  22 -
 9 files changed, 30 insertions(+), 800 deletions(-)
 delete mode 100644 src/test/java/de/juplo/kafka/ApplicationTests.java
 delete mode 100644 src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java
 delete mode 100644 src/test/java/de/juplo/kafka/GenericApplicationTests.java
 delete mode 100644 src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java
 delete mode 100644 src/test/java/de/juplo/kafka/TestRecordHandler.java

diff --git a/docker-compose.yml b/docker-compose.yml
index 5d33cd1..3f7188c 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -115,13 +115,14 @@ services:
     sumup.requests.client-id: requests-1
 
   requests-2:
-    image: juplo/sumup-requests-json:1.0-SNAPSHOT
+    image: juplo/sumup-requests-fehlerteufel:1.0-SNAPSHOT
     ports:
       - 8082:8080
     environment:
      server.port: 8080
      sumup.requests.bootstrap-server: kafka:9092
      sumup.requests.client-id: requests-2
+     sumup.requests.error-position: 6
 
   adder-1:
     image: juplo/sumup-adder-json:1.0-SNAPSHOT
@@ -135,7 +136,7 @@
      sumup.adder.throttle: 3ms
      spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
      spring.data.mongodb.database: juplo
-     logging.level.org.apache.kafka.clients.consumer: DEBUG
+     logging.level.org.apache.kafka.clients.consumer: INFO
 
   adder-2:
     image: juplo/sumup-adder-json:1.0-SNAPSHOT
@@ -149,7 +150,7 @@
      sumup.adder.throttle: 3ms
      spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
      spring.data.mongodb.database: juplo
-     logging.level.org.apache.kafka.clients.consumer: DEBUG
+     logging.level.org.apache.kafka.clients.consumer: INFO
 
   peter:
     image: juplo/toolbox
diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 6137411..93db3b5 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -3,6 +3,7 @@ package de.juplo.kafka;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
@@ -53,6 +54,7 @@ public class ApplicationConfiguration
   public EndlessConsumer endlessConsumer(
       KafkaConsumer kafkaConsumer,
       ExecutorService executor,
+      ConfigurableApplicationContext applicationContext,
       ApplicationRebalanceListener rebalanceListener,
       ApplicationRecordHandler recordHandler,
       ApplicationProperties properties)
@@ -60,6 +62,7 @@ public class ApplicationConfiguration
     return
         new EndlessConsumer<>(
             executor,
+            applicationContext,
             properties.getClientId(),
             properties.getTopic(),
             kafkaConsumer,
diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java
index 26a5bc8..cc3abed 100644
--- a/src/main/java/de/juplo/kafka/DriverController.java
+++ b/src/main/java/de/juplo/kafka/DriverController.java
@@ -8,7 +8,6 @@ import org.springframework.web.bind.annotation.*;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 
@@ -16,24 +15,10 @@ import java.util.stream.Collectors;
 @RequiredArgsConstructor
 public class DriverController
 {
-  private final EndlessConsumer consumer;
   private final ApplicationRecordHandler recordHandler;
   private final AdderResults results;
 
 
-  @PostMapping("start")
-  public void start()
-  {
-    consumer.start();
-  }
-
-  @PostMapping("stop")
-  public void stop() throws ExecutionException, InterruptedException
-  {
-    consumer.stop();
-  }
-
-
   @GetMapping("state")
   public Map> state()
   {
diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
index 00678c4..9ea944b 100644
--- a/src/main/java/de/juplo/kafka/EndlessConsumer.java
+++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java
@@ -6,15 +6,11 @@ import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.errors.WakeupException;
+import org.springframework.context.ConfigurableApplicationContext;
 
-import javax.annotation.PreDestroy;
 import java.time.Duration;
 import java.util.*;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 
 @Slf4j
@@ -22,16 +18,15 @@ import java.util.concurrent.locks.ReentrantLock;
 public class EndlessConsumer implements Runnable
 {
   private final ExecutorService executor;
+  private final ConfigurableApplicationContext applicationContext;
   private final String id;
   private final String topic;
   private final Consumer consumer;
   private final ConsumerRebalanceListener rebalanceListener;
   private final RecordHandler recordHandler;
 
-  private final Lock lock = new ReentrantLock();
-  private final Condition condition = lock.newCondition();
   private boolean running = false;
-  private Exception exception;
+  private Exception exception = null;
   private long consumed = 0;
 
 
@@ -72,8 +67,6 @@ public class EndlessConsumer implements Runnable
     catch(WakeupException e)
     {
       log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
-      consumer.commitSync();
-      shutdown();
     }
     catch(RecordDeserializationException e)
     {
@@ -85,128 +78,50 @@ public class EndlessConsumer implements Runnable
           tp,
           offset,
           e.getCause().toString());
-
-      consumer.commitSync();
-      shutdown(e);
+      this.exception = e;
     }
     catch(Exception e)
     {
       log.error("{} - Unexpected error: {}", id, e.toString(), e);
-      shutdown(e);
+      this.exception = e;
+      log.info("{} - Unsubscribing...", id);
+      consumer.unsubscribe();
     }
     finally
     {
+      running = false;
+      log.info("{} - Closing the consumer...", id);
+      consumer.close();
+      log.info("{} - Shutting down the app...", id);
+      applicationContext.close();
       log.info("{} - Consumer-Thread exiting", id);
     }
   }
 
-  private void shutdown()
-  {
-    shutdown(null);
-  }
-
-  private void shutdown(Exception e)
-  {
-    lock.lock();
-    try
-    {
-      try
-      {
-        log.info("{} - Unsubscribing from topic {}", id, topic);
-        consumer.unsubscribe();
-      }
-      catch (Exception ue)
-      {
-        log.error(
-            "{} - Error while unsubscribing from topic {}: {}",
-            id,
-            topic,
-            ue.toString());
-      }
-      finally
-      {
-        running = false;
-        exception = e;
-        condition.signal();
-      }
-    }
-    finally
-    {
-      lock.unlock();
-    }
-  }
-
   public void start()
   {
-    lock.lock();
-    try
-    {
-      if (running)
-        throw new IllegalStateException("Consumer instance " + id + " is already running!");
-
-      log.info("{} - Starting - consumed {} messages before", id, consumed);
-      running = true;
-      exception = null;
-      executor.submit(this);
-    }
-    finally
-    {
-      lock.unlock();
-    }
-  }
-
-  public synchronized void stop() throws InterruptedException
-  {
-    lock.lock();
-    try
-    {
-      if (!running)
-        throw new IllegalStateException("Consumer instance " + id + " is not running!");
+    if (running)
+      throw new IllegalStateException("Consumer instance " + id + " is already running!");
 
-      log.info("{} - Stopping", id);
-      consumer.wakeup();
-      condition.await();
-      log.info("{} - Stopped - consumed {} messages so far", id, consumed);
-    }
-    finally
-    {
-      lock.unlock();
-    }
+    log.info("{} - Starting - consumed {} messages before", id, consumed);
+    running = true;
+    executor.submit(this);
   }
 
-  @PreDestroy
-  public void destroy() throws ExecutionException, InterruptedException
+  public void stop()
   {
-    log.info("{} - Destroy!", id);
-    log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
+    consumer.wakeup();
  }
 
-
   public boolean running()
   {
-    lock.lock();
-    try
-    {
-      return running;
-    }
-    finally
-    {
-      lock.unlock();
-    }
+    return running;
   }
 
   public Optional exitStatus()
   {
-    lock.lock();
-    try
-    {
-      if (running)
-        throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
+    if (running)
+      throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
 
-      return Optional.ofNullable(exception);
-    }
-    finally
-    {
-      lock.unlock();
-    }
+    return Optional.ofNullable(exception);
   }
 }
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
deleted file mode 100644
index bd9f449..0000000
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ /dev/null
@@ -1,172 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Bytes;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.util.*;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-
-@Slf4j
-public class ApplicationTests extends GenericApplicationTests
-{
-  @Autowired
-  StateRepository stateRepository;
-
-
-  public ApplicationTests()
-  {
-    super(new ApplicationTestRecrodGenerator());
-    ((ApplicationTestRecrodGenerator)recordGenerator).tests = this;
-  }
-
-
-  static class ApplicationTestRecrodGenerator implements RecordGenerator
-  {
-    ApplicationTests tests;
-
-    final int[] numbers = {1, 77, 33, 2, 66, 666, 11};
-    final String[] dieWilden13 =
-        IntStream
-            .range(1, 14)
-            .mapToObj(i -> "seeräuber-" + i)
-            .toArray(i -> new String[i]);
-    final StringSerializer stringSerializer = new StringSerializer();
-    final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "{}"));
-
-    int counter = 0;
-
-    Map> state;
-
-    @Override
-    public int generate(
-        boolean poisonPills,
-        boolean logicErrors,
-        Consumer> messageSender)
-    {
-      counter = 0;
-      state =
-          Arrays
-              .stream(dieWilden13)
-              .collect(Collectors.toMap(
-                  seeräuber -> seeräuber,
-                  seeräuber -> new LinkedList()));
-
-      int number[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
-      int message[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
-      int next = 0;
-
-      for (int pass = 0; pass < 333; pass++)
-      {
-        for (int i = 0; i<13; i++)
-        {
-          String seeräuber = dieWilden13[i];
-          Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
-
-          if (message[i] > number[i])
-          {
-            send(
-                key,
-                calculateMessage,
-                Message.Type.CALC,
-                poisonPill(poisonPills, pass, counter),
-                logicError(logicErrors, pass, counter),
-                messageSender);
-            state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2));
-            // Pick next number to calculate
-            number[i] = numbers[next++%numbers.length];
-            message[i] = 1;
-            log.debug("Seeräuber {} will die Summe für {} berechnen", seeräuber, number[i]);
-          }
-
-          send(
-              key,
-              new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":" + message[i]++ + "}")),
-              Message.Type.ADD,
-              poisonPill(poisonPills, pass, counter),
-              logicError(logicErrors, pass, counter),
-              messageSender);
-        }
-      }
-
-      return counter;
-    }
-
-    boolean poisonPill (boolean poisonPills, int pass, int counter)
-    {
-      return poisonPills && pass > 300 && counter%99 == 0;
-    }
-
-    boolean logicError(boolean logicErrors, int pass, int counter)
-    {
-      return logicErrors && pass > 300 && counter%77 == 0;
-    }
-
-    void send(
-        Bytes key,
-        Bytes value,
-        Message.Type type,
-        boolean poisonPill,
-        boolean logicError,
-        Consumer> messageSender)
-    {
-      counter++;
-
-      if (logicError)
-      {
-        value = new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":-1}"));
-      }
-      if (poisonPill)
-      {
-        value = new Bytes("BOOM!".getBytes());
-      }
-
-      ProducerRecord record = new ProducerRecord<>(TOPIC, key, value);
-      record.headers().add("__TypeId__", type.toString().getBytes());
-      messageSender.accept(record);
-    }
-
-    @Override
-    public void assertBusinessLogic()
-    {
-      for (int i=0; i<PARTITIONS; i++)
-      {
-        StateDocument stateDocument =
-            tests.stateRepository.findById(Integer.toString(i)).get();
-
-        stateDocument
-            .results
-            .entrySet()
-            .stream()
-            .forEach(entry ->
-            {
-              String user = entry.getKey();
-              List resultsForUser = entry.getValue();
-
-              for (int j=0; j < resultsForUser.size(); j++)
-              {
-                if (!(j < state.get(user).size()))
-                {
-                  break;
-                }
-
-                assertThat(resultsForUser.get(j))
-                    .as("Unexpected results calculation %d of user %s", j, user)
-                    .isEqualTo(state.get(user).get(j));
-              }
-
-              assertThat(state.get(user))
-                  .as("More results calculated for user %s as expected", user)
-                  .containsAll(resultsForUser);
-            });
-      }
-    }
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java b/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java
deleted file mode 100644
index 606218f..0000000
--- a/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package de.juplo.kafka;
-
-import org.junit.jupiter.api.extension.ConditionEvaluationResult;
-import org.junit.jupiter.api.extension.ExecutionCondition;
-import org.junit.jupiter.api.extension.ExtensionContext;
-import org.junit.platform.commons.util.AnnotationUtils;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-
-public class ErrorCannotBeGeneratedCondition implements ExecutionCondition
-{
-  @Override
-  public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context)
-  {
-    final Optional optional =
-        AnnotationUtils.findAnnotation(
-            context.getElement(),
-            SkipWhenErrorCannotBeGenerated.class);
-
-    if (context.getTestInstance().isEmpty())
-      return ConditionEvaluationResult.enabled("Test-instance ist not available");
-
-    if (optional.isPresent())
-    {
-      SkipWhenErrorCannotBeGenerated skipWhenErrorCannotBeGenerated = optional.get();
-      GenericApplicationTests instance = (GenericApplicationTests)context.getTestInstance().get();
-      List missingRequiredErrors = new LinkedList<>();
-
-      if (skipWhenErrorCannotBeGenerated.poisonPill() && !instance.recordGenerator.canGeneratePoisonPill())
-        missingRequiredErrors.add("Poison-Pill");
-
-      if (skipWhenErrorCannotBeGenerated.logicError() && !instance.recordGenerator.canGenerateLogicError())
-        missingRequiredErrors.add("Logic-Error");
-
-      StringBuilder builder = new StringBuilder();
-      builder.append(context.getTestClass().get().getSimpleName());
-
-      if (missingRequiredErrors.isEmpty())
-      {
-        builder.append(" can generate all required types of errors");
-        return ConditionEvaluationResult.enabled(builder.toString());
-      }
-
-      builder.append(" cannot generate the required error(s): ");
-      builder.append(
-          missingRequiredErrors
-              .stream()
-              .collect(Collectors.joining(", ")));
-
-      return ConditionEvaluationResult.disabled(builder.toString());
-    }
-
-    return ConditionEvaluationResult.enabled(
-        "Not annotated with " + SkipWhenErrorCannotBeGenerated.class.getSimpleName());
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
deleted file mode 100644
index 8849317..0000000
--- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java
+++ /dev/null
@@ -1,405 +0,0 @@
-package de.juplo.kafka;
-
-import com.mongodb.client.MongoClient;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.RecordDeserializationException;
-import org.apache.kafka.common.serialization.*;
-import org.apache.kafka.common.utils.Bytes;
-import org.junit.jupiter.api.*;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-import org.springframework.boot.autoconfigure.mongo.MongoProperties;
-import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
-import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Import;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.test.context.TestPropertySource;
-import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
-
-import java.time.Duration;
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static de.juplo.kafka.GenericApplicationTests.PARTITIONS;
-import static de.juplo.kafka.GenericApplicationTests.TOPIC;
-import static org.assertj.core.api.Assertions.*;
-import static org.awaitility.Awaitility.*;
-
-
-@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
-@TestPropertySource(
-    properties = {
-        "sumup.adder.bootstrap-server=${spring.embedded.kafka.brokers}",
-        "sumup.adder.topic=" + TOPIC,
-        "sumup.adder.commit-interval=500ms",
-        "spring.mongodb.embedded.version=4.4.13" })
-@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
-@EnableAutoConfiguration
-@AutoConfigureDataMongo
-@Slf4j
-abstract class GenericApplicationTests
-{
-  public static final String TOPIC = "FOO";
-  public static final int PARTITIONS = 10;
-
-
-  @Autowired
-  KafkaConsumer kafkaConsumer;
-  @Autowired
-  Consumer> consumer;
-  @Autowired
-  ApplicationProperties properties;
-  @Autowired
-  ExecutorService executor;
-  @Autowired
-  MongoClient mongoClient;
-  @Autowired
-  MongoProperties mongoProperties;
-  @Autowired
-  ConsumerRebalanceListener rebalanceListener;
-  @Autowired
-  RecordHandler recordHandler;
-
-  KafkaProducer testRecordProducer;
-  KafkaConsumer offsetConsumer;
-  EndlessConsumer endlessConsumer;
-  Map oldOffsets;
-  Map seenOffsets;
-  Set> receivedRecords;
-
-
-  final RecordGenerator recordGenerator;
-  final Consumer> messageSender;
-
-  public GenericApplicationTests(RecordGenerator recordGenerator)
-  {
-    this.recordGenerator = recordGenerator;
-    this.messageSender = (record) -> sendMessage(record);
-  }
-
-
-  /** Tests methods */
-
-  @Test
-  void commitsCurrentOffsetsOnSuccess() throws Exception
-  {
-    int numberOfGeneratedMessages =
-        recordGenerator.generate(false, false, messageSender);
-
-    await(numberOfGeneratedMessages + " records received")
-        .atMost(Duration.ofSeconds(30))
-        .pollInterval(Duration.ofSeconds(1))
-        .until(() -> receivedRecords.size() >= numberOfGeneratedMessages);
-
-    await("Offsets committed")
-        .atMost(Duration.ofSeconds(10))
-        .pollInterval(Duration.ofSeconds(1))
-        .untilAsserted(() ->
-        {
-          checkSeenOffsetsForProgress();
-          assertSeenOffsetsEqualCommittedOffsets(seenOffsets);
-        });
-
-    assertThatExceptionOfType(IllegalStateException.class)
-        .isThrownBy(() -> endlessConsumer.exitStatus())
-        .describedAs("Consumer should still be running");
-
-    endlessConsumer.stop();
-    recordGenerator.assertBusinessLogic();
-  }
-
-  @Test
-  @SkipWhenErrorCannotBeGenerated(poisonPill = true)
-  void commitsOffsetOfErrorForReprocessingOnDeserializationError()
-  {
-    int numberOfGeneratedMessages =
-        recordGenerator.generate(true, false, messageSender);
-
-    await("Consumer failed")
-        .atMost(Duration.ofSeconds(30))
-        .pollInterval(Duration.ofSeconds(1))
-        .until(() -> !endlessConsumer.running());
-
-    checkSeenOffsetsForProgress();
-    assertSeenOffsetsEqualCommittedOffsets(seenOffsets);
-
-    endlessConsumer.start();
-    await("Consumer failed")
-        .atMost(Duration.ofSeconds(30))
-        .pollInterval(Duration.ofSeconds(1))
-        .until(() -> !endlessConsumer.running());
-
-    checkSeenOffsetsForProgress();
-    assertSeenOffsetsEqualCommittedOffsets(seenOffsets);
-    assertThat(receivedRecords.size())
-        .describedAs("Received not all sent events")
-        .isLessThan(numberOfGeneratedMessages);
-
-    assertThatNoException()
-        .describedAs("Consumer should not be running")
-        .isThrownBy(() -> endlessConsumer.exitStatus());
-    assertThat(endlessConsumer.exitStatus())
-        .describedAs("Consumer should have exited abnormally")
-        .containsInstanceOf(RecordDeserializationException.class);
-
-    recordGenerator.assertBusinessLogic();
-  }
-
-  @Test
-  @SkipWhenErrorCannotBeGenerated(logicError = true)
-  void doesNotCommitOffsetsOnLogicError()
-  {
-    int numberOfGeneratedMessages =
-        recordGenerator.generate(false, true, messageSender);
-
-    await("Consumer failed")
-        .atMost(Duration.ofSeconds(30))
-        .pollInterval(Duration.ofSeconds(1))
-        .until(() -> !endlessConsumer.running());
-
-    checkSeenOffsetsForProgress();
-    assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets);
-
-    endlessConsumer.start();
-    await("Consumer failed")
-        .atMost(Duration.ofSeconds(30))
-        .pollInterval(Duration.ofSeconds(1))
-        .until(() -> !endlessConsumer.running());
-
-    assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets);
-
-    assertThatNoException()
-        .describedAs("Consumer should not be running")
-        .isThrownBy(() -> endlessConsumer.exitStatus());
-    assertThat(endlessConsumer.exitStatus())
-        .describedAs("Consumer should have exited abnormally")
-        .containsInstanceOf(RuntimeException.class);
-
-    recordGenerator.assertBusinessLogic();
-  }
-
-
-  /** Helper methods for the verification of expectations */
-
-  void assertSeenOffsetsEqualCommittedOffsets(Map offsetsToCheck)
-  {
-    doForCurrentOffsets((tp, offset) ->
-    {
-      Long expected = offsetsToCheck.get(tp) + 1;
-      log.debug("Checking, if the offset {} for {} is exactly {}", offset, tp, expected);
-      assertThat(offset)
-          .describedAs("Committed offset corresponds to the offset of the consumer")
-          .isEqualTo(expected);
-    });
-  }
-
-  void assertSeenOffsetsAreBehindCommittedOffsets(Map offsetsToCheck)
-  {
-    List isOffsetBehindSeen = new LinkedList<>();
-
-    doForCurrentOffsets((tp, offset) ->
-    {
-      Long expected = offsetsToCheck.get(tp) + 1;
-      log.debug("Checking, if the offset {} for {} is at most {}", offset, tp, expected);
-      assertThat(offset)
-          .describedAs("Committed offset must be at most equal to the offset of the consumer")
-          .isLessThanOrEqualTo(expected);
-      isOffsetBehindSeen.add(offset < expected);
-    });
-
-    assertThat(isOffsetBehindSeen.stream().reduce(false, (result, next) -> result | next))
-        .describedAs("Committed offsets are behind seen offsets")
-        .isTrue();
-  }
-
-  void checkSeenOffsetsForProgress()
-  {
-    // Be sure, that some messages were consumed...!
-    Set withProgress = new HashSet<>();
-    partitions().forEach(tp ->
-    {
-      Long oldOffset = oldOffsets.get(tp) + 1;
-      Long newOffset = seenOffsets.get(tp) + 1;
-      if (!oldOffset.equals(newOffset))
-      {
-        log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
-        withProgress.add(tp);
-      }
-    });
-    assertThat(withProgress)
-        .describedAs("Some offsets must have changed, compared to the old offset-positions")
-        .isNotEmpty();
-  }
-
-
-  /** Helper methods for setting up and running the tests */
-
-  void seekToEnd()
-  {
-    offsetConsumer.assign(partitions());
-    offsetConsumer.seekToEnd(partitions());
-    partitions().forEach(tp ->
-    {
-      // seekToEnd() works lazily: it only takes effect on poll()/position()
-      Long offset = offsetConsumer.position(tp);
-      log.info("New position for {}: {}", tp, offset);
-    });
-    // The new positions must be commited!
-    offsetConsumer.commitSync();
-    offsetConsumer.unsubscribe();
-  }
-
-  void doForCurrentOffsets(BiConsumer consumer)
-  {
-    offsetConsumer.assign(partitions());
-    partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
-    offsetConsumer.unsubscribe();
-  }
-
-  List partitions()
-  {
-    return
-        IntStream
-            .range(0, PARTITIONS)
-            .mapToObj(partition -> new TopicPartition(TOPIC, partition))
-            .collect(Collectors.toList());
-  }
-
-
-  public interface RecordGenerator
-  {
-    int generate(
-        boolean poisonPills,
-        boolean logicErrors,
-        Consumer> messageSender);
-
-    default boolean canGeneratePoisonPill()
-    {
-      return true;
-    }
-
-    default boolean canGenerateLogicError()
-    {
-      return true;
-    }
-
-    default void assertBusinessLogic()
-    {
-      log.debug("No business-logic to assert");
-    }
-  }
-
-  void sendMessage(ProducerRecord record)
-  {
-    testRecordProducer.send(record, (metadata, e) ->
-    {
-      if (metadata != null)
-      {
-        log.debug(
-            "{}|{} - {}={}",
-            metadata.partition(),
-            metadata.offset(),
-            record.key(),
-            record.value());
-      }
-      else
-      {
-        log.warn(
-            "Exception for {}={}: {}",
-            record.key(),
-            record.value(),
-            e.toString());
-      }
-    });
-  }
-
-
-  @BeforeEach
-  public void init()
-  {
-    Properties props;
-    props = new Properties();
-    props.put("bootstrap.servers", properties.getBootstrapServer());
-    props.put("linger.ms", 100);
-    props.put("key.serializer", BytesSerializer.class.getName());
-    props.put("value.serializer", BytesSerializer.class.getName());
-    testRecordProducer = new KafkaProducer<>(props);
-
-    props = new Properties();
-    props.put("bootstrap.servers", properties.getBootstrapServer());
-    props.put("client.id", "OFFSET-CONSUMER");
-    props.put("group.id", properties.getGroupId());
-    props.put("key.deserializer", BytesDeserializer.class.getName());
-    props.put("value.deserializer", BytesDeserializer.class.getName());
-    offsetConsumer = new KafkaConsumer<>(props);
-
-    mongoClient.getDatabase(mongoProperties.getDatabase()).drop();
-    seekToEnd();
-
-    oldOffsets = new HashMap<>();
-    seenOffsets = new HashMap<>();
-    receivedRecords = new HashSet<>();
-
-    doForCurrentOffsets((tp, offset) ->
-    {
-      oldOffsets.put(tp, offset - 1);
-      seenOffsets.put(tp, offset - 1);
-    });
-
-    TestRecordHandler captureOffsetAndExecuteTestHandler =
-        new TestRecordHandler(recordHandler)
-        {
-          @Override
-          public void onNewRecord(ConsumerRecord record)
-          {
-            seenOffsets.put(
-                new TopicPartition(record.topic(), record.partition()),
-                record.offset());
-            receivedRecords.add(record);
-          }
-        };
-
-    endlessConsumer =
-        new EndlessConsumer<>(
-            executor,
-            properties.getClientId(),
-            properties.getTopic(),
-            kafkaConsumer,
-            rebalanceListener,
-            captureOffsetAndExecuteTestHandler);
-
-    endlessConsumer.start();
-  }
-
-  @AfterEach
-  public void deinit()
-  {
-    try
-    {
-      testRecordProducer.close();
-      offsetConsumer.close();
-    }
-    catch (Exception e)
-    {
-      log.info("Exception while stopping the consumer: {}", e.toString());
-    }
-  }
-
-
-  @TestConfiguration
-  @Import(ApplicationConfiguration.class)
-  public static class Configuration
-  {
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java b/src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java
deleted file mode 100644
index 6d15e9e..0000000
--- a/src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package de.juplo.kafka;
-
-import org.junit.jupiter.api.extension.ExtendWith;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-
-
-@Retention(RetentionPolicy.RUNTIME)
-@ExtendWith(ErrorCannotBeGeneratedCondition.class)
-public @interface SkipWhenErrorCannotBeGenerated
-{
-  boolean poisonPill() default false;
-  boolean logicError() default false;
-}
diff --git a/src/test/java/de/juplo/kafka/TestRecordHandler.java b/src/test/java/de/juplo/kafka/TestRecordHandler.java
deleted file mode 100644
index b4efdd6..0000000
--- a/src/test/java/de/juplo/kafka/TestRecordHandler.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-
-@RequiredArgsConstructor
-public abstract class TestRecordHandler implements RecordHandler
-{
-  private final RecordHandler handler;
-
-
-  public abstract void onNewRecord(ConsumerRecord record);
-
-
-  @Override
-  public void accept(ConsumerRecord record)
-  {
-    this.onNewRecord(record);
-    handler.accept(record);
-  }
-}
-- 
2.20.1