`EndlessConsumer` auf `@KafkaHandler` umgestellt
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRecordHandler.java
index 2829157..f4d3671 100644 (file)
@@ -2,7 +2,6 @@ package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 
 import java.time.Duration;
 import java.util.HashMap;
@@ -12,7 +11,7 @@ import java.util.Optional;
 
 @RequiredArgsConstructor
 @Slf4j
-public class ApplicationRecordHandler implements RecordHandler<String, Message>
+public class ApplicationRecordHandler implements RecordHandler
 {
   private final AdderResults results;
   private final Optional<Duration> throttle;
@@ -21,42 +20,34 @@ public class ApplicationRecordHandler implements RecordHandler<String, Message>
   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
 
 
+  @Override
   public void addNumber(
+      String topic,
       Integer partition,
+      Long offset,
       String user,
       MessageAddNumber message)
   {
     state.get(partition).addToSum(user, message.getNext());
+    throttle();
   }
 
+  @Override
   public void calculateSum(
+      String topic,
       Integer partition,
+      Long offset,
       String user,
       MessageCalculateSum message)
   {
     AdderResult result = state.get(partition).calculate(user);
     log.info("{} - New result for {}: {}", id, user, result);
     results.addResults(partition, user, result);
+    throttle();
   }
 
-  @Override
-  public void accept(ConsumerRecord<String, Message> record)
+  private void throttle()
   {
-    Integer partition = record.partition();
-    String user = record.key();
-    Message message = record.value();
-
-    switch(message.getType())
-    {
-      case ADD:
-        addNumber(partition, user, (MessageAddNumber) message);
-        break;
-
-      case CALC:
-        calculateSum(partition, user, (MessageCalculateSum) message);
-        break;
-    }
-
     if (throttle.isPresent())
     {
       try