X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;ds=sidebyside;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRecordHandler.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRecordHandler.java;h=0000000000000000000000000000000000000000;hb=1bf30f5890d9ab0a1c7550fe472dec44f486a473;hp=207578193005d77640d78b9b30ea08f25153d5fc;hpb=25c2044064722af20f64651a32e94fb392710bbc;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java deleted file mode 100644 index 2075781..0000000 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ /dev/null @@ -1,84 +0,0 @@ -package de.juplo.kafka; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.kafka.annotation.KafkaHandler; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.support.KafkaHeaders; -import org.springframework.messaging.handler.annotation.Header; -import org.springframework.messaging.handler.annotation.Payload; - -import java.time.Duration; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; - - -@RequiredArgsConstructor -@Slf4j -@KafkaListener( - id = "${spring.kafka.consumer.group-id}", - topics = "${sumup.adder.topic}") -public class ApplicationRecordHandler -{ - private final AdderResults results; - private final Optional throttle; - private final String id; - - private final Map state = new HashMap<>(); - - - @KafkaHandler - public void addNumber( - @Header(KafkaHeaders.RECEIVED_PARTITION_ID) - Integer partition, - @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) - String user, - @Payload - MessageAddNumber message) - { - log.debug("{} - Received {} for {} on {}", id, message, user, partition); - state.get(partition).addToSum(user, message.getNext()); - throttle(); - } - - @KafkaHandler - public void calculateSum( - @Header(KafkaHeaders.RECEIVED_PARTITION_ID) - Integer partition, - @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) - String user, - @Payload - MessageCalculateSum message) - { - AdderResult result = state.get(partition).calculate(user); - log.info("{} - New result for {}: {}", id, user, result); - results.addResults(partition, user, result); - throttle(); - } - - private void throttle() - { - if (throttle.isPresent()) - { - try - { - Thread.sleep(throttle.get().toMillis()); - } - catch (InterruptedException e) - { - log.warn("{} - Intrerrupted while throttling: {}", id, e); - } - } - } - - protected void addPartition(Integer partition, Map state) - { - this.state.put(partition, new AdderBusinessLogic(state)); - } - - protected Map removePartition(Integer partition) - { - return this.state.remove(partition).getState(); - } -}