Code an die Version aus 'sumup-adder--springified' angepasst
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRecordHandler.java
index 0f5b982..2829157 100644 (file)
@@ -12,30 +12,49 @@ import java.util.Optional;
 
 @RequiredArgsConstructor
 @Slf4j
-public class ApplicationRecordHandler implements RecordHandler<String, String>
+public class ApplicationRecordHandler implements RecordHandler<String, Message>
 {
   private final AdderResults results;
   private final Optional<Duration> throttle;
+  private final String id;
 
   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
 
 
+  public void addNumber(
+      Integer partition,
+      String user,
+      MessageAddNumber message)
+  {
+    state.get(partition).addToSum(user, message.getNext());
+  }
+
+  public void calculateSum(
+      Integer partition,
+      String user,
+      MessageCalculateSum message)
+  {
+    AdderResult result = state.get(partition).calculate(user);
+    log.info("{} - New result for {}: {}", id, user, result);
+    results.addResults(partition, user, result);
+  }
+
   @Override
-  public void accept(ConsumerRecord<String, String> record)
+  public void accept(ConsumerRecord<String, Message> record)
   {
     Integer partition = record.partition();
     String user = record.key();
-    String message = record.value();
+    Message message = record.value();
 
-    if (message.equals("CALCULATE"))
-    {
-      AdderResult result = state.get(partition).calculate(user);
-      log.info("New result for {}: {}", user, result);
-      results.addResults(partition, user, result);
-    }
-    else
+    switch(message.getType())
     {
-      state.get(partition).addToSum(user, Integer.parseInt(message));
+      case ADD:
+        addNumber(partition, user, (MessageAddNumber) message);
+        break;
+
+      case CALC:
+        calculateSum(partition, user, (MessageCalculateSum) message);
+        break;
     }
 
     if (throttle.isPresent())
@@ -46,7 +65,7 @@ public class ApplicationRecordHandler implements RecordHandler<String, String>
       }
       catch (InterruptedException e)
       {
-        log.warn("Intrerrupted while throttling: {}", e);
+        log.warn("{} - Intrerrupted while throttling: {}", id, e);
       }
     }
   }