From 341383683129f3a0d7e41f2ffa21e3761e2e14df Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 16 Sep 2022 12:58:57 +0200 Subject: [PATCH] Version mit nur einer Handler-Methode --- .../juplo/kafka/ApplicationRecordHandler.java | 34 ++++++++++++------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index f082314..a3b849e 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -2,7 +2,6 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.kafka.annotation.KafkaHandler; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; @@ -14,9 +13,6 @@ import java.util.Map; @RequiredArgsConstructor @Slf4j -@KafkaListener( - id = "${spring.kafka.consumer.group-id}", - topics = "${sumup.adder.topic}") public class ApplicationRecordHandler { private final AdderResults results; @@ -25,26 +21,18 @@ public class ApplicationRecordHandler private final Map state = new HashMap<>(); - @KafkaHandler public void addNumber( - @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, - @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String user, - @Payload MessageAddNumber message) { log.debug("{} - Received {} for {} on {}", id, message, user, partition); state.get(partition).addToSum(user, message.getNext()); } - @KafkaHandler public void calculateSum( - @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, - @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String user, - @Payload MessageCalculateSum message) { AdderResult result = state.get(partition).calculate(user); @@ -52,6 +40,28 @@ public class ApplicationRecordHandler results.addResults(partition, user, result); } + @KafkaListener( + id = "${spring.kafka.consumer.group-id}", + topics = "${sumup.adder.topic}") + public void accept( + @Header(KafkaHeaders.RECEIVED_PARTITION_ID) + Integer partition, + @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) + String user, + @Payload + Message message) + { + switch(message.getType()) + { + case ADD: + addNumber(partition, user, (MessageAddNumber) message); + break; + + case CALC: + calculateSum(partition, user, (MessageCalculateSum) message); + break; + } + } protected void addPartition(Integer partition, Map state) { -- 2.20.1