import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
@RequiredArgsConstructor
@Slf4j
-public class ApplicationRebalanceListener implements ConsumerRebalanceListener
+public class ApplicationRebalanceListener implements CommittingConsumerRebalanceListener
{
private final ApplicationRecordHandler recordHandler;
private final AdderResults adderResults;
private final Set<Integer> partitions = new HashSet<>();
+ private boolean commitsEnabled = true;
+
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions)
{
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions)
{
- log.info("{} - Commiting offsets for all previously assigned partitions", id);
- try
- {
- consumer.commitSync();
- }
- catch (Exception e)
+ if (commitsEnabled)
{
- log.warn("{} - Could not commit offsets in onPartitionsRevoked():", id, e);
+ log.info("{} - Commiting offsets for all previously assigned partitions", id);
+ try
+ {
+ consumer.commitSync();
+ }
+ catch (Exception e)
+ {
+ log.warn("{} - Could not commit offsets in onPartitionsRevoked():", id, e);
+ }
}
partitions.forEach(tp ->
stateRepository.save(new StateDocument(partition, state, results));
});
}
+
+ @Override
+ public void enableCommits()
+ {
+ commitsEnabled = true;
+ }
+
+ @Override
+ public void disableCommits()
+ {
+ commitsEnabled = false;
+ }
}
--- /dev/null
+package de.juplo.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+
+
+public interface CommittingConsumerRebalanceListener extends ConsumerRebalanceListener
+{
+ void enableCommits();
+ void disableCommits();
+}
private final String id;
private final String topic;
private final Consumer<K, V> consumer;
- private final ConsumerRebalanceListener rebalanceListener;
+ private final CommittingConsumerRebalanceListener rebalanceListener;
private final RecordHandler<K, V> recordHandler;
private final Lock lock = new ReentrantLock();
try
{
log.info("{} - Subscribing to topic {}", id, topic);
+ rebalanceListener.enableCommits();
consumer.subscribe(Arrays.asList(topic), rebalanceListener);
while (true)
}
catch(Exception e)
{
- log.error("{} - Unexpected error: {}", id, e.toString(), e);
+ log.error("{} - Unexpected error: {}, disabling commits", id, e.toString(), e);
+ rebalanceListener.disableCommits();
shutdown(e);
}
finally
import com.mongodb.client.MongoClient;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
@Autowired
MongoProperties mongoProperties;
@Autowired
- ConsumerRebalanceListener rebalanceListener;
+ CommittingConsumerRebalanceListener rebalanceListener;
@Autowired
RecordHandler<K, V> recordHandler;
Long expected = offsetsToCheck.get(tp) + 1;
log.debug("Checking, if the offset {} for {} is at most {}", offset, tp, expected);
assertThat(offset)
- .describedAs("Committed offset corresponds to the offset of the consumer")
+ .describedAs("Committed offset must be at most equal to the offset of the consumer")
.isLessThanOrEqualTo(expected);
isOffsetBehindSeen.add(offset < expected);
});