1 package de.juplo.kafka;
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.springframework.kafka.annotation.KafkaHandler;
6 import org.springframework.kafka.annotation.KafkaListener;
7 import org.springframework.kafka.support.KafkaHeaders;
8 import org.springframework.messaging.handler.annotation.Header;
9 import org.springframework.messaging.handler.annotation.Payload;
11 import java.time.Duration;
12 import java.util.HashMap;
14 import java.util.Optional;
17 @RequiredArgsConstructor
20 id = "${spring.kafka.consumer.group-id}",
21 topics = "${sumup.adder.topic}")
22 public class ApplicationRecordHandler
24 private final AdderResults results;
25 private final Optional<Duration> throttle;
26 private final String id;
28 private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
32 public void addNumber(
33 @Header(KafkaHeaders.RECEIVED_PARTITION_ID)
35 @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)
38 MessageAddNumber message)
40 log.debug("{} - Received {} for {} on {}", id, message, user, partition);
41 state.get(partition).addToSum(user, message.getNext());
46 public void calculateSum(
47 @Header(KafkaHeaders.RECEIVED_PARTITION_ID)
49 @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)
52 MessageCalculateSum message)
54 AdderResult result = state.get(partition).calculate(user);
55 log.info("{} - New result for {}: {}", id, user, result);
56 results.addResults(partition, user, result);
60 private void throttle()
62 if (throttle.isPresent())
66 Thread.sleep(throttle.get().toMillis());
68 catch (InterruptedException e)
70 log.warn("{} - Intrerrupted while throttling: {}", id, e);
75 protected void addPartition(Integer partition, Map<String, AdderResult> state)
77 this.state.put(partition, new AdderBusinessLogic(state));
80 protected Map<String, AdderResult> removePartition(Integer partition)
82 return this.state.remove(partition).getState();