"$1" = "build"
]]
then
- mvn install || exit
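+ # Stop peter and beate and force-remove their containers and anonymous volumes
+ # (-svf), so that the next up recreates them from the rebuilt image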
+ docker-compose rm -svf peter beate
+ mvn clean install || exit
else
echo "Using image existing images:"
docker image ls $IMAGE
docker-compose up -d producer
sleep 10
-http -v :8081/seen
+http -v :8081/state
sleep 1
-http -v :8081/seen
+http -v :8081/state
sleep 1
-http -v :8081/seen
+http -v :8081/state
sleep 1
-http -v :8081/seen
+http -v :8081/state
docker-compose stop producer
docker-compose exec -T cli bash << 'EOF'
docker-compose start producer
sleep 1
-http -v :8081/seen
+http -v :8081/state
sleep 1
-http -v :8081/seen
+http -v :8081/state
sleep 1
-http -v :8081/seen
+http -v :8081/state
sleep 1
-http -v :8081/seen
+http -v :8081/state
sleep 1
-http -v :8081/seen
+http -v :8081/state
sleep 1
-http -v :8081/seen
+http -v :8081/state
sleep 1
-http -v :8081/seen
+http -v :8081/state
sleep 1
-http -v :8081/seen
+http -v :8081/state
sleep 1
-http -v :8081/seen
+http -v :8081/state
sleep 1
-http -v :8081/seen
+http -v :8081/state
sleep 1
-http -v :8081/seen
+http -v :8081/state
sleep 1
-http -v :8081/seen
+http -v :8081/state
sleep 1
-http -v :8081/seen
+http -v :8081/state
sleep 1
-http -v :8081/seen
+http -v :8081/state
docker-compose stop producer consumer
<version>1.0-SNAPSHOT</version>
<name>Endless Consumer: a Simple Consumer-Group that reads and prints the topic and counts the received messages per partition and key</name>
+ <properties>
+ <java.version>11</java.version>
+ </properties>
+
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
public class ApplicationConfiguration
{
@Bean
- public KeyCountingRecordHandler keyCountingRecordHandler()
+ public ApplicationRecordHandler recordHandler()
{
- return new KeyCountingRecordHandler();
+ return new ApplicationRecordHandler();
}
@Bean
- public KeyCountingRebalanceListener keyCountingRebalanceListener(
- KeyCountingRecordHandler keyCountingRecordHandler,
- PartitionStatisticsRepository repository,
+ public ApplicationRebalanceListener rebalanceListener(
+ ApplicationRecordHandler recordHandler,
+ StateRepository stateRepository,
ApplicationProperties properties)
{
- return new KeyCountingRebalanceListener(
- keyCountingRecordHandler,
- repository,
+ return new ApplicationRebalanceListener(
+ recordHandler,
+ stateRepository,
properties.getClientId(),
Clock.systemDefaultZone(),
properties.getCommitInterval());
public EndlessConsumer<String, Long> endlessConsumer(
KafkaConsumer<String, Long> kafkaConsumer,
ExecutorService executor,
- KeyCountingRebalanceListener keyCountingRebalanceListener,
- KeyCountingRecordHandler keyCountingRecordHandler,
+ ApplicationRebalanceListener rebalanceListener,
+ ApplicationRecordHandler recordHandler,
ApplicationProperties properties)
{
return
properties.getClientId(),
properties.getTopic(),
kafkaConsumer,
- keyCountingRebalanceListener,
- keyCountingRecordHandler);
+ rebalanceListener,
+ recordHandler);
}
@Bean
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.TopicPartition;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Map;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
+{
+ private final ApplicationRecordHandler recordHandler;
+ private final StateRepository stateRepository;
+ private final String id;
+ private final Clock clock;
+ private final Duration commitInterval;
+
+ private Instant lastCommit = Instant.EPOCH;
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+ {
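+    // Restore the state for every newly assigned partition from MongoDB;
+    // if no document exists yet, the partition starts with an empty state.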
+ partitions.forEach(tp ->
+ {
+ Integer partition = tp.partition();
+ log.info("{} - adding partition: {}", id, partition);
+ StateDocument document =
+ stateRepository
+ .findById(Integer.toString(partition))
+ .orElse(new StateDocument(partition));
+ recordHandler.addPartition(partition, document.state);
+ });
+ }
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+ {
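+    // Log the collected counters and persist the state of every revoked
+    // partition, before it may be handed over to another consumer instance.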
+ partitions.forEach(tp ->
+ {
+ Integer partition = tp.partition();
+ log.info("{} - removing partition: {}", id, partition);
+ Map<String, Long> removed = recordHandler.removePartition(partition);
+ for (String key : removed.keySet())
+ {
+ log.info(
+ "{} - Seen {} messages for partition={}|key={}",
+ id,
+ removed.get(key),
+ partition,
+ key);
+ }
+ stateRepository.save(new StateDocument(partition, removed));
+ });
+ }
+
+
+ @Override
+ public void beforeNextPoll()
+ {
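+    // Persist the collected state in MongoDB, at most once per commit interval.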
+ if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
+ {
+ log.debug("Storing data, last commit: {}", lastCommit);
+      recordHandler.getState().forEach((partition, state) -> stateRepository.save(
+          new StateDocument(
+              partition,
+              state)));
+ lastCommit = clock.instant();
+ }
+ }
+}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+@Slf4j
+public class ApplicationRecordHandler implements RecordHandler<String, Long>
+{
+ private final Map<Integer, Map<String, Long>> state = new HashMap<>();
+
+
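+  // Increments, for every consumed record, the counter that is kept per partition and key.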
+ @Override
+ public void accept(ConsumerRecord<String, Long> record)
+ {
+ Integer partition = record.partition();
+ String key = record.key() == null ? "NULL" : record.key().toString();
+ Map<String, Long> byKey = state.get(partition);
+
+ if (!byKey.containsKey(key))
+      byKey.put(key, 0L);
+
+ long seenByKey = byKey.get(key);
+ seenByKey++;
+ byKey.put(key, seenByKey);
+ }
+
+ protected void addPartition(Integer partition, Map<String, Long> state)
+ {
+ this.state.put(partition, state);
+ }
+
+ protected Map<String, Long> removePartition(Integer partition)
+ {
+ return this.state.remove(partition);
+ }
+
+
+ public Map<Integer, Map<String, Long>> getState()
+ {
+ return state;
+ }
+}
public class DriverController
{
private final EndlessConsumer consumer;
- private final KeyCountingRecordHandler keyCountingRecordHandler;
+ private final ApplicationRecordHandler recordHandler;
@PostMapping("start")
}
- @GetMapping("seen")
- public Map<Integer, Map<String, Long>> seen()
+ @GetMapping("state")
+ public Map<Integer, Map<String, Long>> state()
{
- return keyCountingRecordHandler.getSeen();
+ return recordHandler.getState();
}
private final String id;
private final String topic;
private final Consumer<K, V> consumer;
- private final PollIntervalAwareConsumerRebalanceListener pollIntervalAwareRebalanceListener;
- private final RecordHandler<K, V> handler;
+ private final PollIntervalAwareConsumerRebalanceListener rebalanceListener;
+ private final RecordHandler<K, V> recordHandler;
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
try
{
log.info("{} - Subscribing to topic {}", id, topic);
- consumer.subscribe(Arrays.asList(topic), pollIntervalAwareRebalanceListener);
+ consumer.subscribe(Arrays.asList(topic), rebalanceListener);
while (true)
{
record.value()
);
- handler.accept(record);
+ recordHandler.accept(record);
consumed++;
}
- pollIntervalAwareRebalanceListener.beforeNextPoll();
+ rebalanceListener.beforeNextPoll();
}
}
catch(WakeupException e)
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.TopicPartition;
-
-import java.time.Clock;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.Collection;
-import java.util.Map;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class KeyCountingRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
-{
- private final KeyCountingRecordHandler handler;
- private final PartitionStatisticsRepository repository;
- private final String id;
- private final Clock clock;
- private final Duration commitInterval;
-
- private Instant lastCommit = Instant.EPOCH;
-
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition> partitions)
- {
- partitions.forEach(tp ->
- {
- Integer partition = tp.partition();
- log.info("{} - adding partition: {}", id, partition);
- StatisticsDocument document =
- repository
- .findById(Integer.toString(partition))
- .orElse(new StatisticsDocument(partition));
- handler.addPartition(partition, document.statistics);
- });
- }
-
- @Override
- public void onPartitionsRevoked(Collection<TopicPartition> partitions)
- {
- partitions.forEach(tp ->
- {
- Integer partition = tp.partition();
- log.info("{} - removing partition: {}", id, partition);
- Map<String, Long> removed = handler.removePartition(partition);
- for (String key : removed.keySet())
- {
- log.info(
- "{} - Seen {} messages for partition={}|key={}",
- id,
- removed.get(key),
- partition,
- key);
- }
- repository.save(new StatisticsDocument(partition, removed));
- });
- }
-
-
- @Override
- public void beforeNextPoll()
- {
- if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
- {
- log.debug("Storing data, last commit: {}", lastCommit);
- handler.getSeen().forEach((partiton, statistics) -> repository.save(
- new StatisticsDocument(
- partiton,
- statistics)));
- lastCommit = clock.instant();
- }
- }
-}
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-@Slf4j
-public class KeyCountingRecordHandler implements RecordHandler<String, Long>
-{
- private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
-
-
- @Override
- public void accept(ConsumerRecord<String, Long> record)
- {
- Integer partition = record.partition();
- String key = record.key() == null ? "NULL" : record.key().toString();
- Map<String, Long> byKey = seen.get(partition);
-
- if (!byKey.containsKey(key))
- byKey.put(key, 0l);
-
- long seenByKey = byKey.get(key);
- seenByKey++;
- byKey.put(key, seenByKey);
- }
-
- public void addPartition(Integer partition, Map<String, Long> statistics)
- {
- seen.put(partition, statistics);
- }
-
- public Map<String, Long> removePartition(Integer partition)
- {
- return seen.remove(partition);
- }
-
-
- public Map<Integer, Map<String, Long>> getSeen()
- {
- return seen;
- }
-}
+++ /dev/null
-package de.juplo.kafka;
-
-import org.springframework.data.mongodb.repository.MongoRepository;
-
-import java.util.Optional;
-
-
-public interface PartitionStatisticsRepository extends MongoRepository<StatisticsDocument, String>
-{
- public Optional<StatisticsDocument> findById(String partition);
-}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.ToString;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.mongodb.core.mapping.Document;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+@Document(collection = "state")
+@ToString
+public class StateDocument
+{
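+  // The partition number (as a string) serves as the id of the document.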
+ @Id
+ public String id;
+ public Map<String, Long> state;
+
+ public StateDocument()
+ {
+ }
+
+ public StateDocument(Integer partition)
+ {
+ this.id = Integer.toString(partition);
+ this.state = new HashMap<>();
+ }
+
+ public StateDocument(Integer partition, Map<String, Long> state)
+ {
+ this.id = Integer.toString(partition);
+ this.state = state;
+ }
+}
--- /dev/null
+package de.juplo.kafka;
+
+import org.springframework.data.mongodb.repository.MongoRepository;
+
+import java.util.Optional;
+
+
+public interface StateRepository extends MongoRepository<StateDocument, String>
+{
+ public Optional<StateDocument> findById(String partition);
+}
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.ToString;
-import org.springframework.data.annotation.Id;
-import org.springframework.data.mongodb.core.mapping.Document;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-@Document(collection = "statistics")
-@ToString
-public class StatisticsDocument
-{
- @Id
- public String id;
- public Map<String, Long> statistics;
-
- public StatisticsDocument()
- {
- }
-
- public StatisticsDocument(Integer partition)
- {
- this.id = Integer.toString(partition);
- this.statistics = new HashMap<>();
- }
-
- public StatisticsDocument(Integer partition, Map<String, Long> statistics)
- {
- this.id = Integer.toString(partition);
- this.statistics = statistics;
- }
-}
package de.juplo.kafka;
-import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.RecordDeserializationException;
-import org.apache.kafka.common.serialization.*;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
-import org.junit.jupiter.api.*;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
-import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Import;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.test.context.TestPropertySource;
-import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
+import org.springframework.context.annotation.Primary;
+import org.springframework.test.context.ContextConfiguration;
-import java.time.Duration;
-import java.util.*;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.function.BiConsumer;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
+import java.util.function.Consumer;
-import static de.juplo.kafka.ApplicationTests.PARTITIONS;
-import static de.juplo.kafka.ApplicationTests.TOPIC;
-import static org.assertj.core.api.Assertions.*;
-import static org.awaitility.Awaitility.*;
-
-@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
-@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
-@TestPropertySource(
- properties = {
- "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
- "consumer.topic=" + TOPIC,
- "consumer.commit-interval=1s",
- "spring.mongodb.embedded.version=4.4.13" })
-@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
-@EnableAutoConfiguration
-@AutoConfigureDataMongo
-@Slf4j
-class ApplicationTests
+@ContextConfiguration(classes = ApplicationTests.Configuration.class)
+public class ApplicationTests extends GenericApplicationTests<String, Long>
{
- public static final String TOPIC = "FOO";
- public static final int PARTITIONS = 10;
-
-
- StringSerializer stringSerializer = new StringSerializer();
-
- @Autowired
- Serializer valueSerializer;
- @Autowired
- KafkaProducer<String, Bytes> kafkaProducer;
- @Autowired
- KafkaConsumer<String, Long> kafkaConsumer;
- @Autowired
- KafkaConsumer<Bytes, Bytes> offsetConsumer;
- @Autowired
- ApplicationProperties properties;
- @Autowired
- ExecutorService executor;
- @Autowired
- KeyCountingRebalanceListener keyCountingRebalanceListener;
- @Autowired
- KeyCountingRecordHandler keyCountingRecordHandler;
-
- EndlessConsumer<String, Long> endlessConsumer;
- Map<TopicPartition, Long> oldOffsets;
- Map<TopicPartition, Long> newOffsets;
- Set<ConsumerRecord<String, Long>> receivedRecords;
-
-
- /** Tests methods */
-
- @Test
- void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException
- {
- send100Messages((partition, key, counter) ->
- {
- Bytes value = new Bytes(valueSerializer.serialize(TOPIC, counter));
- return new ProducerRecord<>(TOPIC, partition, key, value);
- });
-
- await("100 records received")
- .atMost(Duration.ofSeconds(30))
- .pollInterval(Duration.ofSeconds(1))
- .until(() -> receivedRecords.size() >= 100);
-
- await("Offsets committed")
- .atMost(Duration.ofSeconds(10))
- .pollInterval(Duration.ofSeconds(1))
- .untilAsserted(() ->
- {
- checkSeenOffsetsForProgress();
- compareToCommitedOffsets(newOffsets);
- });
-
- assertThatExceptionOfType(IllegalStateException.class)
- .isThrownBy(() -> endlessConsumer.exitStatus())
- .describedAs("Consumer should still be running");
- }
-
- @Test
- void commitsOffsetOfErrorForReprocessingOnDeserializationError()
- {
- send100Messages((partition, key, counter) ->
- {
- Bytes value = counter == 77
- ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
- : new Bytes(valueSerializer.serialize(TOPIC, counter));
- return new ProducerRecord<>(TOPIC, partition, key, value);
- });
-
- await("Consumer failed")
- .atMost(Duration.ofSeconds(30))
- .pollInterval(Duration.ofSeconds(1))
- .until(() -> !endlessConsumer.running());
-
- checkSeenOffsetsForProgress();
- compareToCommitedOffsets(newOffsets);
-
- endlessConsumer.start();
- await("Consumer failed")
- .atMost(Duration.ofSeconds(30))
- .pollInterval(Duration.ofSeconds(1))
- .until(() -> !endlessConsumer.running());
-
- checkSeenOffsetsForProgress();
- compareToCommitedOffsets(newOffsets);
- assertThat(receivedRecords.size())
- .describedAs("Received not all sent events")
- .isLessThan(100);
-
- assertThatNoException()
- .describedAs("Consumer should not be running")
- .isThrownBy(() -> endlessConsumer.exitStatus());
- assertThat(endlessConsumer.exitStatus())
- .describedAs("Consumer should have exited abnormally")
- .containsInstanceOf(RecordDeserializationException.class);
- }
-
-
- /** Helper methods for the verification of expectations */
-
- void compareToCommitedOffsets(Map<TopicPartition, Long> offsetsToCheck)
- {
- doForCurrentOffsets((tp, offset) ->
- {
- Long expected = offsetsToCheck.get(tp) + 1;
- log.debug("Checking, if the offset for {} is {}", tp, expected);
- assertThat(offset)
- .describedAs("Committed offset corresponds to the offset of the consumer")
- .isEqualTo(expected);
- });
- }
-
- void checkSeenOffsetsForProgress()
- {
- // Be sure, that some messages were consumed...!
- Set<TopicPartition> withProgress = new HashSet<>();
- partitions().forEach(tp ->
- {
- Long oldOffset = oldOffsets.get(tp) + 1;
- Long newOffset = newOffsets.get(tp) + 1;
- if (!oldOffset.equals(newOffset))
- {
- log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
- withProgress.add(tp);
- }
- });
- assertThat(withProgress)
- .describedAs("Some offsets must have changed, compared to the old offset-positions")
- .isNotEmpty();
- }
-
-
- /** Helper methods for setting up and running the tests */
-
- void seekToEnd()
- {
- offsetConsumer.assign(partitions());
- offsetConsumer.seekToEnd(partitions());
- partitions().forEach(tp ->
- {
- // seekToEnd() works lazily: it only takes effect on poll()/position()
- Long offset = offsetConsumer.position(tp);
- log.info("New position for {}: {}", tp, offset);
- });
- // The new positions must be commited!
- offsetConsumer.commitSync();
- offsetConsumer.unsubscribe();
- }
-
- void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
- {
- offsetConsumer.assign(partitions());
- partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
- offsetConsumer.unsubscribe();
- }
-
- List<TopicPartition> partitions()
- {
- return
- IntStream
- .range(0, PARTITIONS)
- .mapToObj(partition -> new TopicPartition(TOPIC, partition))
- .collect(Collectors.toList());
- }
-
-
- public interface RecordGenerator<K, V>
- {
- public ProducerRecord<String, Bytes> generate(int partition, String key, long counter);
- }
-
- void send100Messages(RecordGenerator recordGenerator)
- {
- long i = 0;
-
- for (int partition = 0; partition < 10; partition++)
- {
- for (int key = 0; key < 10; key++)
- {
- ProducerRecord<String, Bytes> record =
- recordGenerator.generate(partition, Integer.toString(partition*10+key%2), ++i);
-
- kafkaProducer.send(record, (metadata, e) ->
- {
- if (metadata != null)
- {
- log.debug(
- "{}|{} - {}={}",
- metadata.partition(),
- metadata.offset(),
- record.key(),
- record.value());
- }
- else
- {
- log.warn(
- "Exception for {}={}: {}",
- record.key(),
- record.value(),
- e.toString());
- }
- });
- }
- }
- }
-
-
- @BeforeEach
- public void init()
- {
- seekToEnd();
-
- oldOffsets = new HashMap<>();
- newOffsets = new HashMap<>();
- receivedRecords = new HashSet<>();
-
- doForCurrentOffsets((tp, offset) ->
- {
- oldOffsets.put(tp, offset - 1);
- newOffsets.put(tp, offset - 1);
- });
-
- TestRecordHandler<String, Long> captureOffsetAndExecuteTestHandler =
- new TestRecordHandler<String, Long>(keyCountingRecordHandler) {
- @Override
- public void onNewRecord(ConsumerRecord<String, Long> record)
- {
- newOffsets.put(
- new TopicPartition(record.topic(), record.partition()),
- record.offset());
- receivedRecords.add(record);
- }
- };
-
- endlessConsumer =
- new EndlessConsumer<>(
- executor,
- properties.getClientId(),
- properties.getTopic(),
- kafkaConsumer,
- keyCountingRebalanceListener,
- captureOffsetAndExecuteTestHandler);
-
- endlessConsumer.start();
- }
-
- @AfterEach
- public void deinit()
- {
- try
- {
- endlessConsumer.stop();
- }
- catch (Exception e)
- {
- log.info("Exception while stopping the consumer: {}", e.toString());
- }
- }
-
-
- @TestConfiguration
- @Import(ApplicationConfiguration.class)
- public static class Configuration
- {
- @Bean
- Serializer<Long> serializer()
- {
- return new LongSerializer();
- }
-
- @Bean
- KafkaProducer<String, Bytes> kafkaProducer(ApplicationProperties properties)
- {
- Properties props = new Properties();
- props.put("bootstrap.servers", properties.getBootstrapServer());
- props.put("linger.ms", 100);
- props.put("key.serializer", StringSerializer.class.getName());
- props.put("value.serializer", BytesSerializer.class.getName());
-
- return new KafkaProducer<>(props);
- }
-
- @Bean
- KafkaConsumer<Bytes, Bytes> offsetConsumer(ApplicationProperties properties)
- {
- Properties props = new Properties();
- props.put("bootstrap.servers", properties.getBootstrapServer());
- props.put("client.id", "OFFSET-CONSUMER");
- props.put("group.id", properties.getGroupId());
- props.put("key.deserializer", BytesDeserializer.class.getName());
- props.put("value.deserializer", BytesDeserializer.class.getName());
-
- return new KafkaConsumer<>(props);
- }
- }
+ public ApplicationTests()
+ {
+ super(
+ new RecordGenerator()
+ {
+ final StringSerializer stringSerializer = new StringSerializer();
+ final LongSerializer longSerializer = new LongSerializer();
+
+
+ @Override
+ public int generate(
+ boolean poisonPills,
+ boolean logicErrors,
+ Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
+ {
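+          // One record is sent for every partition/key combination; record number 77
+          // is replaced by either a poison pill (an undeserializable string value) or
+          // the marker value Long.MIN_VALUE for a provoked logic error.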
+ int i = 0;
+
+ for (int partition = 0; partition < 10; partition++)
+ {
+ for (int key = 0; key < 10; key++)
+ {
+ i++;
+
+ Bytes value = new Bytes(longSerializer.serialize(TOPIC, (long)i));
+ if (i == 77)
+ {
+ if (logicErrors)
+ {
+ value = new Bytes(longSerializer.serialize(TOPIC, Long.MIN_VALUE));
+ }
+ if (poisonPills)
+ {
+ value = new Bytes(stringSerializer.serialize(TOPIC, "BOOM (Poison-Pill)!"));
+ }
+ }
+
+ ProducerRecord<Bytes, Bytes> record =
+ new ProducerRecord<>(
+ TOPIC,
+ partition,
+                new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(partition*10 + key%2))),
+ value);
+
+ messageSender.accept(record);
+ }
+ }
+
+ return i;
+ }
+ });
+ }
+
+
+ @TestConfiguration
+ public static class Configuration
+ {
+ @Primary
+ @Bean
+ public ApplicationRecordHandler recordHandler()
+ {
+    return new ApplicationRecordHandler()
+ {
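+      // Turns the marker value Long.MIN_VALUE, that the RecordGenerator above
+      // emits for logic errors, into an actual business-logic exception.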
+ @Override
+ public void accept(ConsumerRecord<String, Long> record)
+ {
+ if (record.value() == Long.MIN_VALUE)
+ throw new RuntimeException("BOOM (Logic-Error)!");
+ super.accept(record);
+ }
+ };
+ }
+ }
}
--- /dev/null
+package de.juplo.kafka;
+
+import org.junit.jupiter.api.extension.ConditionEvaluationResult;
+import org.junit.jupiter.api.extension.ExecutionCondition;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.platform.commons.util.AnnotationUtils;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+
+public class ErrorCannotBeGeneratedCondition implements ExecutionCondition
+{
+ @Override
+ public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context)
+ {
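+    // Disables the annotated test if the RecordGenerator of the concrete test
+    // class cannot produce the required type(s) of errors.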
+ final Optional<SkipWhenErrorCannotBeGenerated> optional =
+ AnnotationUtils.findAnnotation(
+ context.getElement(),
+ SkipWhenErrorCannotBeGenerated.class);
+
+ if (context.getTestInstance().isEmpty())
+      return ConditionEvaluationResult.enabled("Test instance is not available");
+
+ if (optional.isPresent())
+ {
+ SkipWhenErrorCannotBeGenerated skipWhenErrorCannotBeGenerated = optional.get();
+ GenericApplicationTests instance = (GenericApplicationTests)context.getTestInstance().get();
+ List<String> missingRequiredErrors = new LinkedList<>();
+
+ if (skipWhenErrorCannotBeGenerated.poisonPill() && !instance.recordGenerator.canGeneratePoisonPill())
+ missingRequiredErrors.add("Poison-Pill");
+
+ if (skipWhenErrorCannotBeGenerated.logicError() && !instance.recordGenerator.canGenerateLogicError())
+ missingRequiredErrors.add("Logic-Error");
+
+ StringBuilder builder = new StringBuilder();
+ builder.append(context.getTestClass().get().getSimpleName());
+
+ if (missingRequiredErrors.isEmpty())
+ {
+ builder.append(" can generate all required types of errors");
+ return ConditionEvaluationResult.enabled(builder.toString());
+ }
+
+ builder.append(" cannot generate the required error(s): ");
+ builder.append(
+ missingRequiredErrors
+ .stream()
+ .collect(Collectors.joining(", ")));
+
+ return ConditionEvaluationResult.disabled(builder.toString());
+ }
+
+ return ConditionEvaluationResult.enabled(
+ "Not annotated with " + SkipWhenErrorCannotBeGenerated.class.getSimpleName());
+ }
+}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.serialization.*;
+import org.apache.kafka.common.utils.Bytes;
+import org.junit.jupiter.api.*;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
+import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Import;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.test.context.TestPropertySource;
+import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
+
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static de.juplo.kafka.GenericApplicationTests.PARTITIONS;
+import static de.juplo.kafka.GenericApplicationTests.TOPIC;
+import static org.assertj.core.api.Assertions.*;
+import static org.awaitility.Awaitility.*;
+
+
+@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
+@TestPropertySource(
+ properties = {
+ "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
+ "consumer.topic=" + TOPIC,
+ "consumer.commit-interval=1s",
+ "spring.mongodb.embedded.version=4.4.13" })
+@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
+@EnableAutoConfiguration
+@AutoConfigureDataMongo
+@Slf4j
+abstract class GenericApplicationTests<K, V>
+{
+ public static final String TOPIC = "FOO";
+ public static final int PARTITIONS = 10;
+
+
+ @Autowired
+ KafkaConsumer<K, V> kafkaConsumer;
+ @Autowired
+ Consumer<ConsumerRecord<K, V>> consumer;
+ @Autowired
+ ApplicationProperties properties;
+ @Autowired
+ ExecutorService executor;
+ @Autowired
+ PollIntervalAwareConsumerRebalanceListener rebalanceListener;
+ @Autowired
+ RecordHandler<K, V> recordHandler;
+
+ KafkaProducer<Bytes, Bytes> testRecordProducer;
+ KafkaConsumer<Bytes, Bytes> offsetConsumer;
+ EndlessConsumer<K, V> endlessConsumer;
+ Map<TopicPartition, Long> oldOffsets;
+ Map<TopicPartition, Long> newOffsets;
+ Set<ConsumerRecord<K, V>> receivedRecords;
+
+
+ final RecordGenerator recordGenerator;
+ final Consumer<ProducerRecord<Bytes, Bytes>> messageSender;
+
+ public GenericApplicationTests(RecordGenerator recordGenerator)
+ {
+ this.recordGenerator = recordGenerator;
+ this.messageSender = (record) -> sendMessage(record);
+ }
+
+
+ /** Tests methods */
+
+ @Test
+ void commitsCurrentOffsetsOnSuccess()
+ {
+ int numberOfGeneratedMessages =
+ recordGenerator.generate(false, false, messageSender);
+
+ await(numberOfGeneratedMessages + " records received")
+ .atMost(Duration.ofSeconds(30))
+ .pollInterval(Duration.ofSeconds(1))
+ .until(() -> receivedRecords.size() >= numberOfGeneratedMessages);
+
+ await("Offsets committed")
+ .atMost(Duration.ofSeconds(10))
+ .pollInterval(Duration.ofSeconds(1))
+ .untilAsserted(() ->
+ {
+ checkSeenOffsetsForProgress();
+        compareToCommittedOffsets(newOffsets);
+ });
+
+ assertThatExceptionOfType(IllegalStateException.class)
+ .isThrownBy(() -> endlessConsumer.exitStatus())
+ .describedAs("Consumer should still be running");
+
+ recordGenerator.assertBusinessLogic();
+ }
+
+ @Test
+ @SkipWhenErrorCannotBeGenerated(poisonPill = true)
+ void commitsOffsetOfErrorForReprocessingOnDeserializationError()
+ {
+ int numberOfGeneratedMessages =
+ recordGenerator.generate(true, false, messageSender);
+
+ await("Consumer failed")
+ .atMost(Duration.ofSeconds(30))
+ .pollInterval(Duration.ofSeconds(1))
+ .until(() -> !endlessConsumer.running());
+
+ checkSeenOffsetsForProgress();
+    compareToCommittedOffsets(newOffsets);
+
+ endlessConsumer.start();
+ await("Consumer failed")
+ .atMost(Duration.ofSeconds(30))
+ .pollInterval(Duration.ofSeconds(1))
+ .until(() -> !endlessConsumer.running());
+
+ checkSeenOffsetsForProgress();
+    compareToCommittedOffsets(newOffsets);
+ assertThat(receivedRecords.size())
+ .describedAs("Received not all sent events")
+ .isLessThan(numberOfGeneratedMessages);
+
+ assertThatNoException()
+ .describedAs("Consumer should not be running")
+ .isThrownBy(() -> endlessConsumer.exitStatus());
+ assertThat(endlessConsumer.exitStatus())
+ .describedAs("Consumer should have exited abnormally")
+ .containsInstanceOf(RecordDeserializationException.class);
+
+ recordGenerator.assertBusinessLogic();
+ }
+
+ @Test
+ @SkipWhenErrorCannotBeGenerated(logicError = true)
+ void doesNotCommitOffsetsOnLogicError()
+ {
+ int numberOfGeneratedMessages =
+ recordGenerator.generate(false, true, messageSender);
+
+ await("Consumer failed")
+ .atMost(Duration.ofSeconds(30))
+ .pollInterval(Duration.ofSeconds(1))
+ .until(() -> !endlessConsumer.running());
+
+ checkSeenOffsetsForProgress();
+    compareToCommittedOffsets(oldOffsets);
+
+ endlessConsumer.start();
+ await("Consumer failed")
+ .atMost(Duration.ofSeconds(30))
+ .pollInterval(Duration.ofSeconds(1))
+ .until(() -> !endlessConsumer.running());
+
+ checkSeenOffsetsForProgress();
+    compareToCommittedOffsets(oldOffsets);
+ assertThat(receivedRecords.size())
+ .describedAs("Received not all sent events")
+ .isLessThan(numberOfGeneratedMessages);
+
+ assertThatNoException()
+ .describedAs("Consumer should not be running")
+ .isThrownBy(() -> endlessConsumer.exitStatus());
+ assertThat(endlessConsumer.exitStatus())
+ .describedAs("Consumer should have exited abnormally")
+ .containsInstanceOf(RuntimeException.class);
+
+ recordGenerator.assertBusinessLogic();
+ }
+
+
+ /** Helper methods for the verification of expectations */
+
+  void compareToCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
+ {
+ doForCurrentOffsets((tp, offset) ->
+ {
+ Long expected = offsetsToCheck.get(tp) + 1;
+ log.debug("Checking, if the offset for {} is {}", tp, expected);
+ assertThat(offset)
+ .describedAs("Committed offset corresponds to the offset of the consumer")
+ .isEqualTo(expected);
+ });
+ }
+
+ void checkSeenOffsetsForProgress()
+ {
+    // Make sure that some messages were consumed!
+ Set<TopicPartition> withProgress = new HashSet<>();
+ partitions().forEach(tp ->
+ {
+ Long oldOffset = oldOffsets.get(tp) + 1;
+ Long newOffset = newOffsets.get(tp) + 1;
+ if (!oldOffset.equals(newOffset))
+ {
+ log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
+ withProgress.add(tp);
+ }
+ });
+ assertThat(withProgress)
+ .describedAs("Some offsets must have changed, compared to the old offset-positions")
+ .isNotEmpty();
+ }
+
+
+ /** Helper methods for setting up and running the tests */
+
+ void seekToEnd()
+ {
+ offsetConsumer.assign(partitions());
+ offsetConsumer.seekToEnd(partitions());
+ partitions().forEach(tp ->
+ {
+ // seekToEnd() works lazily: it only takes effect on poll()/position()
+ Long offset = offsetConsumer.position(tp);
+ log.info("New position for {}: {}", tp, offset);
+ });
+    // The new positions must be committed!
+ offsetConsumer.commitSync();
+ offsetConsumer.unsubscribe();
+ }
+
+ void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
+ {
+ offsetConsumer.assign(partitions());
+ partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
+ offsetConsumer.unsubscribe();
+ }
+
+ List<TopicPartition> partitions()
+ {
+ return
+ IntStream
+ .range(0, PARTITIONS)
+ .mapToObj(partition -> new TopicPartition(TOPIC, partition))
+ .collect(Collectors.toList());
+ }
+
+
+ public interface RecordGenerator
+ {
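+    // Sends the test records through the given messageSender and returns the
+    // number of generated messages.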
+ int generate(
+ boolean poisonPills,
+ boolean logicErrors,
+ Consumer<ProducerRecord<Bytes, Bytes>> messageSender);
+
+ default boolean canGeneratePoisonPill()
+ {
+ return true;
+ }
+
+ default boolean canGenerateLogicError()
+ {
+ return true;
+ }
+
+ default void assertBusinessLogic()
+ {
+ log.debug("No business-logic to assert");
+ }
+ }
+
+ void sendMessage(ProducerRecord<Bytes, Bytes> record)
+ {
+ testRecordProducer.send(record, (metadata, e) ->
+ {
+ if (metadata != null)
+ {
+ log.debug(
+ "{}|{} - {}={}",
+ metadata.partition(),
+ metadata.offset(),
+ record.key(),
+ record.value());
+ }
+ else
+ {
+ log.warn(
+ "Exception for {}={}: {}",
+ record.key(),
+ record.value(),
+ e.toString());
+ }
+ });
+ }
+
+
+ @BeforeEach
+ public void init()
+ {
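+    // Set up a producer for the test records and a separate consumer that is
+    // only used to inspect and fast-forward the committed offsets.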
+ Properties props;
+ props = new Properties();
+ props.put("bootstrap.servers", properties.getBootstrapServer());
+ props.put("linger.ms", 100);
+ props.put("key.serializer", BytesSerializer.class.getName());
+ props.put("value.serializer", BytesSerializer.class.getName());
+ testRecordProducer = new KafkaProducer<>(props);
+
+ props = new Properties();
+ props.put("bootstrap.servers", properties.getBootstrapServer());
+ props.put("client.id", "OFFSET-CONSUMER");
+ props.put("group.id", properties.getGroupId());
+ props.put("key.deserializer", BytesDeserializer.class.getName());
+ props.put("value.deserializer", BytesDeserializer.class.getName());
+ offsetConsumer = new KafkaConsumer<>(props);
+
+ seekToEnd();
+
+ oldOffsets = new HashMap<>();
+ newOffsets = new HashMap<>();
+ receivedRecords = new HashSet<>();
+
+ doForCurrentOffsets((tp, offset) ->
+ {
+ oldOffsets.put(tp, offset - 1);
+ newOffsets.put(tp, offset - 1);
+ });
+
+ TestRecordHandler<K, V> captureOffsetAndExecuteTestHandler =
+ new TestRecordHandler<K, V>(recordHandler)
+ {
+ @Override
+ public void onNewRecord(ConsumerRecord<K, V> record)
+ {
+ newOffsets.put(
+ new TopicPartition(record.topic(), record.partition()),
+ record.offset());
+ receivedRecords.add(record);
+ }
+ };
+
+ endlessConsumer =
+ new EndlessConsumer<>(
+ executor,
+ properties.getClientId(),
+ properties.getTopic(),
+ kafkaConsumer,
+ rebalanceListener,
+ captureOffsetAndExecuteTestHandler);
+
+ endlessConsumer.start();
+ }
+
+ @AfterEach
+ public void deinit()
+ {
+ try
+ {
+ endlessConsumer.stop();
+ testRecordProducer.close();
+ offsetConsumer.close();
+ }
+ catch (Exception e)
+ {
+ log.info("Exception while stopping the consumer: {}", e.toString());
+ }
+ }
+
+
+ @TestConfiguration
+ @Import(ApplicationConfiguration.class)
+ public static class Configuration
+ {
+ }
+}
--- /dev/null
+package de.juplo.kafka;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+
+@Retention(RetentionPolicy.RUNTIME)
+@ExtendWith(ErrorCannotBeGeneratedCondition.class)
+public @interface SkipWhenErrorCannotBeGenerated
+{
+ boolean poisonPill() default false;
+ boolean logicError() default false;
+}