package de.juplo.kafka;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
+import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.TestPropertySource;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
-import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
// Producer the tests use to write raw messages into the embedded broker.
@Autowired
KafkaProducer<String, Bytes> kafkaProducer;
// Separate consumer used only to read back committed offsets, independent
// of the consumer under test.
@Autowired
KafkaConsumer<Bytes, Bytes> offsetConsumer;
@Autowired
ApplicationProperties applicationProperties;
// The consumer implementation under test.
@Autowired
EndlessConsumer endlessConsumer;
// Capturing test handler wired in as @Primary bean (see testHandler() below
// in this file) — tests install per-case behaviour via its testHandler field.
@Autowired
ClientMessageHandler clientMessageHandler;
// Per-partition offsets recorded before the test run (baseline).
Map<TopicPartition, Long> oldOffsets;
// Per-partition offsets observed while the test runs.
Map<TopicPartition, Long> newOffsets;
// Every successfully deserialized message received during the test.
Set<ClientMessage> received;
/** Tests methods */
await("100 records received")
.atMost(Duration.ofSeconds(30))
- .until(() -> receivedRecords.size() == 100);
+ .until(() -> received.size() == 100);
await("Offsets committed")
.atMost(Duration.ofSeconds(10))
await("99 records received")
.atMost(Duration.ofSeconds(30))
- .until(() -> receivedRecords.size() == 99);
+ .until(() -> received.size() == 99);
await("Offsets committed")
.atMost(Duration.ofSeconds(10))
@Test
void commitsOffsetOnProgramLogicErrorFoo()
{
- recordHandler.testHandler = (record) ->
+ clientMessageHandler.testHandler = (clientMessage, metadata) ->
{
- if (Integer.parseInt(record.value().message)%10 ==0)
- throw new RuntimeException("BOOM: " + record.value().message + "%10 == 0");
+ if (Integer.parseInt(clientMessage.message)%10 ==0)
+ throw new RuntimeException("BOOM: " + clientMessage.message + "%10 == 0");
};
send100Messages((key, counter) -> serialize(key, counter));
await("80 records received")
.atMost(Duration.ofSeconds(30))
- .until(() -> receivedRecords.size() == 100);
+ .until(() -> received.size() == 100);
await("Offsets committed")
.atMost(Duration.ofSeconds(10))
+ .pollDelay(Duration.ofSeconds(1))
.untilAsserted(() ->
{
checkSeenOffsetsForProgress();
@BeforeEach
public void init()
{
- recordHandler.testHandler = (record) -> {};
+ clientMessageHandler.testHandler = (clientMessage, metadata) -> {};
oldOffsets = new HashMap<>();
newOffsets = new HashMap<>();
- receivedRecords = new HashSet<>();
+ received = new HashSet<>();
doForCurrentOffsets((tp, offset) ->
{
newOffsets.put(tp, offset - 1);
});
- recordHandler.captureOffsets =
- record ->
+ clientMessageHandler.captureOffsets =
+ (clientMessage, metadata) ->
{
- receivedRecords.add(record);
- log.debug("TEST: Processing record #{}: {}", receivedRecords.size(), record.value());
+ received.add(clientMessage);
+ log.debug("TEST: Processing record #{}: {}", received.size(), clientMessage);
newOffsets.put(
- new TopicPartition(record.topic(), record.partition()),
- record.offset());
+ new TopicPartition(metadata.topic(), metadata.partition()), metadata.offset());
};
endlessConsumer.start();
}
}
- public static class RecordHandler implements Consumer<ConsumerRecord<String, ClientMessage>>
+ public static class ClientMessageHandler implements BiConsumer<ClientMessage, ConsumerRecordMetadata>
{
- Consumer<ConsumerRecord<String, ClientMessage>> captureOffsets;
- Consumer<ConsumerRecord<String, ClientMessage>> testHandler;
+ BiConsumer<ClientMessage, ConsumerRecordMetadata> captureOffsets;
+ BiConsumer<ClientMessage, ConsumerRecordMetadata> testHandler;
@Override
- public void accept(ConsumerRecord<String, ClientMessage> record)
+ public void accept(ClientMessage clientMessage, ConsumerRecordMetadata metadata)
{
captureOffsets
.andThen(testHandler)
- .accept(record);
+ .accept(clientMessage, metadata);
}
}
{
@Primary
@Bean
- public Consumer<ConsumerRecord<String, ClientMessage>> testHandler()
+ public BiConsumer<ClientMessage, ConsumerRecordMetadata> testHandler()
{
- return new RecordHandler();
+ return new ClientMessageHandler();
}
@Bean