X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRecordHandler.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRecordHandler.java;h=207578193005d77640d78b9b30ea08f25153d5fc;hb=25c2044064722af20f64651a32e94fb392710bbc;hp=f4d3671ae6da4208a551271f3b19750213e07ce8;hpb=66ff7d205e66616de8aaca94503dbbcd7d281f6d;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index f4d3671..2075781 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -2,6 +2,11 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.kafka.annotation.KafkaHandler; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; import java.time.Duration; import java.util.HashMap; @@ -11,7 +16,10 @@ import java.util.Optional; @RequiredArgsConstructor @Slf4j -public class ApplicationRecordHandler implements RecordHandler +@KafkaListener( + id = "${spring.kafka.consumer.group-id}", + topics = "${sumup.adder.topic}") +public class ApplicationRecordHandler { private final AdderResults results; private final Optional throttle; @@ -20,24 +28,27 @@ public class ApplicationRecordHandler implements RecordHandler private final Map state = new HashMap<>(); - @Override + @KafkaHandler public void addNumber( - String topic, + @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, - Long offset, + @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String user, + @Payload MessageAddNumber message) { + log.debug("{} - Received {} for {} on {}", id, message, user, partition); state.get(partition).addToSum(user, message.getNext()); throttle(); } - @Override + @KafkaHandler public void calculateSum( - String topic, + @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, - Long offset, + @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String user, + @Payload MessageCalculateSum message) { AdderResult result = state.get(partition).calculate(user); @@ -70,15 +81,4 @@ public class ApplicationRecordHandler implements RecordHandler { return this.state.remove(partition).getState(); } - - - public Map getState() - { - return state; - } - - public AdderBusinessLogic getState(Integer partition) - { - return state.get(partition); - } }