import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
import org.springframework.boot.test.context.TestConfiguration;
@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@TestPropertySource(
properties = {
"consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
@Autowired
ExecutorService executor;
+ Map<TopicPartition, Long> oldOffsets;
+ Map<TopicPartition, Long> newOffsets;
+
@Test
+ @Order(1) // << The poison pill is not skipped. Hence, this test must run first
void commitsCurrentOffsetsOnSuccess()
{
+ // Produce 100 long-valued records; the callback throws WakeupException
+ // after the 100th received record to stop the endless consumer cleanly.
send100Messages(i -> new Bytes(longSerializer.serialize(TOPIC, i)));
Set<ConsumerRecord<String, Long>> received = new HashSet<>();
- Map<Integer, Long> offsets = runEndlessConsumer(record ->
+ runEndlessConsumer(record ->
{
received.add(record);
if (received.size() == 100)
throw new WakeupException();
});
+ // On a clean shutdown the consumer is expected to have committed the
+ // offsets it has seen (newOffsets, captured by runEndlessConsumer).
check(offsets);
- check(offsets);
+ checkSeenOffsetsForProgress();
+ compareToCommitedOffsets(newOffsets);
}
@Test
+ @Order(2)
void commitsNoOffsetsOnError()
{
send100Messages(counter ->
? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
: new Bytes(longSerializer.serialize(TOPIC, counter)));
- Map<Integer, Long> oldOffsets = new HashMap<>();
- doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp.partition(), offset -1));
- Map<Integer, Long> newOffsets = runEndlessConsumer((record) -> {});
+ runEndlessConsumer((record) -> {});
- check(oldOffsets);
+ checkSeenOffsetsForProgress();
+ compareToCommitedOffsets(oldOffsets);
}
}
}
- Map<Integer, Long> runEndlessConsumer(Consumer<ConsumerRecord<String, Long>> consumer)
+ EndlessConsumer<String, Long> runEndlessConsumer(Consumer<ConsumerRecord<String, Long>> consumer)
{
- Map<Integer, Long> offsets = new HashMap<>();
- doForCurrentOffsets((tp, offset) -> offsets.put(tp.partition(), offset -1));
- Consumer<ConsumerRecord<String, Long>> captureOffset = record -> offsets.put(record.partition(), record.offset());
+ oldOffsets = new HashMap<>();
+ newOffsets = new HashMap<>();
+
+ doForCurrentOffsets((tp, offset) ->
+ {
+ oldOffsets.put(tp, offset - 1);
+ newOffsets.put(tp, offset - 1);
+ });
+
+ Consumer<ConsumerRecord<String, Long>> captureOffset =
+ record ->
+ newOffsets.put(
+ new TopicPartition(record.topic(), record.partition()),
+ record.offset());
+
EndlessConsumer<String, Long> endlessConsumer =
new EndlessConsumer<>(
executor,
endlessConsumer.run();
- return offsets;
+ return endlessConsumer;
}
List<TopicPartition> partitions()
kafkaConsumer.unsubscribe();
}
- void check(Map<Integer, Long> offsets)
+ void checkSeenOffsetsForProgress()
+ {
+ // Make sure that the consumer actually made progress on at least one partition...!
+ Set<TopicPartition> withProgress = new HashSet<>();
+ partitions().forEach(tp ->
+ {
+ Long oldOffset = oldOffsets.get(tp);
+ Long newOffset = newOffsets.get(tp);
+ if (!oldOffset.equals(newOffset))
+ {
+ log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
+ withProgress.add(tp);
+ }
+ });
+ // NOTE: describedAs() must be called BEFORE the verification method
+ // (isNotEmpty()); when chained after it, the custom failure description
+ // is silently ignored by AssertJ.
+ assertThat(withProgress).describedAs("Found no partitions with any offset-progress").isNotEmpty();
+ }
+
+ void compareToCommitedOffsets(Map<TopicPartition, Long> offsetsToCheck)
{
doForCurrentOffsets((tp, offset) ->
{
- Long expected = offsets.get(tp.partition()) + 1;
+ Long expected = offsetsToCheck.get(tp) + 1;
log.debug("Checking, if the offset for {} is {}", tp, expected);
assertThat(offset).isEqualTo(expected);
});