Naming unified and made project-independent
author Kai Moritz <kai@juplo.de>
Sun, 14 Aug 2022 17:11:25 +0000 (19:11 +0200)
committer Kai Moritz <kai@juplo.de>
Sun, 14 Aug 2022 17:33:27 +0000 (19:33 +0200)
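
The example-specific names are replaced with generic, project-independent ones:

  KeyCountingRecordHandler      -> ApplicationRecordHandler
  KeyCountingRebalanceListener  -> ApplicationRebalanceListener
  PartitionStatisticsRepository -> StateRepository
  StatisticsDocument            -> StateDocument

Accordingly, the "seen"/"statistics" wording in fields, methods and beans
becomes "state", the MongoDB collection "statistics" becomes "state", and
the REST endpoint GET /seen becomes GET /state.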
12 files changed:
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/ApplicationRecordHandler.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/DriverController.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java [deleted file]
src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java [deleted file]
src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java [deleted file]
src/main/java/de/juplo/kafka/StateDocument.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/StateRepository.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/StatisticsDocument.java [deleted file]
src/test/java/de/juplo/kafka/ApplicationTests.java

diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 3925fcb..a9d9b15 100644
@@ -19,21 +19,21 @@ import java.util.concurrent.Executors;
 public class ApplicationConfiguration
 {
   @Bean
-  public KeyCountingRecordHandler messageCountingRecordHandler()
+  public ApplicationRecordHandler recordHandler()
   {
-    return new KeyCountingRecordHandler();
+    return new ApplicationRecordHandler();
   }
 
   @Bean
-  public KeyCountingRebalanceListener wordcountRebalanceListener(
-      KeyCountingRecordHandler keyCountingRecordHandler,
-      PartitionStatisticsRepository repository,
+  public ApplicationRebalanceListener rebalanceListener(
+      ApplicationRecordHandler recordHandler,
+      StateRepository stateRepository,
       Consumer<String, Long> consumer,
       ApplicationProperties properties)
   {
-    return new KeyCountingRebalanceListener(
-        keyCountingRecordHandler,
-        repository,
+    return new ApplicationRebalanceListener(
+        recordHandler,
+        stateRepository,
         properties.getClientId(),
         properties.getTopic(),
         Clock.systemDefaultZone(),
@@ -45,8 +45,8 @@ public class ApplicationConfiguration
   public EndlessConsumer<String, Long> endlessConsumer(
       KafkaConsumer<String, Long> kafkaConsumer,
       ExecutorService executor,
-      KeyCountingRebalanceListener keyCountingRebalanceListener,
-      KeyCountingRecordHandler keyCountingRecordHandler,
+      ApplicationRebalanceListener rebalanceListener,
+      ApplicationRecordHandler recordHandler,
       ApplicationProperties properties)
   {
     return
@@ -55,8 +55,8 @@ public class ApplicationConfiguration
             properties.getClientId(),
             properties.getTopic(),
             kafkaConsumer,
-            keyCountingRebalanceListener,
-            keyCountingRecordHandler);
+            rebalanceListener,
+            recordHandler);
   }
 
   @Bean
diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
new file mode 100644
index 0000000..2fccb4f
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
@@ -0,0 +1,83 @@
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Map;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
+{
+  private final ApplicationRecordHandler recordHandler;
+  private final StateRepository stateRepository;
+  private final String id;
+  private final String topic;
+  private final Clock clock;
+  private final Duration commitInterval;
+  private final Consumer<String, Long> consumer;
+
+  private Instant lastCommit = Instant.EPOCH;
+
+  @Override
+  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+  {
+    partitions.forEach(tp ->
+    {
+      Integer partition = tp.partition();
+      Long offset = consumer.position(tp);
+      log.info("{} - adding partition: {}, offset={}", id, partition, offset);
+      StateDocument document =
+          stateRepository
+              .findById(Integer.toString(partition))
+              .orElse(new StateDocument(partition));
+      if (document.offset >= 0)
+      {
+        // Only seek if a stored offset was found.
+        // Otherwise, the initial offset generated by Kafka is used.
+        consumer.seek(tp, document.offset);
+      }
+      recordHandler.addPartition(partition, document.state);
+    });
+  }
+
+  @Override
+  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+  {
+    partitions.forEach(tp ->
+    {
+      Integer partition = tp.partition();
+      Long newOffset = consumer.position(tp);
+      log.info(
+          "{} - removing partition: {}, offset of next message {})",
+          id,
+          partition,
+          newOffset);
+      Map<String, Long> removed = recordHandler.removePartition(partition);
+      stateRepository.save(new StateDocument(partition, removed, consumer.position(tp)));
+    });
+  }
+
+
+  @Override
+  public void beforeNextPoll()
+  {
+    if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
+    {
+      log.debug("Storing data and offsets, last commit: {}", lastCommit);
+      recordHandler.getState().forEach((partition, state) -> stateRepository.save(
+          new StateDocument(
+              partition,
+              state,
+              consumer.position(new TopicPartition(topic, partition)))));
+      lastCommit = clock.instant();
+    }
+  }
+}
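
Not part of the commit: a minimal sketch of how the resume-from-stored-offset
behaviour of the new ApplicationRebalanceListener can be exercised in
isolation. It assumes Mockito is on the classpath; the class name, topic,
id and offset values are made up for illustration.

package de.juplo.kafka;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Clock;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;

import static org.mockito.Mockito.*;

public class RebalanceListenerSketch
{
  @SuppressWarnings("unchecked")
  public static void main(String[] args)
  {
    Consumer<String, Long> consumer = mock(Consumer.class);
    StateRepository stateRepository = mock(StateRepository.class);
    ApplicationRecordHandler recordHandler = new ApplicationRecordHandler();

    TopicPartition tp = new TopicPartition("test", 0);
    when(consumer.position(tp)).thenReturn(0L);
    // Simulate a previously stored state for partition 0 with offset 42
    when(stateRepository.findById("0"))
        .thenReturn(Optional.of(new StateDocument(0, new HashMap<>(), 42L)));

    ApplicationRebalanceListener listener =
        new ApplicationRebalanceListener(
            recordHandler,
            stateRepository,
            "sketch",                  // id (client-id)
            "test",                    // topic
            Clock.systemDefaultZone(),
            Duration.ofSeconds(10),    // commit-interval
            consumer);

    // On assignment the listener loads the StateDocument and, because a
    // stored offset (>= 0) exists, seeks the consumer to that position
    listener.onPartitionsAssigned(List.of(tp));
    verify(consumer).seek(tp, 42L);
  }
}

Because StateDocument.offset defaults to -1, a partition without a stored
document is not seeked at all; the consumer then starts at the position
Kafka determines (e.g. via auto.offset.reset).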
diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
new file mode 100644
index 0000000..c2c2657
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
@@ -0,0 +1,46 @@
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+@Slf4j
+public class ApplicationRecordHandler implements RecordHandler<String, Long>
+{
+  private final Map<Integer, Map<String, Long>> state = new HashMap<>();
+
+
+  @Override
+  public void accept(ConsumerRecord<String, Long> record)
+  {
+    Integer partition = record.partition();
+    String key = record.key() == null ? "NULL" : record.key().toString();
+    Map<String, Long> byKey = state.get(partition);
+
+    if (!byKey.containsKey(key))
+      byKey.put(key, 0L);
+
+    long seenByKey = byKey.get(key);
+    seenByKey++;
+    byKey.put(key, seenByKey);
+  }
+
+  protected void addPartition(Integer partition, Map<String, Long> state)
+  {
+    this.state.put(partition, state);
+  }
+
+  protected Map<String, Long> removePartition(Integer partition)
+  {
+    return this.state.remove(partition);
+  }
+
+
+  public Map<Integer, Map<String, Long>> getState()
+  {
+    return state;
+  }
+}
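
Also not part of the commit: a small sketch of the counting logic. Note
that accept() looks up the per-partition map without a null check; this is
safe because the consumer only delivers records for assigned partitions,
and onPartitionsAssigned() registers every assigned partition via
addPartition() before the first poll() returns records for it. The class
name, topic and record values below are invented for illustration.

package de.juplo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.HashMap;

public class RecordHandlerSketch
{
  public static void main(String[] args)
  {
    ApplicationRecordHandler handler = new ApplicationRecordHandler();

    // Register the partition first -- in production this is done by
    // ApplicationRebalanceListener.onPartitionsAssigned()
    handler.addPartition(0, new HashMap<>());

    // ConsumerRecord(topic, partition, offset, key, value); only the key
    // is counted, the Long value is ignored by the handler
    handler.accept(new ConsumerRecord<>("test", 0, 0L, "peter", 1L));
    handler.accept(new ConsumerRecord<>("test", 0, 1L, "peter", 1L));
    handler.accept(new ConsumerRecord<>("test", 0, 2L, "klaus", 1L));

    System.out.println(handler.getState()); // e.g. {0={peter=2, klaus=1}}
  }
}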
diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java
index f6ff47f..09fb762 100644
@@ -13,7 +13,7 @@ import java.util.concurrent.ExecutionException;
 public class DriverController
 {
   private final EndlessConsumer consumer;
-  private final KeyCountingRecordHandler keyCountingRecordHandler;
+  private final ApplicationRecordHandler recordHandler;
 
 
   @PostMapping("start")
@@ -29,10 +29,10 @@ public class DriverController
   }
 
 
-  @GetMapping("seen")
-  public Map<Integer, Map<String, Long>> seen()
+  @GetMapping("state")
+  public Map<Integer, Map<String, Long>> state()
   {
-    return keyCountingRecordHandler.getSeen();
+    return recordHandler.getState();
   }
 
 
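Note the renamed HTTP endpoint: what was GET /seen is now GET /state; the
response payload, a JSON map from partition to per-key counters, is
unchanged.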
diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
index 58557f2..17778be 100644
@@ -25,8 +25,8 @@ public class EndlessConsumer<K, V> implements Runnable
   private final String id;
   private final String topic;
   private final Consumer<K, V> consumer;
-  private final PollIntervalAwareConsumerRebalanceListener pollIntervalAwareRebalanceListener;
-  private final RecordHandler<K, V> handler;
+  private final PollIntervalAwareConsumerRebalanceListener rebalanceListener;
+  private final RecordHandler<K, V> recordHandler;
 
   private final Lock lock = new ReentrantLock();
   private final Condition condition = lock.newCondition();
@@ -42,7 +42,7 @@ public class EndlessConsumer<K, V> implements Runnable
     try
     {
       log.info("{} - Subscribing to topic {}", id, topic);
-      consumer.subscribe(Arrays.asList(topic), pollIntervalAwareRebalanceListener);
+      consumer.subscribe(Arrays.asList(topic), rebalanceListener);
 
       while (true)
       {
@@ -63,12 +63,12 @@ public class EndlessConsumer<K, V> implements Runnable
               record.value()
           );
 
-          handler.accept(record);
+          recordHandler.accept(record);
 
           consumed++;
         }
 
-        pollIntervalAwareRebalanceListener.beforeNextPoll();
+        rebalanceListener.beforeNextPoll();
       }
     }
     catch(WakeupException e)
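Apart from the renamed fields, the consumer loop is unchanged: after every
processed batch it calls rebalanceListener.beforeNextPoll(), which is what
drives the interval-based saving of state and offsets implemented in
ApplicationRebalanceListener above.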
diff --git a/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java b/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java
deleted file mode 100644
index 4a2c036..0000000
--- a/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java
+++ /dev/null
@@ -1,83 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.common.TopicPartition;
-
-import java.time.Clock;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.Collection;
-import java.util.Map;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class KeyCountingRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
-{
-  private final KeyCountingRecordHandler handler;
-  private final PartitionStatisticsRepository repository;
-  private final String id;
-  private final String topic;
-  private final Clock clock;
-  private final Duration commitInterval;
-  private final Consumer<String, Long> consumer;
-
-  private Instant lastCommit = Instant.EPOCH;
-
-  @Override
-  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
-  {
-    partitions.forEach(tp ->
-    {
-      Integer partition = tp.partition();
-      Long offset = consumer.position(tp);
-      log.info("{} - adding partition: {}, offset={}", id, partition, offset);
-      StatisticsDocument document =
-          repository
-              .findById(Integer.toString(partition))
-              .orElse(new StatisticsDocument(partition));
-      if (document.offset >= 0)
-      {
-        // Only seek, if a stored offset was found
-        // Otherwise: Use initial offset, generated by Kafka
-        consumer.seek(tp, document.offset);
-      }
-      handler.addPartition(partition, document.statistics);
-    });
-  }
-
-  @Override
-  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
-  {
-    partitions.forEach(tp ->
-    {
-      Integer partition = tp.partition();
-      Long newOffset = consumer.position(tp);
-      log.info(
-          "{} - removing partition: {}, offset of next message {})",
-          id,
-          partition,
-          newOffset);
-      Map<String, Long> removed = handler.removePartition(partition);
-      repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
-    });
-  }
-
-
-  @Override
-  public void beforeNextPoll()
-  {
-    if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
-    {
-      log.debug("Storing data and offsets, last commit: {}", lastCommit);
-      handler.getSeen().forEach((partiton, statistics) -> repository.save(
-          new StatisticsDocument(
-              partiton,
-              statistics,
-              consumer.position(new TopicPartition(topic, partiton)))));
-      lastCommit = clock.instant();
-    }
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java b/src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java
deleted file mode 100644
index 099dcf7..0000000
--- a/src/main/java/de/juplo/kafka/KeyCountingRecordHandler.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-@Slf4j
-public class KeyCountingRecordHandler implements RecordHandler<String, Long>
-{
-  private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
-
-
-  @Override
-  public void accept(ConsumerRecord<String, Long> record)
-  {
-    Integer partition = record.partition();
-    String key = record.key() == null ? "NULL" : record.key().toString();
-    Map<String, Long> byKey = seen.get(partition);
-
-    if (!byKey.containsKey(key))
-      byKey.put(key, 0l);
-
-    long seenByKey = byKey.get(key);
-    seenByKey++;
-    byKey.put(key, seenByKey);
-  }
-
-  public void addPartition(Integer partition, Map<String, Long> statistics)
-  {
-    seen.put(partition, statistics);
-  }
-
-  public Map<String, Long> removePartition(Integer partition)
-  {
-    return seen.remove(partition);
-  }
-
-
-  public Map<Integer, Map<String, Long>> getSeen()
-  {
-    return seen;
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java b/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java
deleted file mode 100644
index 0ccf3cd..0000000
--- a/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package de.juplo.kafka;
-
-import org.springframework.data.mongodb.repository.MongoRepository;
-
-import java.util.Optional;
-
-
-public interface PartitionStatisticsRepository extends MongoRepository<StatisticsDocument, String>
-{
-  public Optional<StatisticsDocument> findById(String partition);
-}
diff --git a/src/main/java/de/juplo/kafka/StateDocument.java b/src/main/java/de/juplo/kafka/StateDocument.java
new file mode 100644
index 0000000..bb1c701
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/StateDocument.java
@@ -0,0 +1,36 @@
+package de.juplo.kafka;
+
+import lombok.ToString;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.mongodb.core.mapping.Document;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+@Document(collection = "state")
+@ToString
+public class StateDocument
+{
+  @Id
+  public String id;
+  public long offset = -1L;
+  public Map<String, Long> state;
+
+  public StateDocument()
+  {
+  }
+
+  public StateDocument(Integer partition)
+  {
+    this.id = Integer.toString(partition);
+    this.state = new HashMap<>();
+  }
+
+  public StateDocument(Integer partition, Map<String, Long> state, long offset)
+  {
+    this.id = Integer.toString(partition);
+    this.state = state;
+    this.offset = offset;
+  }
+}
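
For illustration (an assumption about Spring Data MongoDB's default
mapping, not something shown in this commit): saving
new StateDocument(0, Map.of("peter", 2L), 42L) should yield a document in
the renamed "state" collection along the lines of
{ "_id": "0", "offset": 42, "state": { "peter": 2 } }.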
diff --git a/src/main/java/de/juplo/kafka/StateRepository.java b/src/main/java/de/juplo/kafka/StateRepository.java
new file mode 100644
index 0000000..3129535
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/StateRepository.java
@@ -0,0 +1,11 @@
+package de.juplo.kafka;
+
+import org.springframework.data.mongodb.repository.MongoRepository;
+
+import java.util.Optional;
+
+
+public interface StateRepository extends MongoRepository<StateDocument, String>
+{
+  public Optional<StateDocument> findById(String partition);
+}
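
Note: MongoRepository already inherits Optional<T> findById(ID) from
CrudRepository, so the declaration above is redundant with Spring Data 2.x;
it merely documents the lookup used by the rebalance listener.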
diff --git a/src/main/java/de/juplo/kafka/StatisticsDocument.java b/src/main/java/de/juplo/kafka/StatisticsDocument.java
deleted file mode 100644
index 1244f45..0000000
--- a/src/main/java/de/juplo/kafka/StatisticsDocument.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.ToString;
-import org.springframework.data.annotation.Id;
-import org.springframework.data.mongodb.core.mapping.Document;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-@Document(collection = "statistics")
-@ToString
-public class StatisticsDocument
-{
-  @Id
-  public String id;
-  public long offset = -1l;
-  public Map<String, Long> statistics;
-
-  public StatisticsDocument()
-  {
-  }
-
-  public StatisticsDocument(Integer partition)
-  {
-    this.id = Integer.toString(partition);
-    this.statistics = new HashMap<>();
-  }
-
-  public StatisticsDocument(Integer partition, Map<String, Long> statistics, long offset)
-  {
-    this.id = Integer.toString(partition);
-    this.statistics = statistics;
-    this.offset = offset;
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index fc5d4c9..1f18e59 100644
@@ -64,17 +64,15 @@ class ApplicationTests
        @Autowired
        KafkaConsumer<Bytes, Bytes> offsetConsumer;
        @Autowired
-       PartitionStatisticsRepository partitionStatisticsRepository;
-       @Autowired
        ApplicationProperties properties;
        @Autowired
        ExecutorService executor;
        @Autowired
-       PartitionStatisticsRepository repository;
+       StateRepository stateRepository;
       @Autowired
-       KeyCountingRebalanceListener keyCountingRebalanceListener;
+       ApplicationRebalanceListener rebalanceListener;
       @Autowired
-       KeyCountingRecordHandler keyCountingRecordHandler;
+       ApplicationRecordHandler recordHandler;
 
        EndlessConsumer<String, Long> endlessConsumer;
        Map<TopicPartition, Long> oldOffsets;
@@ -196,12 +194,12 @@ class ApplicationTests
                        Long offset = offsetConsumer.position(tp);
                        log.info("New position for {}: {}", tp, offset);
                        Integer partition = tp.partition();
-                       StatisticsDocument document =
-                                       partitionStatisticsRepository
+                       StateDocument document =
+                                       stateRepository
                                                        .findById(partition.toString())
-                                                       .orElse(new StatisticsDocument(partition));
+                                                       .orElse(new StateDocument(partition));
                        document.offset = offset;
-                       partitionStatisticsRepository.save(document);
+                       stateRepository.save(document);
                });
                offsetConsumer.unsubscribe();
        }
@@ -211,7 +209,7 @@ class ApplicationTests
                partitions().forEach(tp ->
                {
                        String partition = Integer.toString(tp.partition());
-                       Optional<Long> offset = partitionStatisticsRepository.findById(partition).map(document -> document.offset);
+                       Optional<Long> offset = stateRepository.findById(partition).map(document -> document.offset);
                        consumer.accept(tp, offset.orElse(0l));
                });
                }
@@ -283,7 +281,7 @@ class ApplicationTests
                });
 
                TestRecordHandler<String, Long> captureOffsetAndExecuteTestHandler =
-                               new TestRecordHandler<String, Long>(keyCountingRecordHandler) {
+                               new TestRecordHandler<String, Long>(recordHandler) {
                                        @Override
                                        public void onNewRecord(ConsumerRecord<String, Long> record)
                                        {
@@ -300,7 +298,7 @@ class ApplicationTests
                                                properties.getClientId(),
                                                properties.getTopic(),
                                                kafkaConsumer,
-                                               keyCountingRebalanceListener,
+                                               rebalanceListener,
                                                captureOffsetAndExecuteTestHandler);
 
                endlessConsumer.start();