import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
import org.springframework.boot.test.context.TestConfiguration;
@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
+@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@TestPropertySource(
properties = {
"consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
@Test
+ @Order(1) // << The poison pill is not skipped. Hence, this test must run first
void commitsCurrentOffsetsOnSuccess()
{
send100Messages(i -> new Bytes(longSerializer.serialize(TOPIC, i)));
Set<ConsumerRecord<String, Long>> received = new HashSet<>();
- Map<Integer, Long> offsets = runEndlessConsumer(record ->
+ Map<TopicPartition, Long> offsets = runEndlessConsumer(record ->
{
received.add(record);
if (received.size() == 100)
}
@Test
+ @Order(2)
void commitsNoOffsetsOnError()
{
send100Messages(counter ->
? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
: new Bytes(longSerializer.serialize(TOPIC, counter)));
- Map<Integer, Long> oldOffsets = new HashMap<>();
- doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp.partition(), offset -1));
- Map<Integer, Long> newOffsets = runEndlessConsumer((record) -> {});
+ Map<TopicPartition, Long> oldOffsets = new HashMap<>();
+ doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp, offset -1));
+ Map<TopicPartition, Long> newOffsets = runEndlessConsumer((record) -> {});
check(oldOffsets);
}
}
}
- Map<Integer, Long> runEndlessConsumer(Consumer<ConsumerRecord<String, Long>> consumer)
+ Map<TopicPartition, Long> runEndlessConsumer(Consumer<ConsumerRecord<String, Long>> consumer)
{
- Map<Integer, Long> offsets = new HashMap<>();
- doForCurrentOffsets((tp, offset) -> offsets.put(tp.partition(), offset -1));
- Consumer<ConsumerRecord<String, Long>> captureOffset = record -> offsets.put(record.partition(), record.offset());
+ Map<TopicPartition, Long> offsets = new HashMap<>();
+ doForCurrentOffsets((tp, offset) -> offsets.put(tp, offset -1));
+ Consumer<ConsumerRecord<String, Long>> captureOffset =
+ record ->
+ offsets.put(
+ new TopicPartition(record.topic(), record.partition()),
+ record.offset());
EndlessConsumer<String, Long> endlessConsumer =
new EndlessConsumer<>(
executor,
kafkaConsumer.unsubscribe();
}
- void check(Map<Integer, Long> offsets)
+ void check(Map<TopicPartition, Long> offsets)
{
doForCurrentOffsets((tp, offset) ->
{
- Long expected = offsets.get(tp.partition()) + 1;
+ Long expected = offsets.get(tp) + 1;
log.debug("Checking, if the offset for {} is {}", tp, expected);
assertThat(offset).isEqualTo(expected);
});