Benennung vereinheitlicht und projektunabhängig gemacht
author Kai Moritz <kai@juplo.de>
Sun, 14 Aug 2022 17:04:47 +0000 (19:04 +0200)
committer Kai Moritz <kai@juplo.de>
Sun, 14 Aug 2022 18:50:16 +0000 (20:50 +0200)
src/main/java/de/juplo/kafka/AdderRebalanceListener.java [deleted file]
src/main/java/de/juplo/kafka/AdderRecordHandler.java [deleted file]
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/ApplicationRecordHandler.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/DriverController.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java [deleted file]
src/main/java/de/juplo/kafka/StateDocument.java
src/main/java/de/juplo/kafka/StateRepository.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/GenericApplicationTests.java

diff --git a/src/main/java/de/juplo/kafka/AdderRebalanceListener.java b/src/main/java/de/juplo/kafka/AdderRebalanceListener.java
deleted file mode 100644 (file)
index ef595ba..0000000
+++ /dev/null
@@ -1,106 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.common.TopicPartition;
-
-import java.time.Clock;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.Collection;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class AdderRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
-{
-  private final AdderRecordHandler handler;
-  private final PartitionStatisticsRepository repository;
-  private final String id;
-  private final String topic;
-  private final Clock clock;
-  private final Duration commitInterval;
-  private final Consumer<String, String> consumer;
-
-  private Instant lastCommit = Instant.EPOCH;
-  private boolean commitsEnabled = true;
-
-  @Override
-  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
-  {
-    partitions.forEach(tp ->
-    {
-      Integer partition = tp.partition();
-      StateDocument document =
-          repository
-              .findById(Integer.toString(partition))
-              .orElse(new StateDocument(partition));
-      log.info("{} - adding partition: {}, offset={}", id, partition, document.offset);
-      if (document.offset >= 0)
-      {
-        // Only seek, if a stored offset was found
-        // Otherwise: Use initial offset, generated by Kafka
-        consumer.seek(tp, document.offset);
-      }
-      handler.addPartition(partition, document.state);
-    });
-  }
-
-  @Override
-  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
-  {
-    partitions.forEach(tp ->
-    {
-      Integer partition = tp.partition();
-      Long offset = consumer.position(tp);
-      log.info(
-          "{} - removing partition: {}, offset of next message {})",
-          id,
-          partition,
-          offset);
-      if (commitsEnabled)
-      {
-        repository.save(new StateDocument(partition, handler.removePartition(partition), offset));
-      }
-      else
-      {
-        log.info("Offset commits are disabled! Last commit: {}", lastCommit);
-      }
-    });
-  }
-
-
-  @Override
-  public void beforeNextPoll()
-  {
-    if (!commitsEnabled)
-    {
-      log.info("Offset commits are disabled! Last commit: {}", lastCommit);
-      return;
-    }
-
-    if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
-    {
-      log.debug("Storing data and offsets, last commit: {}", lastCommit);
-      handler.getState().forEach((partiton, sumBusinessLogic) -> repository.save(
-          new StateDocument(
-              partiton,
-              sumBusinessLogic.getState(),
-              consumer.position(new TopicPartition(topic, partiton)))));
-      lastCommit = clock.instant();
-    }
-  }
-
-  @Override
-  public void enableCommits()
-  {
-    commitsEnabled = true;
-  }
-
-  @Override
-  public void disableCommits()
-  {
-    commitsEnabled = false;
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/AdderRecordHandler.java b/src/main/java/de/juplo/kafka/AdderRecordHandler.java
deleted file mode 100644 (file)
index ecd47bc..0000000
+++ /dev/null
@@ -1,54 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-@Slf4j
-public class AdderRecordHandler implements RecordHandler<String, String>
-{
-  private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
-
-
-  @Override
-  public void accept(ConsumerRecord<String, String> record)
-  {
-    Integer partition = record.partition();
-    String user = record.key();
-    String message = record.value();
-    switch (message)
-    {
-      case "START":
-        state.get(partition).startSum(user);
-        break;
-
-      case "END":
-        Long result = state.get(partition).endSum(user);
-        log.info("New result for {}: {}", user, result);
-        break;
-
-      default:
-        state.get(partition).addToSum(user, Integer.parseInt(message));
-        break;
-    }
-  }
-
-  protected void addPartition(Integer partition, Map<String, Long> state)
-  {
-    this.state.put(partition, new AdderBusinessLogic(state));
-  }
-
-  protected Map<String, Long> removePartition(Integer partition)
-  {
-    return this.state.remove(partition).getState();
-  }
-
-
-  public Map<Integer, AdderBusinessLogic> getState()
-  {
-    return state;
-  }
-}
index 973e973..9f54083 100644 (file)
@@ -18,21 +18,21 @@ import java.util.concurrent.Executors;
 public class ApplicationConfiguration
 {
   @Bean
-  public AdderRecordHandler sumRecordHandler()
+  public ApplicationRecordHandler recordHandler()
   {
-    return new AdderRecordHandler();
+    return new ApplicationRecordHandler();
   }
 
   @Bean
-  public AdderRebalanceListener sumRebalanceListener(
-      AdderRecordHandler adderRecordHandler,
-      PartitionStatisticsRepository repository,
+  public ApplicationRebalanceListener rebalanceListener(
+      ApplicationRecordHandler recordHandler,
+      StateRepository stateRepository,
       Consumer<String, String> consumer,
       ApplicationProperties properties)
   {
-    return new AdderRebalanceListener(
-        adderRecordHandler,
-        repository,
+    return new ApplicationRebalanceListener(
+        recordHandler,
+        stateRepository,
         properties.getClientId(),
         properties.getTopic(),
         Clock.systemDefaultZone(),
@@ -44,8 +44,8 @@ public class ApplicationConfiguration
   public EndlessConsumer<String, String> endlessConsumer(
       KafkaConsumer<String, String> kafkaConsumer,
       ExecutorService executor,
-      AdderRebalanceListener adderRebalanceListener,
-      AdderRecordHandler adderRecordHandler,
+      ApplicationRebalanceListener rebalanceListener,
+      ApplicationRecordHandler recordHandler,
       ApplicationProperties properties)
   {
     return
@@ -54,8 +54,8 @@ public class ApplicationConfiguration
             properties.getClientId(),
             properties.getTopic(),
             kafkaConsumer,
-            adderRebalanceListener,
-            adderRecordHandler);
+            rebalanceListener,
+            recordHandler);
   }
 
   @Bean
diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
new file mode 100644 (file)
index 0000000..542af2d
--- /dev/null
@@ -0,0 +1,108 @@
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.Map;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
+{
+  private final ApplicationRecordHandler recordHandler;
+  private final StateRepository stateRepository;
+  private final String id;
+  private final String topic;
+  private final Clock clock;
+  private final Duration commitInterval;
+  private final Consumer<String, String> consumer;
+
+  private Instant lastCommit = Instant.EPOCH;
+  private boolean commitsEnabled = true;
+
+  @Override
+  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+  {
+    partitions.forEach(tp ->
+    {
+      Integer partition = tp.partition();
+      StateDocument document =
+          stateRepository
+              .findById(Integer.toString(partition))
+              .orElse(new StateDocument(partition));
+      log.info("{} - adding partition: {}, offset={}", id, partition, document.offset);
+      if (document.offset >= 0)
+      {
+        // Only seek, if a stored offset was found
+        // Otherwise: Use initial offset, generated by Kafka
+        consumer.seek(tp, document.offset);
+      }
+      recordHandler.addPartition(partition, document.state);
+    });
+  }
+
+  @Override
+  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+  {
+    partitions.forEach(tp ->
+    {
+      Integer partition = tp.partition();
+      Long offset = consumer.position(tp);
+      log.info(
+          "{} - removing partition: {}, offset of next message {})",
+          id,
+          partition,
+          offset);
+      if (commitsEnabled)
+      {
+        Map<String, Long> removed = recordHandler.removePartition(partition);
+        stateRepository.save(new StateDocument(partition, removed, offset));
+      }
+      else
+      {
+        log.info("Offset commits are disabled! Last commit: {}", lastCommit);
+      }
+    });
+  }
+
+
+  @Override
+  public void beforeNextPoll()
+  {
+    if (!commitsEnabled)
+    {
+      log.info("Offset commits are disabled! Last commit: {}", lastCommit);
+      return;
+    }
+
+    if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
+    {
+      log.debug("Storing data and offsets, last commit: {}", lastCommit);
+      recordHandler.getState().forEach((partiton, adder) -> stateRepository.save(
+          new StateDocument(
+              partiton,
+              adder.getState(),
+              consumer.position(new TopicPartition(topic, partiton)))));
+      lastCommit = clock.instant();
+    }
+  }
+
+  @Override
+  public void enableCommits()
+  {
+    commitsEnabled = true;
+  }
+
+  @Override
+  public void disableCommits()
+  {
+    commitsEnabled = false;
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
new file mode 100644 (file)
index 0000000..d0d385c
--- /dev/null
@@ -0,0 +1,54 @@
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+@Slf4j
+public class ApplicationRecordHandler implements RecordHandler<String, String>
+{
+  private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
+
+
+  @Override
+  public void accept(ConsumerRecord<String, String> record)
+  {
+    Integer partition = record.partition();
+    String user = record.key();
+    String message = record.value();
+    switch (message)
+    {
+      case "START":
+        state.get(partition).startSum(user);
+        break;
+
+      case "END":
+        Long result = state.get(partition).endSum(user);
+        log.info("New result for {}: {}", user, result);
+        break;
+
+      default:
+        state.get(partition).addToSum(user, Integer.parseInt(message));
+        break;
+    }
+  }
+
+  protected void addPartition(Integer partition, Map<String, Long> state)
+  {
+    this.state.put(partition, new AdderBusinessLogic(state));
+  }
+
+  protected Map<String, Long> removePartition(Integer partition)
+  {
+    return this.state.remove(partition).getState();
+  }
+
+
+  public Map<Integer, AdderBusinessLogic> getState()
+  {
+    return state;
+  }
+}
index 0870f19..d389271 100644 (file)
@@ -16,7 +16,7 @@ import java.util.stream.Collectors;
 public class DriverController
 {
   private final EndlessConsumer consumer;
-  private final AdderRecordHandler adderRecordHandler;
+  private final ApplicationRecordHandler recordHandler;
 
 
   @PostMapping("start")
@@ -36,7 +36,7 @@ public class DriverController
   public Map<Integer, Map<String, Long>> state()
   {
     return
-        adderRecordHandler
+        recordHandler
             .getState()
             .entrySet()
             .stream()
@@ -48,9 +48,9 @@ public class DriverController
   @GetMapping("state/{user}")
   public ResponseEntity<Long> seen(@PathVariable String user)
   {
-    for (AdderBusinessLogic adderBusinessLogic : adderRecordHandler.getState().values())
+    for (AdderBusinessLogic adder : recordHandler.getState().values())
     {
-      Optional<Long> sum = adderBusinessLogic.getSum(user);
+      Optional<Long> sum = adder.getSum(user);
       if (sum.isPresent())
         return ResponseEntity.ok(sum.get());
     }
index cfba6df..58374f4 100644 (file)
@@ -25,8 +25,8 @@ public class EndlessConsumer<K, V> implements Runnable
   private final String id;
   private final String topic;
   private final Consumer<K, V> consumer;
-  private final PollIntervalAwareConsumerRebalanceListener pollIntervalAwareRebalanceListener;
-  private final RecordHandler<K, V> handler;
+  private final PollIntervalAwareConsumerRebalanceListener rebalanceListener;
+  private final RecordHandler<K, V> recordHandler;
 
   private final Lock lock = new ReentrantLock();
   private final Condition condition = lock.newCondition();
@@ -42,8 +42,8 @@ public class EndlessConsumer<K, V> implements Runnable
     try
     {
       log.info("{} - Subscribing to topic {}", id, topic);
-      pollIntervalAwareRebalanceListener.enableCommits();
-      consumer.subscribe(Arrays.asList(topic), pollIntervalAwareRebalanceListener);
+      rebalanceListener.enableCommits();
+      consumer.subscribe(Arrays.asList(topic), rebalanceListener);
 
       while (true)
       {
@@ -64,12 +64,12 @@ public class EndlessConsumer<K, V> implements Runnable
               record.value()
           );
 
-          handler.accept(record);
+          recordHandler.accept(record);
 
           consumed++;
         }
 
-        pollIntervalAwareRebalanceListener.beforeNextPoll();
+        rebalanceListener.beforeNextPoll();
       }
     }
     catch(WakeupException e)
@@ -93,7 +93,7 @@ public class EndlessConsumer<K, V> implements Runnable
     catch(Exception e)
     {
       log.error("{} - Unexpected error: {}, disabling commits", id, e.toString(), e);
-      pollIntervalAwareRebalanceListener.disableCommits();
+      rebalanceListener.disableCommits();
       shutdown(e);
     }
     finally
diff --git a/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java b/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java
deleted file mode 100644 (file)
index 9e26410..0000000
+++ /dev/null
@@ -1,11 +0,0 @@
-package de.juplo.kafka;
-
-import org.springframework.data.mongodb.repository.MongoRepository;
-
-import java.util.Optional;
-
-
-public interface PartitionStatisticsRepository extends MongoRepository<StateDocument, String>
-{
-  public Optional<StateDocument> findById(String partition);
-}
index 2583c8e..0540e3f 100644 (file)
@@ -5,11 +5,10 @@ import org.springframework.data.annotation.Id;
 import org.springframework.data.mongodb.core.mapping.Document;
 
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 
-@Document(collection = "statistics")
+@Document(collection = "state")
 @ToString
 public class StateDocument
 {
diff --git a/src/main/java/de/juplo/kafka/StateRepository.java b/src/main/java/de/juplo/kafka/StateRepository.java
new file mode 100644 (file)
index 0000000..3129535
--- /dev/null
@@ -0,0 +1,11 @@
+package de.juplo.kafka;
+
+import org.springframework.data.mongodb.repository.MongoRepository;
+
+import java.util.Optional;
+
+
+public interface StateRepository extends MongoRepository<StateDocument, String>
+{
+  public Optional<StateDocument> findById(String partition);
+}
index 711a44a..9a6f812 100644 (file)
@@ -60,7 +60,7 @@ abstract class GenericApplicationTests<K, V>
        @Autowired
        ExecutorService executor;
        @Autowired
-       PartitionStatisticsRepository partitionStatisticsRepository;
+       StateRepository stateRepository;
        @Autowired
        PollIntervalAwareConsumerRebalanceListener rebalanceListener;
        @Autowired
@@ -233,11 +233,11 @@ abstract class GenericApplicationTests<K, V>
                        log.info("New position for {}: {}", tp, offset);
                        Integer partition = tp.partition();
                        StateDocument document =
-                                       partitionStatisticsRepository
+                                       stateRepository
                                                        .findById(partition.toString())
                                                        .orElse(new StateDocument(partition));
                        document.offset = offset;
-                       partitionStatisticsRepository.save(document);
+                       stateRepository.save(document);
                });
                offsetConsumer.unsubscribe();
        }
@@ -247,7 +247,7 @@ abstract class GenericApplicationTests<K, V>
                partitions().forEach(tp ->
                {
                        String partition = Integer.toString(tp.partition());
-                       Optional<Long> offset = partitionStatisticsRepository.findById(partition).map(document -> document.offset);
+                       Optional<Long> offset = stateRepository.findById(partition).map(document -> document.offset);
                        consumer.accept(tp, offset.orElse(0l));
                });
        }