`EndlessConsumer` auf `@KafkaHandler` umgestellt
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRecordHandler.java
index 93e1297..f4d3671 100644 (file)
@@ -1,33 +1,64 @@
 package de.juplo.kafka;
 
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 
+@RequiredArgsConstructor
 @Slf4j
-public class ApplicationRecordHandler implements RecordHandler<String, String>
+public class ApplicationRecordHandler implements RecordHandler
 {
+  private final AdderResults results;
+  private final Optional<Duration> throttle;
+  private final String id;
+
   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
 
 
   @Override
-  public void accept(ConsumerRecord<String, String> record)
+  public void addNumber(
+      String topic,
+      Integer partition,
+      Long offset,
+      String user,
+      MessageAddNumber message)
+  {
+    state.get(partition).addToSum(user, message.getNext());
+    throttle();
+  }
+
+  @Override
+  public void calculateSum(
+      String topic,
+      Integer partition,
+      Long offset,
+      String user,
+      MessageCalculateSum message)
   {
-    Integer partition = record.partition();
-    String user = record.key();
-    String message = record.value();
+    AdderResult result = state.get(partition).calculate(user);
+    log.info("{} - New result for {}: {}", id, user, result);
+    results.addResults(partition, user, result);
+    throttle();
+  }
 
-    if (message.equals("CALCULATE"))
+  private void throttle()
+  {
+    if (throttle.isPresent())
     {
-      AdderResult result = state.get(partition).calculate(user);
-      log.info("New result for {}: {}", user, result);
-      return;
+      try
+      {
+        Thread.sleep(throttle.get().toMillis());
+      }
+      catch (InterruptedException e)
+      {
+        log.warn("{} - Intrerrupted while throttling: {}", id, e);
+      }
     }
-
-    state.get(partition).addToSum(user, Integer.parseInt(message));
   }
 
   protected void addPartition(Integer partition, Map<String, AdderResult> state)
@@ -45,4 +76,9 @@ public class ApplicationRecordHandler implements RecordHandler<String, String>
   {
     return state;
   }
+
+  public AdderBusinessLogic getState(Integer partition)
+  {
+    return state.get(partition);
+  }
 }