X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRecordHandler.java;h=207578193005d77640d78b9b30ea08f25153d5fc;hb=refs%2Fheads%2Fsumup-adder--springified;hp=596f3da256d9d1e9a052ea43fe2d21c674db94cd;hpb=e11c6152c721440d4a599a6f5fe0fe46f2283f31;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index 596f3da..2075781 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -2,37 +2,74 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaHandler; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; +import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.Optional; @RequiredArgsConstructor @Slf4j -public class ApplicationRecordHandler implements RecordHandler +@KafkaListener( + id = "${spring.kafka.consumer.group-id}", + topics = "${sumup.adder.topic}") +public class ApplicationRecordHandler { private final AdderResults results; + private final Optional throttle; + private final String id; private final Map state = new HashMap<>(); - @Override - public void accept(ConsumerRecord record) + @KafkaHandler + public void addNumber( + @Header(KafkaHeaders.RECEIVED_PARTITION_ID) + Integer partition, + @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) + String user, + @Payload + MessageAddNumber message) { - Integer partition = record.partition(); - String user = record.key(); - String message = record.value(); + log.debug("{} - Received {} for {} on {}", id, message, user, partition); + state.get(partition).addToSum(user, message.getNext()); + throttle(); + } + + @KafkaHandler + public void calculateSum( + @Header(KafkaHeaders.RECEIVED_PARTITION_ID) + Integer partition, + @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) + String user, + @Payload + MessageCalculateSum message) + { + AdderResult result = state.get(partition).calculate(user); + log.info("{} - New result for {}: {}", id, user, result); + results.addResults(partition, user, result); + throttle(); + } - if (message.equals("CALCULATE")) + private void throttle() + { + if (throttle.isPresent()) { - AdderResult result = state.get(partition).calculate(user); - log.info("New result for {}: {}", user, result); - results.addResults(partition, user, result); - return; + try + { + Thread.sleep(throttle.get().toMillis()); + } + catch (InterruptedException e) + { + log.warn("{} - Intrerrupted while throttling: {}", id, e); + } } - - state.get(partition).addToSum(user, Integer.parseInt(message)); } protected void addPartition(Integer partition, Map state) @@ -44,15 +81,4 @@ public class ApplicationRecordHandler implements RecordHandler { return this.state.remove(partition).getState(); } - - - public Map getState() - { - return state; - } - - public AdderBusinessLogic getState(Integer partition) - { - return state.get(partition); - } }