* The implementation is designed so that no commit is performed when an
unexpected exception occurs (usually an error in the business logic).
* Otherwise, in that situation an explicit seek of the offsets back to the
positions of the messages that were processed before the error occurred
would be required, so that no messages are lost (see the sketch below).
* This behavior was undermined by moving the commit into the rebalance
listener, because there the commit was performed even in the case of
an unexpected exception.
* As a fix, a method is introduced here through which the commit in the
rebalance can be suppressed in this situation.
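For illustration, the explicit-seek alternative mentioned above would look roughly like the following sketch. The helper class and the lastProcessed bookkeeping map are assumptions made for the example; neither exists in the project:

package de.juplo.kafka;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;


// Hypothetical helper (not part of the project): rewinds the consumer to
// the positions right after the records that were processed successfully
// before the error, so that no polled-but-unprocessed message is skipped.
public class OffsetRecovery
{
  // lastProcessed is an assumed bookkeeping structure that maps each
  // partition to the offset of the last successfully processed record.
  public static void seekToLastProcessed(
      Consumer<?, ?> consumer,
      Map<TopicPartition, Long> lastProcessed)
  {
    lastProcessed.forEach((partition, offset) ->
        // seek() takes the offset of the next record to be fetched,
        // hence the +1
        consumer.seek(partition, offset + 1));
  }
}

Suppressing the commit instead avoids this extra bookkeeping entirely: the consumer simply resumes from the last committed offsets and re-processes the affected messages.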
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
@RequiredArgsConstructor
@Slf4j
-public class ApplicationRebalanceListener implements ConsumerRebalanceListener
+public class ApplicationRebalanceListener implements CommittingConsumerRebalanceListener
{
private final ApplicationRecordHandler recordHandler;
private final AdderResults adderResults;
private final Set<Integer> partitions = new HashSet<>();
+ private boolean commitsEnabled = true;
+
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions)
{
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions)
{
- log.info("{} - Commiting offsets for all previously assigned partitions", id);
- try
- {
- consumer.commitSync();
- }
- catch (Exception e)
- log.warn("{} - Could not commit offsets in onPartitionsRevoked():", id, e);
+ log.info("{} - Commiting offsets for all previously assigned partitions", id);
+ try
+ {
+ consumer.commitSync();
+ }
+ catch (Exception e)
+ {
+ log.warn("{} - Could not commit offsets in onPartitionsRevoked():", id, e);
+ }
}
partitions.forEach(tp ->
stateRepository.save(new StateDocument(partition, state, results));
});
}
+
+ @Override
+ public void enableCommits()
+ {
+ commitsEnabled = true;
+ }
+
+ @Override
+ public void disableCommits()
+ {
+ commitsEnabled = false;
+ }
--- /dev/null
+package de.juplo.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+
+
+public interface CommittingConsumerRebalanceListener extends ConsumerRebalanceListener
+{
+ void enableCommits();
+ void disableCommits();
+}
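Because the new interface only adds the two switch methods on top of Kafka's ConsumerRebalanceListener, implementations stay small. As a purely hypothetical illustration (this class is not part of the project), a no-op stand-in for tests could look like this:

package de.juplo.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;


// Hypothetical stand-in, e.g. for tests: it fulfills the contract of
// CommittingConsumerRebalanceListener but never commits anything itself,
// so enabling/disabling commits only needs to be logged.
@Slf4j
public class NoOpRebalanceListener implements CommittingConsumerRebalanceListener
{
  @Override
  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
  {
    log.info("Assigned: {}", partitions);
  }

  @Override
  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
  {
    log.info("Revoked: {}", partitions);
  }

  @Override
  public void enableCommits()
  {
    log.info("Commits enabled (no-op)");
  }

  @Override
  public void disableCommits()
  {
    log.info("Commits disabled (no-op)");
  }
}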
private final String id;
private final String topic;
private final Consumer<K, V> consumer;
- private final ConsumerRebalanceListener rebalanceListener;
+ private final CommittingConsumerRebalanceListener rebalanceListener;
private final RecordHandler<K, V> recordHandler;
private final Lock lock = new ReentrantLock();
try
{
log.info("{} - Subscribing to topic {}", id, topic);
+ rebalanceListener.enableCommits();
consumer.subscribe(Arrays.asList(topic), rebalanceListener);
while (true)
- log.error("{} - Unexpected error: {}", id, e.toString(), e);
+ log.error("{} - Unexpected error: {}, disabling commits", id, e.toString(), e);
+ rebalanceListener.disableCommits();
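Putting the pieces of this hunk together, the life cycle of the consumer around the new switch can be sketched as follows. This is a condensed, hypothetical rendering of the loop, not the literal EndlessConsumer:

package de.juplo.kafka;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.Arrays;


// Hypothetical, condensed version of the consumer loop: it only shows
// where enableCommits() and disableCommits() sit relative to
// subscribe(), poll() and unsubscribe().
@RequiredArgsConstructor
@Slf4j
public class CommitSwitchingLoop<K, V>
{
  private final String id;
  private final String topic;
  private final Consumer<K, V> consumer;
  private final CommittingConsumerRebalanceListener rebalanceListener;
  private final java.util.function.Consumer<ConsumerRecord<K, V>> recordHandler;

  public void run()
  {
    try
    {
      // Re-arm commits before (re-)subscribing: a previous run may have
      // disabled them after an unexpected exception.
      rebalanceListener.enableCommits();
      consumer.subscribe(Arrays.asList(topic), rebalanceListener);
      while (true)
      {
        consumer.poll(Duration.ofSeconds(1)).forEach(recordHandler);
      }
    }
    catch (WakeupException e)
    {
      // Regular shutdown: commits stay enabled, so the rebalance that is
      // triggered by unsubscribe() commits the current offsets.
      log.info("{} - Consumer was signaled to stop", id);
    }
    catch (Exception e)
    {
      // Unexpected error (typically in the business logic): suppress the
      // commit in the upcoming rebalance, so the messages processed since
      // the last commit are re-consumed instead of being lost.
      log.error("{} - Unexpected error, disabling commits", id, e);
      rebalanceListener.disableCommits();
    }
    finally
    {
      consumer.unsubscribe();
    }
  }
}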
import com.mongodb.client.MongoClient;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
@Autowired
MongoProperties mongoProperties;
@Autowired
- ConsumerRebalanceListener rebalanceListener;
+ CommittingConsumerRebalanceListener rebalanceListener;
@Autowired
RecordHandler<K, V> recordHandler;
Long expected = offsetsToCheck.get(tp) + 1;
log.debug("Checking, if the offset {} for {} is at most {}", offset, tp, expected);
assertThat(offset)
- .describedAs("Committed offset corresponds to the offset of the consumer")
+ .describedAs("Committed offset must be at most equal to the offset of the consumer")
.isLessThanOrEqualTo(expected);
isOffsetBehindSeen.add(offset < expected);
});
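The offsets that this assertion compares against can be read back through the standard Consumer.committed() API. A minimal sketch, assuming a hypothetical helper class that is not part of the test code:

package de.juplo.kafka;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;


// Hypothetical helper: reads the offsets that are currently committed
// for the given partitions, treating partitions without a committed
// offset as position 0.
public class CommittedOffsets
{
  public static Map<TopicPartition, Long> read(
      Consumer<?, ?> consumer,
      Set<TopicPartition> partitions)
  {
    Map<TopicPartition, Long> committed = new HashMap<>();
    consumer
        .committed(partitions)
        .forEach((partition, offsetAndMetadata) ->
            committed.put(
                partition,
                offsetAndMetadata == null ? 0L : offsetAndMetadata.offset()));
    return committed;
  }
}

With commits suppressed after an error, the committed offset may lag behind the consumer's position, which is exactly why the test asserts "at most" rather than equality and records the lag in isOffsetBehindSeen.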