return new KeyCountingRecordHandler();
}
- @Bean
- public KeyCountingRebalanceListener keyCountingRebalanceListener(
- KeyCountingRecordHandler keyCountingRecordHandler,
- ApplicationProperties properties)
- {
- return new KeyCountingRebalanceListener(
- keyCountingRecordHandler,
- properties.getClientId());
- }
-
@Bean
public EndlessConsumer<String, Long> endlessConsumer(
KafkaConsumer<String, Long> kafkaConsumer,
ExecutorService executor,
- KeyCountingRebalanceListener keyCountingRebalanceListener,
KeyCountingRecordHandler keyCountingRecordHandler,
ApplicationProperties properties)
{
properties.getClientId(),
properties.getTopic(),
kafkaConsumer,
- keyCountingRebalanceListener,
keyCountingRecordHandler);
}
private final String id;
private final String topic;
private final Consumer<K, V> consumer;
- private final ConsumerRebalanceListener consumerRebalanceListener;
private final RecordHandler<K, V> handler;
private final Lock lock = new ReentrantLock();
try
{
log.info("{} - Subscribing to topic {}", id, topic);
- consumer.subscribe(Arrays.asList(topic), consumerRebalanceListener);
+ consumer.subscribe(Arrays.asList(topic));
while (true)
{
+++ /dev/null
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.common.TopicPartition;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class KeyCountingRebalanceListener implements ConsumerRebalanceListener
-{
- private final KeyCountingRecordHandler handler;
- private final String id;
-
- @Override
- public void onPartitionsAssigned(Collection<TopicPartition> partitions)
- {
- partitions.forEach(tp ->
- {
- Integer partition = tp.partition();
- log.info("{} - adding partition: {}", id, partition);
- handler.addPartition(partition, new HashMap<>());
- });
- }
-
- @Override
- public void onPartitionsRevoked(Collection<TopicPartition> partitions)
- {
- partitions.forEach(tp ->
- {
- Integer partition = tp.partition();
- log.info("{} - removing partition: {}", id, partition);
- Map<String, Long> removed = handler.removePartition(partition);
- for (String key : removed.keySet())
- {
- log.info(
- "{} - Seen {} messages for partition={}|key={}",
- id,
- removed.get(key),
- partition,
- key);
- }
- });
- }
-}
{
Integer partition = record.partition();
String key = record.key() == null ? "NULL" : record.key().toString();
+
+ if (!seen.containsKey(partition))
+ seen.put(partition, new HashMap<>());
+
Map<String, Long> byKey = seen.get(partition);
if (!byKey.containsKey(key))
byKey.put(key, seenByKey);
}
- public void addPartition(Integer partition, Map<String, Long> statistics)
- {
- seen.put(partition, statistics);
- }
-
- public Map<String, Long> removePartition(Integer partition)
- {
- return seen.remove(partition);
- }
-
public Map<Integer, Map<String, Long>> getSeen()
{
@Autowired
ExecutorService executor;
@Autowired
- KeyCountingRebalanceListener keyCountingRebalanceListener;
- @Autowired
KeyCountingRecordHandler keyCountingRecordHandler;
EndlessConsumer<String, Long> endlessConsumer;
properties.getClientId(),
properties.getTopic(),
kafkaConsumer,
- keyCountingRebalanceListener,
captureOffsetAndExecuteTestHandler);
endlessConsumer.start();