1 package de.juplo.kafka;
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6 import org.springframework.kafka.annotation.KafkaHandler;
7 import org.springframework.kafka.annotation.KafkaListener;
8 import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata;
9 import org.springframework.kafka.support.KafkaHeaders;
10 import org.springframework.messaging.handler.annotation.Header;
11 import org.springframework.messaging.handler.annotation.Headers;
12 import org.springframework.messaging.handler.annotation.Payload;
14 import java.time.Duration;
15 import java.util.HashMap;
17 import java.util.Optional;
20 @RequiredArgsConstructor
23 id = "${spring.kafka.client-id}",
25 topics = "${sumup.adder.topic}",
26 autoStartup = "false")
27 public class ApplicationRecordHandler implements RecordHandler<String, Message>
29 private final AdderResults results;
30 private final Optional<Duration> throttle;
31 private final String id;
33 private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
38 public void addNumber(
39 @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String user,
40 @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
41 @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
42 @Header(KafkaHeaders.OFFSET) Long offset,
43 @Payload MessageAddNumber message)
45 state.get(partition).addToSum(user, message.getNext());
52 @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String user,
53 @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
54 @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
55 @Header(KafkaHeaders.OFFSET) Long offset,
56 @Payload MessageCalculateSum message)
58 AdderResult result = state.get(partition).calculate(user);
59 log.info("{} - New result for {}: {}", id, user, result);
60 results.addResults(partition, user, result);
64 private void throttle()
66 if (throttle.isPresent())
70 Thread.sleep(throttle.get().toMillis());
72 catch (InterruptedException e)
74 log.warn("{} - Intrerrupted while throttling: {}", id, e);
79 protected void addPartition(Integer partition, Map<String, AdderResult> state)
81 this.state.put(partition, new AdderBusinessLogic(state));
84 protected Map<String, AdderResult> removePartition(Integer partition)
86 return this.state.remove(partition).getState();
90 public Map<Integer, AdderBusinessLogic> getState()
95 public AdderBusinessLogic getState(Integer partition)
97 return state.get(partition);