+ rememberExceptionAndStopContainer(thrownException, container);
+ }
+
+ @Override
+ public void handleRemaining(
+ Exception thrownException,
+ List<ConsumerRecord<?, ?>> records,
+ Consumer<?, ?> consumer,
+ MessageListenerContainer container)
+ {
+ Map<TopicPartition, Long> offsets = new HashMap<>();
+ records.forEach(record ->
+ offsets.computeIfAbsent(
+ new TopicPartition(record.topic(), record.partition()),
+ offset -> record.offset()));
+ offsets.forEach((tp, offset) -> consumer.seek(tp, offset));