--- /dev/null
+package de.juplo.kafka;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Import;
+import org.springframework.context.annotation.Primary;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static de.juplo.kafka.ExampleConsumerTest.NUM_PARTITIONS;
+import static de.juplo.kafka.ExampleConsumerTest.TOPIC;
+
+
+@SpringBootTest(
+ classes = {
+ KafkaAutoConfiguration.class,
+ ExampleConsumerTest.ConsumerRunnableTestConfig.class,
+ },
+ properties = {
+ "spring.main.allow-bean-definition-overriding=true",
+ "logging.level.de.juplo.kafka=TRACE",
+ "juplo.bootstrap-server=${spring.embedded.kafka.brokers}",
+ "juplo.consumer.topic=" + TOPIC,
+ "juplo.consumer.auto-offset-reset=earliest",
+ "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
+ })
+@EmbeddedKafka(topics = TOPIC, partitions = NUM_PARTITIONS)
+public class ExampleConsumerTest
+{
+ @DisplayName("All valid messages are consumed")
+ @Test
+ void testOnlyValidMessages()
+ {
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+
+ Awaitility
+ .await("All messages are received")
+ .atMost(Duration.ofSeconds(5))
+ .until(() -> mockHandler.getNumRequestsHandled() == 20);
+ }
+
+ @DisplayName("A deserialization exception is skipped")
+ @Test
+ void testDeserializationException()
+ {
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendNonDeserializableMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+
+ Awaitility
+ .await("All valid messages are received")
+ .atMost(Duration.ofSeconds(5))
+ .until(() -> mockHandler.getNumRequestsHandled() == 19);
+ }
+
+ @DisplayName("A message, that triggers an unexpected exception in the domain-logic, exits the application")
+ @Test
+ void testDomainError() throws Exception
+ {
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendMessageThatTriggersRuntimeExceptionInDomain(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+
+ Awaitility
+ .await("The ConsumerRunnable was exited by an unexpected exception")
+ .atMost(Duration.ofSeconds(5))
+ .pollInterval(Duration.ofMillis(250))
+ .until(() -> isTerminatedExceptionally.get());
+ }
+
+
+ static final String TOPIC = "ExampleConsumerTest_TEST";
+ static final int NUM_PARTITIONS = 10;
+
+ @Autowired
+ KafkaTemplate<String, byte[]> kafkaTemplate;
+
+ LongSerializer serializer = new LongSerializer();
+ long[] currentOffsets = new long[] { 0, 0 };
+
+ @Autowired
+ AdminClient adminClient;
+ @Autowired
+ MockRecordHandler mockHandler;
+ @Autowired
+ AtomicBoolean isTerminatedExceptionally;
+
+
+ @BeforeEach
+ void resetTopic() {
+ adminClient.deleteRecords(Map.of(
+ new TopicPartition(TOPIC, 0), deleteAllRecordsByPartition(0),
+ new TopicPartition(TOPIC, 1), deleteAllRecordsByPartition(1)));
+ mockHandler.clear();
+ isTerminatedExceptionally.set(false);
+ }
+
+ private RecordsToDelete deleteAllRecordsByPartition(int x)
+ {
+ return RecordsToDelete.beforeOffset(currentOffsets[x] + 1);
+ }
+
+ private void sendValidMessage(int partition)
+ {
+ kafkaTemplate.send(TOPIC, partition, "EGAL", serializer.serialize(TOPIC, (long)partition));
+ }
+
+ private void sendNonDeserializableMessage(int partition)
+ {
+ kafkaTemplate.send(TOPIC, partition, "EGAL", "BOOM!".getBytes());
+ }
+
+ private void sendMessageThatTriggersRuntimeExceptionInDomain(int partition)
+ {
+ kafkaTemplate.send(
+ TOPIC,
+ partition,
+ "EGAL",
+ serializer.serialize(TOPIC, (long)VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION));
+ }
+
+
+ public final static int VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION = -1;
+
+
+ @TestConfiguration
+ @Import(ApplicationConfiguration.class)
+ static class ConsumerRunnableTestConfig
+ {
+ @Bean
+ RecordHandler<String, Long> recordHandler()
+ {
+ return new MockRecordHandler(Duration.ofMillis(100));
+ }
+
+ @Primary
+ @Bean
+ Runnable errorCallback(AtomicBoolean isTerminatedExceptionally)
+ {
+ return () -> isTerminatedExceptionally.set(true);
+ }
+
+ @Bean
+ AtomicBoolean isTerminatedExceptionally()
+ {
+ return new AtomicBoolean();
+ }
+
+ @Bean
+ AdminClient adminClient(
+ @Value("${spring.embedded.kafka.brokers}")
+ String kafkaBroker)
+ {
+ Map<String, Object> properties = new HashMap<>();
+ properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker);
+ return AdminClient.create(properties);
+ }
+ }
+}