import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
import org.springframework.stereotype.Component;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
@Component
@Slf4j
@RequiredArgsConstructor
-public class EndlessConsumer<K, V>
+public class EndlessConsumer<K, V> implements ConsumerAwareRebalanceListener
{
@Autowired
private KafkaListenerEndpointRegistry registry;
- @Value("${consumer.client-id}")
+ @Value("${spring.kafka.consumer.client-id}")
String id;
@Autowired
Consumer<ConsumerRecord<K, V>> handler;
private long consumed = 0;
+ private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
+ private final Map<Integer, Long> offsets = new HashMap<>();
+
+
+ @Override
+ public void onPartitionsRevokedBeforeCommit(
+ org.apache.kafka.clients.consumer.Consumer<?, ?> consumer,
+ Collection<TopicPartition> partitions)
+ {
+ partitions.forEach(tp ->
+ {
+ Integer partition = tp.partition();
+ Long newOffset = consumer.position(tp);
+ Long oldOffset = offsets.remove(partition);
+ log.info(
+ "{} - removing partition: {}, consumed {} records (offset {} -> {})",
+ id,
+ partition,
+ newOffset - oldOffset,
+ oldOffset,
+ newOffset);
+ Map<String, Long> removed = seen.remove(partition);
+ for (String key : removed.keySet())
+ {
+ log.info(
+ "{} - Seen {} messages for partition={}|key={}",
+ id,
+ removed.get(key),
+ partition,
+ key);
+ }
+ });
+ }
+
+ @Override
+ public void onPartitionsAssigned(
+ org.apache.kafka.clients.consumer.Consumer<?, ?> consumer,
+ Collection<TopicPartition> partitions)
+ {
+ partitions.forEach(tp ->
+ {
+ Integer partition = tp.partition();
+ Long offset = consumer.position(tp);
+ log.info("{} - adding partition: {}, offset={}", id, partition, offset);
+ offsets.put(partition, offset);
+ seen.put(partition, new HashMap<>());
+ });
+ }
+
+
@KafkaListener(
- id = "${consumer.client-id}",
+ id = "${spring.kafka.consumer.client-id}",
idIsGroup = false,
topics = "${consumer.topic}",
- containerFactory = "batchFactory",
autoStartup = "false")
- public void receive(List<ConsumerRecord<K, V>> records)
+ public void receive(ConsumerRecord<K, V> record)
{
- // Do something with the data...
- log.info("{} - Received {} messages", id, records.size());
- for (ConsumerRecord<K, V> record : records)
- {
- log.info(
- "{} - {}: {}/{} - {}={}",
- id,
- record.offset(),
- record.topic(),
- record.partition(),
- record.key(),
- record.value()
- );
+ log.info(
+ "{} - {}: {}/{} - {}={}",
+ id,
+ record.offset(),
+ record.topic(),
+ record.partition(),
+ record.key(),
+ record.value()
+ );
- handler.accept(record);
+ handler.accept(record);
- consumed++;
- }
+ consumed++;
}
registry.getListenerContainer(id).stop();
log.info("{} - Stopped - consumed {} messages so far", id, consumed);
}
+
+ public synchronized boolean isRunning()
+ {
+ return registry.getListenerContainer(id).isChildRunning();
+ }
}