Version mit nur einer Handler-Methode
authorKai Moritz <kai@juplo.de>
Fri, 16 Sep 2022 10:58:57 +0000 (12:58 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 16 Sep 2022 16:09:23 +0000 (18:09 +0200)
src/main/java/de/juplo/kafka/ApplicationRecordHandler.java

index f082314..a3b849e 100644 (file)
@@ -2,7 +2,6 @@ package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.kafka.annotation.KafkaHandler;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.support.KafkaHeaders;
 import org.springframework.messaging.handler.annotation.Header;
@@ -14,9 +13,6 @@ import java.util.Map;
 
 @RequiredArgsConstructor
 @Slf4j
-@KafkaListener(
-    id = "${spring.kafka.consumer.group-id}",
-    topics = "${sumup.adder.topic}")
 public class ApplicationRecordHandler
 {
   private final AdderResults results;
@@ -25,26 +21,18 @@ public class ApplicationRecordHandler
   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
 
 
-  @KafkaHandler
   public void addNumber(
-      @Header(KafkaHeaders.RECEIVED_PARTITION_ID)
       Integer partition,
-      @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)
       String user,
-      @Payload
       MessageAddNumber message)
   {
     log.debug("{} - Received {} for {} on {}", id, message, user, partition);
     state.get(partition).addToSum(user, message.getNext());
   }
 
-  @KafkaHandler
   public void calculateSum(
-      @Header(KafkaHeaders.RECEIVED_PARTITION_ID)
       Integer partition,
-      @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)
       String user,
-      @Payload
       MessageCalculateSum message)
   {
     AdderResult result = state.get(partition).calculate(user);
@@ -52,6 +40,28 @@ public class ApplicationRecordHandler
     results.addResults(partition, user, result);
   }
 
+  @KafkaListener(
+      id = "${spring.kafka.consumer.group-id}",
+      topics = "${sumup.adder.topic}")
+  public void accept(
+      @Header(KafkaHeaders.RECEIVED_PARTITION_ID)
+      Integer partition,
+      @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)
+      String user,
+      @Payload
+      Message message)
+  {
+    switch(message.getType())
+    {
+      case ADD:
+        addNumber(partition, user, (MessageAddNumber) message);
+        break;
+
+      case CALC:
+        calculateSum(partition, user, (MessageCalculateSum) message);
+        break;
+    }
+  }
 
   protected void addPartition(Integer partition, Map<String, AdderResult> state)
   {