import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;
-import org.springframework.util.Assert;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Optional;
private Exception exception;
private boolean ack = true;
+ @Override
+ public boolean remainingRecords()
+ {
+ return true;
+ }
@Override
public void handleOtherException(
MessageListenerContainer container,
boolean batchListener)
{
- Assert.isTrue(batchListener, getClass().getName() + " is only applicable for Batch-Listeners");
+ rememberExceptionAndStopContainer(thrownException, container);
+ }
+
+ @Override
+ public void handleRemaining(
+ Exception thrownException,
+ List<ConsumerRecord<?, ?>> records,
+ Consumer<?, ?> consumer,
+ MessageListenerContainer container)
+ {
+ Map<TopicPartition, Long> offsets = new HashMap<>();
+ records.forEach(record ->
+ offsets.computeIfAbsent(
+ new TopicPartition(record.topic(), record.partition()),
+ offset -> record.offset()));
+ offsets.forEach((tp, offset) -> consumer.seek(tp, offset));
rememberExceptionAndStopContainer(thrownException, container);
}
id = "${spring.kafka.client-id}",
idIsGroup = false,
topics = "${sumup.adder.topic}",
- batch = "true",
autoStartup = "false")
- public void accept(List<ConsumerRecord<K, V>> records)
+ public void accept(ConsumerRecord<K, V> record)
{
- // Do something with the data...
- log.info("{} - Received {} messages", id, records.size());
- for (ConsumerRecord<K, V> record : records)
- {
log.info(
"{} - {}: {}/{} - {}={}",
id,
recordHandler.accept(record);
consumed++;
- }
}
public void start()
@Test
@SkipWhenErrorCannotBeGenerated(logicError = true)
- void doesNotCommitOffsetsOnLogicError()
+ void commitsOffsetsOfUnseenRecordsOnLogicError()
{
int numberOfGeneratedMessages =
recordGenerator.generate(false, true, messageSender);
.until(() -> !endlessConsumer.running());
checkSeenOffsetsForProgress();
- assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets);
+ assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
endlessConsumer.start();
await("Consumer failed")
.pollInterval(Duration.ofSeconds(1))
.until(() -> !endlessConsumer.running());
- assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets);
+ assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
assertThatNoException()
.describedAs("Consumer should not be running")