Vereinfachte Version der auf Spring Kafka basierenden Implementierung
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRecordHandler.java
index f4d3671..2075781 100644 (file)
@@ -2,6 +2,11 @@ package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.kafka.annotation.KafkaHandler;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.handler.annotation.Payload;
 
 import java.time.Duration;
 import java.util.HashMap;
@@ -11,7 +16,10 @@ import java.util.Optional;
 
 @RequiredArgsConstructor
 @Slf4j
-public class ApplicationRecordHandler implements RecordHandler
+@KafkaListener(
+    id = "${spring.kafka.consumer.group-id}",
+    topics = "${sumup.adder.topic}")
+public class ApplicationRecordHandler
 {
   private final AdderResults results;
   private final Optional<Duration> throttle;
@@ -20,24 +28,27 @@ public class ApplicationRecordHandler implements RecordHandler
   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
 
 
-  @Override
+  @KafkaHandler
   public void addNumber(
-      String topic,
+      @Header(KafkaHeaders.RECEIVED_PARTITION_ID)
       Integer partition,
-      Long offset,
+      @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)
       String user,
+      @Payload
       MessageAddNumber message)
   {
+    log.debug("{} - Received {} for {} on {}", id, message, user, partition);
     state.get(partition).addToSum(user, message.getNext());
     throttle();
   }
 
-  @Override
+  @KafkaHandler
   public void calculateSum(
-      String topic,
+      @Header(KafkaHeaders.RECEIVED_PARTITION_ID)
       Integer partition,
-      Long offset,
+      @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)
       String user,
+      @Payload
       MessageCalculateSum message)
   {
     AdderResult result = state.get(partition).calculate(user);
@@ -70,15 +81,4 @@ public class ApplicationRecordHandler implements RecordHandler
   {
     return this.state.remove(partition).getState();
   }
-
-
-  public Map<Integer, AdderBusinessLogic> getState()
-  {
-    return state;
-  }
-
-  public AdderBusinessLogic getState(Integer partition)
-  {
-    return state.get(partition);
-  }
 }