WIP:kafkahandler
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRecordHandler.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6 import org.springframework.kafka.annotation.KafkaHandler;
7 import org.springframework.kafka.annotation.KafkaListener;
8 import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata;
9 import org.springframework.kafka.support.KafkaHeaders;
10 import org.springframework.messaging.handler.annotation.Header;
11 import org.springframework.messaging.handler.annotation.Headers;
12 import org.springframework.messaging.handler.annotation.Payload;
13
14 import java.time.Duration;
15 import java.util.HashMap;
16 import java.util.Map;
17 import java.util.Optional;
18
19
20 @RequiredArgsConstructor
21 @Slf4j
22 @KafkaListener(
23   id = "${spring.kafka.client-id}",
24   idIsGroup = false,
25   topics = "${sumup.adder.topic}",
26   autoStartup = "false")
27 public class ApplicationRecordHandler implements RecordHandler<String, Message>
28 {
29   private final AdderResults results;
30   private final Optional<Duration> throttle;
31   private final String id;
32
33   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
34
35
36   @KafkaHandler
37   @Override
38   public void addNumber(
39     @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String user,
40     @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
41     @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
42     @Header(KafkaHeaders.OFFSET) Long offset,
43     @Payload MessageAddNumber message)
44   {
45     state.get(partition).addToSum(user, message.getNext());
46     throttle();
47   }
48
49   @KafkaHandler
50   @Override
51   public void calcSum(
52     @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String user,
53     @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
54     @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
55     @Header(KafkaHeaders.OFFSET) Long offset,
56     @Payload MessageCalculateSum message)
57   {
58     AdderResult result = state.get(partition).calculate(user);
59     log.info("{} - New result for {}: {}", id, user, result);
60     results.addResults(partition, user, result);
61     throttle();
62   }
63
64   private void throttle()
65   {
66     if (throttle.isPresent())
67     {
68       try
69       {
70         Thread.sleep(throttle.get().toMillis());
71       }
72       catch (InterruptedException e)
73       {
74         log.warn("{} - Intrerrupted while throttling: {}", id, e);
75       }
76     }
77   }
78
79   protected void addPartition(Integer partition, Map<String, AdderResult> state)
80   {
81     this.state.put(partition, new AdderBusinessLogic(state));
82   }
83
84   protected Map<String, AdderResult> removePartition(Integer partition)
85   {
86     return this.state.remove(partition).getState();
87   }
88
89
90   public Map<Integer, AdderBusinessLogic> getState()
91   {
92     return state;
93   }
94
95   public AdderBusinessLogic getState(Integer partition)
96   {
97     return state.get(partition);
98   }
99 }