X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FEndlessConsumer.java;h=04a0a3af8e112e55f7c34a60fe785713c27283b8;hb=1709f0e4f41be7e3b955d19769697a517633827d;hp=888805ffe9cf0876f87a223d594139fe4843364b;hpb=5966e3b824a0303b02fd59c693ae35c3ededa111;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 888805f..04a0a3a 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -3,31 +3,86 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; import org.springframework.stereotype.Component; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import java.util.function.Consumer; @Component @Slf4j @RequiredArgsConstructor -public class EndlessConsumer +public class EndlessConsumer implements ConsumerAwareRebalanceListener { @Autowired private KafkaListenerEndpointRegistry registry; - @Value("${consumer.client-id}") + @Value("${spring.kafka.consumer.client-id}") String id; @Autowired Consumer> handler; private long consumed = 0; + private final Map> seen = new HashMap<>(); + private final Map offsets = new HashMap<>(); + + + @Override + public void onPartitionsRevokedBeforeCommit( + org.apache.kafka.clients.consumer.Consumer consumer, + Collection partitions) + { + partitions.forEach(tp -> + { + Integer partition = tp.partition(); + Long newOffset = consumer.position(tp); + Long oldOffset = offsets.remove(partition); + log.info( + "{} - removing partition: {}, consumed {} records (offset {} -> {})", + id, + partition, + newOffset - oldOffset, + oldOffset, + newOffset); + Map removed = seen.remove(partition); + for (String key : removed.keySet()) + { + log.info( + "{} - Seen {} messages for partition={}|key={}", + id, + removed.get(key), + partition, + key); + } + }); + } + + @Override + public void onPartitionsAssigned( + org.apache.kafka.clients.consumer.Consumer consumer, + Collection partitions) + { + partitions.forEach(tp -> + { + Integer partition = tp.partition(); + Long offset = consumer.position(tp); + log.info("{} - adding partition: {}, offset={}", id, partition, offset); + offsets.put(partition, offset); + seen.put(partition, new HashMap<>()); + }); + } + + @KafkaListener( - id = "${consumer.client-id}", + id = "${spring.kafka.consumer.client-id}", idIsGroup = false, topics = "${consumer.topic}", autoStartup = "false") @@ -67,4 +122,9 @@ public class EndlessConsumer registry.getListenerContainer(id).stop(); log.info("{} - Stopped - consumed {} messages so far", id, consumed); } + + public synchronized boolean isRunning() + { + return registry.getListenerContainer(id).isChildRunning(); + } }