Benennung vereinheitlicht und projektunabhängig gemacht rebalance-listener
authorKai Moritz <kai@juplo.de>
Sun, 21 Aug 2022 08:33:09 +0000 (10:33 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 21 Aug 2022 09:07:25 +0000 (11:07 +0200)
* Merge branch 'counting-consumer' into rebalance-listener
* Außerdem die dort nicht vorhandene Klasse `KeyCountingRebalanceListener`
  entsprechend umbenannt.

1  2 
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
src/test/java/de/juplo/kafka/ApplicationTests.java

@@@ -17,27 -17,16 +17,27 @@@ import java.util.concurrent.Executors
  public class ApplicationConfiguration
  {
    @Bean
-   public KeyCountingRecordHandler keyCountingRecordHandler()
+   public ApplicationRecordHandler recordHandler()
    {
-     return new KeyCountingRecordHandler();
+     return new ApplicationRecordHandler();
    }
  
-   public KeyCountingRebalanceListener keyCountingRebalanceListener(
-       KeyCountingRecordHandler keyCountingRecordHandler,
 +  @Bean
-     return new KeyCountingRebalanceListener(
-         keyCountingRecordHandler,
++  public ApplicationRebalanceListener rebalanceListener(
++      ApplicationRecordHandler recordHandler,
 +      ApplicationProperties properties)
 +  {
++    return new ApplicationRebalanceListener(
++        recordHandler,
 +        properties.getClientId());
 +  }
 +
    @Bean
    public EndlessConsumer<String, Long> endlessConsumer(
        KafkaConsumer<String, Long> kafkaConsumer,
        ExecutorService executor,
-       KeyCountingRebalanceListener keyCountingRebalanceListener,
-       KeyCountingRecordHandler keyCountingRecordHandler,
++      ApplicationRebalanceListener rebalanceListener,
+       ApplicationRecordHandler recordHandler,
        ApplicationProperties properties)
    {
      return
@@@ -46,8 -35,7 +46,8 @@@
              properties.getClientId(),
              properties.getTopic(),
              kafkaConsumer,
-             keyCountingRebalanceListener,
-             keyCountingRecordHandler);
++            rebalanceListener,
+             recordHandler);
    }
  
    @Bean
index 0000000,0000000..0dcadce
new file mode 100644 (file)
--- /dev/null
--- /dev/null
@@@ -1,0 -1,0 +1,50 @@@
++package de.juplo.kafka;
++
++import lombok.RequiredArgsConstructor;
++import lombok.extern.slf4j.Slf4j;
++import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
++import org.apache.kafka.common.TopicPartition;
++
++import java.util.Collection;
++import java.util.HashMap;
++import java.util.Map;
++
++
++@RequiredArgsConstructor
++@Slf4j
++public class ApplicationRebalanceListener implements ConsumerRebalanceListener
++{
++  private final ApplicationRecordHandler recordHandler;
++  private final String id;
++
++  @Override
++  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
++  {
++    partitions.forEach(tp ->
++    {
++      Integer partition = tp.partition();
++      log.info("{} - adding partition: {}", id, partition);
++      recordHandler.addPartition(partition, new HashMap<>());
++    });
++  }
++
++  @Override
++  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
++  {
++    partitions.forEach(tp ->
++    {
++      Integer partition = tp.partition();
++      log.info("{} - removing partition: {}", id, partition);
++      Map<String, Long> removed = recordHandler.removePartition(partition);
++      for (String key : removed.keySet())
++      {
++        log.info(
++            "{} - Seen {} messages for partition={}|key={}",
++            id,
++            removed.get(key),
++            partition,
++            key);
++      }
++    });
++  }
++}
index 0000000,3492c0d..dfbf82e
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,40 +1,46 @@@
 -
 -    if (!state.containsKey(partition))
 -      state.put(partition, new HashMap<>());
 -
+ package de.juplo.kafka;
+ import lombok.extern.slf4j.Slf4j;
+ import org.apache.kafka.clients.consumer.ConsumerRecord;
+ import java.util.HashMap;
+ import java.util.Map;
+ @Slf4j
+ public class ApplicationRecordHandler implements RecordHandler<String, Long>
+ {
+   private final Map<Integer, Map<String, Long>> state = new HashMap<>();
+   @Override
+   public void accept(ConsumerRecord<String, Long> record)
+   {
+     Integer partition = record.partition();
+     String key = record.key() == null ? "NULL" : record.key().toString();
+     Map<String, Long> byKey = state.get(partition);
+     if (!byKey.containsKey(key))
+       byKey.put(key, 0l);
+     long seenByKey = byKey.get(key);
+     seenByKey++;
+     byKey.put(key, seenByKey);
+   }
++  public void addPartition(Integer partition, Map<String, Long> statistics)
++  {
++    state.put(partition, statistics);
++  }
++
++  public Map<String, Long> removePartition(Integer partition)
++  {
++    return state.remove(partition);
++  }
++
+   public Map<Integer, Map<String, Long>> getState()
+   {
+     return state;
+   }
+ }
@@@ -65,9 -65,7 +65,9 @@@ class ApplicationTest
        @Autowired
        ExecutorService executor;
        @Autowired
-       KeyCountingRebalanceListener keyCountingRebalanceListener;
++      ApplicationRebalanceListener rebalanceListener;
 +      @Autowired
-       KeyCountingRecordHandler keyCountingRecordHandler;
+       ApplicationRecordHandler recordHandler;
  
        EndlessConsumer<String, Long> endlessConsumer;
        Map<TopicPartition, Long> oldOffsets;
                                                properties.getClientId(),
                                                properties.getTopic(),
                                                kafkaConsumer,
-                                               keyCountingRebalanceListener,
++                                              rebalanceListener,
                                                captureOffsetAndExecuteTestHandler);
  
                endlessConsumer.start();