package de.juplo.kafka;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.StickyAssignor;
@Configuration
@EnableConfigurationProperties(ApplicationProperties.class)
+@Slf4j
public class ApplicationConfiguration
{
@Bean
public ExampleConsumer<String, Long> exampleConsumer(
Consumer<String, Long> kafkaConsumer,
+ RecordHandler<String, Long> recordHandler,
ApplicationProperties properties,
ConfigurableApplicationContext applicationContext)
{
return new ExampleConsumer<>(
properties.getClientId(),
properties.getConsumerProperties().getTopic(),
kafkaConsumer,
+ recordHandler,
() -> applicationContext.close());
}
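+ // The default RecordHandler for production merely logs each consumed record.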
+ @Bean
+ public RecordHandler<String, Long> recordHandler()
+ {
+ return (topic, partition, offset, key, value) -> log.info("No-op handler called for {}={}", key, value);
+ }
+
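+ // An empty destroyMethod prevents Spring from calling close() on the consumer;
+ // its lifecycle is managed by ExampleConsumer instead.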
@Bean(destroyMethod = "")
public KafkaConsumer<String, Long> kafkaConsumer(ApplicationProperties properties)
{
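The new test file below exercises this setup end-to-end. Note that the RecordHandler interface itself is not part of this excerpt; judging from the five-argument lambda in the configuration above, it is presumably a functional interface along the following lines (a sketch inferred from the call sites, not the verbatim source):

package de.juplo.kafka;

// Assumed shape of the callback, inferred from the lambda in
// ApplicationConfiguration: one method per consumed record, receiving the
// record coordinates and the deserialized key and value.
@FunctionalInterface
public interface RecordHandler<K, V>
{
  void handleRecord(String topic, Integer partition, Long offset, K key, V value);
}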
--- /dev/null
+++ b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java
+package de.juplo.kafka;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static de.juplo.kafka.ExampleConsumerTest.NUM_PARTITIONS;
+import static de.juplo.kafka.ExampleConsumerTest.TOPIC;
+
+
+@SpringBootTest(
+ classes = {
+ KafkaAutoConfiguration.class,
+ ApplicationProperties.class,
+ ExampleConsumerTest.ConsumerRunnableTestConfig.class,
+ },
+ properties = {
+ "juplo.bootstrap-server=${spring.embedded.kafka.brokers}",
+ "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
+ "logging.level.de.juplo.kafka=TRACE",
+ })
+@EmbeddedKafka(topics = TOPIC, partitions = NUM_PARTITIONS)
+public class ExampleConsumerTest
+{
+ @DisplayName("All messages are consumed")
+ @Test
+ void testOnlyValidMessages()
+ {
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+
+ Awaitility
+ .await("All messages are consumed")
+ .atMost(Duration.ofSeconds(5))
+ .until(() -> mockRecordHandler.getNumMessagesHandled() == 20);
+ }
+
+ @DisplayName("A deserialization exception is skipped and all valid messages are consumed")
+ @Test
+ void testDeserializationException()
+ {
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendNonDeserializableMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+
+ Awaitility
+ .await("All valid messages are consumed")
+ .atMost(Duration.ofSeconds(15))
+ .until(() -> mockRecordHandler.getNumMessagesHandled() == 19);
+ }
+
+ @DisplayName("A message, that triggers an unexpected exception in the domain-logic, exits the application")
+ @Test
+ void testUnexpectedDomainError() throws Exception
+ {
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendMessageThatTriggersRuntimeExceptionInDomain(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+
+ Awaitility
+ .await("The ConsumerRunnable is exited by an unexpected exception")
+ .atMost(Duration.ofSeconds(5))
+ .pollInterval(Duration.ofMillis(250))
+ .until(() -> isTerminatedExceptionally.get());
+ }
+
+
+ static final String ID = "TEST";
+ static final String TOPIC = "ExampleConsumerTest_TEST";
+ static final int NUM_PARTITIONS = 10;
+
+ @Autowired
+ KafkaTemplate<String, byte[]> kafkaTemplate;
+
+ final LongSerializer serializer = new LongSerializer();
+ final long[] currentOffsets = new long[NUM_PARTITIONS];
+
+ long nextMessage = 1;
+
+ final MockRecordHandler mockRecordHandler = new MockRecordHandler();
+ final AtomicBoolean isTerminatedExceptionally = new AtomicBoolean();
+
+ ExampleConsumer<String, Long> exampleConsumer;
+
+
+ @BeforeEach
+ void createExampleConsumer(@Autowired ApplicationProperties properties)
+ {
+ ApplicationConfiguration configuration = new ApplicationConfiguration();
+ Consumer<String, Long> consumer = configuration.kafkaConsumer(properties);
+
+ exampleConsumer = new ExampleConsumer<>(
+ ID,
+ TOPIC,
+ consumer,
+ mockRecordHandler,
+ () -> isTerminatedExceptionally.set(true));
+ }
+
+ @AfterEach
+ void resetSetup(@Autowired AdminClient adminClient) throws InterruptedException, ExecutionException
+ {
+ exampleConsumer.shutdown();
+ adminClient
+ .deleteRecords(recordsToDelete())
+ .all()
+ .get();
+ mockRecordHandler.clear();
+ nextMessage = 1;
+ isTerminatedExceptionally.set(false);
+ }
+
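+ // For every partition that received records, delete everything up to and
+ // including the last written offset, so each test starts on an empty topic.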
+ private Map<TopicPartition, RecordsToDelete> recordsToDelete()
+ {
+ return IntStream
+ .range(0, NUM_PARTITIONS)
+ .filter(i -> currentOffsets[i] > 0)
+ .boxed()
+ .collect(Collectors.toMap(
+ i -> new TopicPartition(TOPIC, i),
+ i -> recordsToDelete(i)));
+ }
+
+ private RecordsToDelete recordsToDelete(int partition)
+ {
+ return RecordsToDelete.beforeOffset(currentOffsets[partition] + 1);
+ }
+
+ private void sendValidMessage(int partition)
+ {
+ send(partition, nextMessage);
+ }
+
+ private void sendNonDeserializableMessage(int partition)
+ {
+ send(partition, "BOOM!".getBytes());
+ }
+
+ private void sendMessageThatTriggersRuntimeExceptionInDomain(int partition)
+ {
+ send(partition, VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION);
+ }
+
+ private void send(int partition, long message)
+ {
+ send(partition, serializer.serialize(TOPIC, message));
+ }
+
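+ // Sends raw bytes and records the offset of the written record, so that
+ // resetSetup() knows how far each partition has to be truncated.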
+ private void send(int partition, byte[] bytes)
+ {
+ nextMessage++;
+ kafkaTemplate
+ .send(TOPIC, partition, "EGAL", bytes)
+ .thenAccept(result ->
+ {
+ RecordMetadata metadata = result.getRecordMetadata();
+ currentOffsets[metadata.partition()] = metadata.offset();
+ });
+ }
+
+
+ public static final int VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION = -1;
+
+
+ @TestConfiguration
+ static class ConsumerRunnableTestConfig
+ {
+ @Bean
+ AdminClient adminClient(@Value("${spring.embedded.kafka.brokers}") String kafkaBroker)
+ {
+ Map<String, Object> properties = new HashMap<>();
+ properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker);
+ return AdminClient.create(properties);
+ }
+ }
+}
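The test replaces the logging default with a MockRecordHandler, whose source is not part of this diff. A minimal sketch that is consistent with the calls made above (getNumMessagesHandled(), clear(), and a RuntimeException for VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION) might look like this; the field name and internal details are assumptions:

package de.juplo.kafka;

// Hypothetical stand-in for the MockRecordHandler used by ExampleConsumerTest;
// the real class is not shown in the diff.
public class MockRecordHandler implements RecordHandler<String, Long>
{
  private int numMessagesHandled = 0;

  public int getNumMessagesHandled()
  {
    return numMessagesHandled;
  }

  @Override
  public void handleRecord(
      String topic,
      Integer partition,
      Long offset,
      String key,
      Long value)
  {
    // Simulates an unexpected error in the domain logic for the poison value
    if (value != null && value == ExampleConsumerTest.VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION)
      throw new RuntimeException("Unexpected error for value " + value);
    numMessagesHandled++;
  }

  public void clear()
  {
    numMessagesHandled = 0;
  }
}

Records that fail deserialization never reach handleRecord(), which is why testDeserializationException() expects only 19 of the 20 sent messages to be counted.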