From a2e8fc924e5b472d6b90c42d311514f91ea452f1 Mon Sep 17 00:00:00 2001
From: Kai Moritz <kai@juplo.de>
Date: Sun, 14 Aug 2022 19:04:47 +0200
Subject: [PATCH] =?utf8?q?Benennung=20vereinheitlicht=20und=20projektunabh?=
 =?utf8?q?=C3=A4ngig=20gemacht?=
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

---
 .../juplo/kafka/ApplicationConfiguration.java | 24 +++++++++----------
 ...java => ApplicationRebalanceListener.java} | 18 +++++++-------
 ...ler.java => ApplicationRecordHandler.java} |  2 +-
 .../java/de/juplo/kafka/DriverController.java |  8 +++----
 .../java/de/juplo/kafka/EndlessConsumer.java  | 14 +++++------
 .../java/de/juplo/kafka/StateDocument.java    |  3 +--
 ...csRepository.java => StateRepository.java} |  2 +-
 .../juplo/kafka/GenericApplicationTests.java  |  8 +++----
 8 files changed, 40 insertions(+), 39 deletions(-)
 rename src/main/java/de/juplo/kafka/{AdderRebalanceListener.java => ApplicationRebalanceListener.java} (80%)
 rename src/main/java/de/juplo/kafka/{AdderRecordHandler.java => ApplicationRecordHandler.java} (93%)
 rename src/main/java/de/juplo/kafka/{PartitionStatisticsRepository.java => StateRepository.java} (66%)

diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 973e973..9f54083 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -18,21 +18,21 @@ import java.util.concurrent.Executors;
 public class ApplicationConfiguration
 {
   @Bean
-  public AdderRecordHandler sumRecordHandler()
+  public ApplicationRecordHandler recordHandler()
   {
-    return new AdderRecordHandler();
+    return new ApplicationRecordHandler();
   }
 
   @Bean
-  public AdderRebalanceListener sumRebalanceListener(
-      AdderRecordHandler adderRecordHandler,
-      PartitionStatisticsRepository repository,
+  public ApplicationRebalanceListener rebalanceListener(
+      ApplicationRecordHandler recordHandler,
+      StateRepository stateRepository,
       Consumer<String, String> consumer,
       ApplicationProperties properties)
   {
-    return new AdderRebalanceListener(
-        adderRecordHandler,
-        repository,
+    return new ApplicationRebalanceListener(
+        recordHandler,
+        stateRepository,
         properties.getClientId(),
         properties.getTopic(),
         Clock.systemDefaultZone(),
@@ -44,8 +44,8 @@ public class ApplicationConfiguration
   public EndlessConsumer<String, String> endlessConsumer(
       KafkaConsumer<String, String> kafkaConsumer,
       ExecutorService executor,
-      AdderRebalanceListener adderRebalanceListener,
-      AdderRecordHandler adderRecordHandler,
+      ApplicationRebalanceListener rebalanceListener,
+      ApplicationRecordHandler recordHandler,
       ApplicationProperties properties)
   {
     return
@@ -54,8 +54,8 @@ public class ApplicationConfiguration
             properties.getClientId(),
             properties.getTopic(),
             kafkaConsumer,
-            adderRebalanceListener,
-            adderRecordHandler);
+            rebalanceListener,
+            recordHandler);
   }
 
   @Bean
diff --git a/src/main/java/de/juplo/kafka/AdderRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
similarity index 80%
rename from src/main/java/de/juplo/kafka/AdderRebalanceListener.java
rename to src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
index ef595ba..542af2d 100644
--- a/src/main/java/de/juplo/kafka/AdderRebalanceListener.java
+++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
@@ -9,14 +9,15 @@ import java.time.Clock;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Collection;
+import java.util.Map;
 
 
 @RequiredArgsConstructor
 @Slf4j
-public class AdderRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
+public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
 {
-  private final AdderRecordHandler handler;
-  private final PartitionStatisticsRepository repository;
+  private final ApplicationRecordHandler recordHandler;
+  private final StateRepository stateRepository;
   private final String id;
   private final String topic;
   private final Clock clock;
@@ -33,7 +34,7 @@ public class AdderRebalanceListener implements PollIntervalAwareConsumerRebalanc
     {
       Integer partition = tp.partition();
       StateDocument document =
-          repository
+          stateRepository
               .findById(Integer.toString(partition))
               .orElse(new StateDocument(partition));
       log.info("{} - adding partition: {}, offset={}", id, partition, document.offset);
@@ -43,7 +44,7 @@ public class AdderRebalanceListener implements PollIntervalAwareConsumerRebalanc
         // Otherwise: Use initial offset, generated by Kafka
         consumer.seek(tp, document.offset);
       }
-      handler.addPartition(partition, document.state);
+      recordHandler.addPartition(partition, document.state);
     });
   }
 
@@ -61,7 +62,8 @@ public class AdderRebalanceListener implements PollIntervalAwareConsumerRebalanc
           offset);
       if (commitsEnabled)
       {
-        repository.save(new StateDocument(partition, handler.removePartition(partition), offset));
+        Map<String, Long> removed = recordHandler.removePartition(partition);
+        stateRepository.save(new StateDocument(partition, removed, offset));
       }
       else
       {
@@ -83,10 +85,10 @@ public class AdderRebalanceListener implements PollIntervalAwareConsumerRebalanc
     if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
     {
       log.debug("Storing data and offsets, last commit: {}", lastCommit);
-      handler.getState().forEach((partiton, sumBusinessLogic) -> repository.save(
+      recordHandler.getState().forEach((partiton, adder) -> stateRepository.save(
           new StateDocument(
               partiton,
-              sumBusinessLogic.getState(),
+              adder.getState(),
               consumer.position(new TopicPartition(topic, partiton)))));
       lastCommit = clock.instant();
     }
diff --git a/src/main/java/de/juplo/kafka/AdderRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
similarity index 93%
rename from src/main/java/de/juplo/kafka/AdderRecordHandler.java
rename to src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
index ecd47bc..d0d385c 100644
--- a/src/main/java/de/juplo/kafka/AdderRecordHandler.java
+++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
@@ -8,7 +8,7 @@ import java.util.Map;
 
 
 @Slf4j
-public class AdderRecordHandler implements RecordHandler<String, String>
+public class ApplicationRecordHandler implements RecordHandler<String, String>
 {
   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
 
diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java
index 0870f19..d389271 100644
--- a/src/main/java/de/juplo/kafka/DriverController.java
+++ b/src/main/java/de/juplo/kafka/DriverController.java
@@ -16,7 +16,7 @@ import java.util.stream.Collectors;
 public class DriverController
 {
   private final EndlessConsumer consumer;
-  private final AdderRecordHandler adderRecordHandler;
+  private final ApplicationRecordHandler recordHandler;
 
 
   @PostMapping("start")
@@ -36,7 +36,7 @@ public class DriverController
   public Map<Integer, Map<String, Long>> state()
   {
     return
-        adderRecordHandler
+        recordHandler
             .getState()
             .entrySet()
             .stream()
@@ -48,9 +48,9 @@ public class DriverController
   @GetMapping("state/{user}")
   public ResponseEntity<Long> seen(@PathVariable String user)
   {
-    for (AdderBusinessLogic adderBusinessLogic : adderRecordHandler.getState().values())
+    for (AdderBusinessLogic adder : recordHandler.getState().values())
     {
-      Optional<Long> sum = adderBusinessLogic.getSum(user);
+      Optional<Long> sum = adder.getSum(user);
       if (sum.isPresent())
         return ResponseEntity.ok(sum.get());
     }
diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
index cfba6df..58374f4 100644
--- a/src/main/java/de/juplo/kafka/EndlessConsumer.java
+++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java
@@ -25,8 +25,8 @@ public class EndlessConsumer<K, V> implements Runnable
   private final String id;
   private final String topic;
   private final Consumer<K, V> consumer;
-  private final PollIntervalAwareConsumerRebalanceListener pollIntervalAwareRebalanceListener;
-  private final RecordHandler<K, V> handler;
+  private final PollIntervalAwareConsumerRebalanceListener rebalanceListener;
+  private final RecordHandler<K, V> recordHandler;
 
   private final Lock lock = new ReentrantLock();
   private final Condition condition = lock.newCondition();
@@ -42,8 +42,8 @@ public class EndlessConsumer<K, V> implements Runnable
     try
     {
       log.info("{} - Subscribing to topic {}", id, topic);
-      pollIntervalAwareRebalanceListener.enableCommits();
-      consumer.subscribe(Arrays.asList(topic), pollIntervalAwareRebalanceListener);
+      rebalanceListener.enableCommits();
+      consumer.subscribe(Arrays.asList(topic), rebalanceListener);
 
       while (true)
       {
@@ -64,12 +64,12 @@ public class EndlessConsumer<K, V> implements Runnable
               record.value()
           );
 
-          handler.accept(record);
+          recordHandler.accept(record);
 
           consumed++;
         }
 
-        pollIntervalAwareRebalanceListener.beforeNextPoll();
+        rebalanceListener.beforeNextPoll();
       }
     }
     catch(WakeupException e)
@@ -93,7 +93,7 @@ public class EndlessConsumer<K, V> implements Runnable
     catch(Exception e)
     {
       log.error("{} - Unexpected error: {}, disabling commits", id, e.toString(), e);
-      pollIntervalAwareRebalanceListener.disableCommits();
+      rebalanceListener.disableCommits();
       shutdown(e);
     }
     finally
diff --git a/src/main/java/de/juplo/kafka/StateDocument.java b/src/main/java/de/juplo/kafka/StateDocument.java
index 2583c8e..0540e3f 100644
--- a/src/main/java/de/juplo/kafka/StateDocument.java
+++ b/src/main/java/de/juplo/kafka/StateDocument.java
@@ -5,11 +5,10 @@ import org.springframework.data.annotation.Id;
 import org.springframework.data.mongodb.core.mapping.Document;
 
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 
-@Document(collection = "statistics")
+@Document(collection = "state")
 @ToString
 public class StateDocument
 {
diff --git a/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java b/src/main/java/de/juplo/kafka/StateRepository.java
similarity index 66%
rename from src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java
rename to src/main/java/de/juplo/kafka/StateRepository.java
index 9e26410..3129535 100644
--- a/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java
+++ b/src/main/java/de/juplo/kafka/StateRepository.java
@@ -5,7 +5,7 @@ import org.springframework.data.mongodb.repository.MongoRepository;
 import java.util.Optional;
 
 
-public interface PartitionStatisticsRepository extends MongoRepository<StateDocument, String>
+public interface StateRepository extends MongoRepository<StateDocument, String>
 {
   public Optional<StateDocument> findById(String partition);
 }
diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
index 711a44a..9a6f812 100644
--- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
@@ -60,7 +60,7 @@ abstract class GenericApplicationTests<K, V>
 	@Autowired
 	ExecutorService executor;
 	@Autowired
-	PartitionStatisticsRepository partitionStatisticsRepository;
+	StateRepository stateRepository;
 	@Autowired
 	PollIntervalAwareConsumerRebalanceListener rebalanceListener;
 	@Autowired
@@ -233,11 +233,11 @@ abstract class GenericApplicationTests<K, V>
 			log.info("New position for {}: {}", tp, offset);
 			Integer partition = tp.partition();
 			StateDocument document =
-					partitionStatisticsRepository
+					stateRepository
 							.findById(partition.toString())
 							.orElse(new StateDocument(partition));
 			document.offset = offset;
-			partitionStatisticsRepository.save(document);
+			stateRepository.save(document);
 		});
 		offsetConsumer.unsubscribe();
 	}
@@ -247,7 +247,7 @@ abstract class GenericApplicationTests<K, V>
 		partitions().forEach(tp ->
 		{
 			String partition = Integer.toString(tp.partition());
-			Optional<Long> offset = partitionStatisticsRepository.findById(partition).map(document -> document.offset);
+			Optional<Long> offset = stateRepository.findById(partition).map(document -> document.offset);
 			consumer.accept(tp, offset.orElse(0l));
 		});
 	}
-- 
2.20.1