Konfig-Parameter zum künstlichen Verzögern der Verabeitung eingebaut
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRecordHandler.java
index 596f3da..0f5b982 100644 (file)
@@ -4,8 +4,10 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 
 @RequiredArgsConstructor
@@ -13,6 +15,7 @@ import java.util.Map;
 public class ApplicationRecordHandler implements RecordHandler<String, String>
 {
   private final AdderResults results;
+  private final Optional<Duration> throttle;
 
   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
 
@@ -29,10 +32,23 @@ public class ApplicationRecordHandler implements RecordHandler<String, String>
       AdderResult result = state.get(partition).calculate(user);
       log.info("New result for {}: {}", user, result);
       results.addResults(partition, user, result);
-      return;
+    }
+    else
+    {
+      state.get(partition).addToSum(user, Integer.parseInt(message));
     }
 
-    state.get(partition).addToSum(user, Integer.parseInt(message));
+    if (throttle.isPresent())
+    {
+      try
+      {
+        Thread.sleep(throttle.get().toMillis());
+      }
+      catch (InterruptedException e)
+      {
+        log.warn("Intrerrupted while throttling: {}", e);
+      }
+    }
   }
 
   protected void addPartition(Integer partition, Map<String, AdderResult> state)