import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.serialization.*;
import org.apache.kafka.common.utils.Bytes;
import org.junit.jupiter.api.*;
@TestPropertySource(
properties = {
"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+ "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}",
"sumup.adder.topic=" + TOPIC,
"spring.kafka.consumer.auto-commit-interval=500ms",
"spring.mongodb.embedded.version=4.4.13" })
{
recordGenerator.generate(true, false, messageSender);
- int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages();
+ int numberOfValidMessages =
+ recordGenerator.getNumberOfMessages() -
+ recordGenerator.getNumberOfPoisonPills();
- await("Consumer failed")
+ await(numberOfValidMessages + " records received")
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(1))
- .until(() -> !endlessConsumer.running());
-
- checkSeenOffsetsForProgress();
- assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
+ .until(() -> recordHandler.receivedMessages >= numberOfValidMessages);
- endlessConsumer.start();
- await("Consumer failed")
- .atMost(Duration.ofSeconds(30))
+ await("Offsets committed")
+ .atMost(Duration.ofSeconds(10))
.pollInterval(Duration.ofSeconds(1))
- .until(() -> !endlessConsumer.running());
-
- checkSeenOffsetsForProgress();
- assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
- assertThat(recordHandler.receivedMessages)
- .describedAs("Received not all sent events")
- .isLessThan(numberOfGeneratedMessages);
+ .untilAsserted(() ->
+ {
+ checkSeenOffsetsForProgress();
+ assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
+ });
assertThat(endlessConsumer.running())
- .describedAs("Consumer should have exited")
- .isFalse();
+ .describedAs("Consumer should still be running")
+ .isTrue();
+ endlessConsumer.stop();
recordGenerator.assertBusinessLogic();
}
{
recordGenerator.generate(false, true, messageSender);
- int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages();
+ int numberOfValidMessages =
+ recordGenerator.getNumberOfMessages() -
+ recordGenerator.getNumberOfLogicErrors();
- await("Consumer failed")
+ await(numberOfValidMessages + " records received")
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(1))
- .until(() -> !endlessConsumer.running());
-
- checkSeenOffsetsForProgress();
- assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
+ .until(() -> recordHandler.receivedMessages >= numberOfValidMessages);
- endlessConsumer.start();
- await("Consumer failed")
- .atMost(Duration.ofSeconds(30))
+ await("Offsets committed")
+ .atMost(Duration.ofSeconds(10))
.pollInterval(Duration.ofSeconds(1))
- .until(() -> !endlessConsumer.running());
-
- assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
+ .untilAsserted(() ->
+ {
+ checkSeenOffsetsForProgress();
+ assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
+ });
assertThat(endlessConsumer.running())
- .describedAs("Consumer should not be running")
- .isFalse();
+ .describedAs("Consumer should still be running")
+ .isTrue();
+ endlessConsumer.stop();
recordGenerator.assertBusinessLogic();
}
{
return factory.createConsumer();
}
+
+ @Bean
+ public DeadLetterTopicConsumer deadLetterTopicConsumer()
+ {
+ return new DeadLetterTopicConsumer();
+ }
}
}