Vereinfachte Version der auf Spring Kafka basierenden Implementierung
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRecordHandler.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.springframework.kafka.annotation.KafkaHandler;
6 import org.springframework.kafka.annotation.KafkaListener;
7 import org.springframework.kafka.support.KafkaHeaders;
8 import org.springframework.messaging.handler.annotation.Header;
9 import org.springframework.messaging.handler.annotation.Payload;
10
11 import java.time.Duration;
12 import java.util.HashMap;
13 import java.util.Map;
14 import java.util.Optional;
15
16
17 @RequiredArgsConstructor
18 @Slf4j
19 @KafkaListener(
20     id = "${spring.kafka.consumer.group-id}",
21     topics = "${sumup.adder.topic}")
22 public class ApplicationRecordHandler
23 {
24   private final AdderResults results;
25   private final Optional<Duration> throttle;
26   private final String id;
27
28   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
29
30
31   @KafkaHandler
32   public void addNumber(
33       @Header(KafkaHeaders.RECEIVED_PARTITION_ID)
34       Integer partition,
35       @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)
36       String user,
37       @Payload
38       MessageAddNumber message)
39   {
40     log.debug("{} - Received {} for {} on {}", id, message, user, partition);
41     state.get(partition).addToSum(user, message.getNext());
42     throttle();
43   }
44
45   @KafkaHandler
46   public void calculateSum(
47       @Header(KafkaHeaders.RECEIVED_PARTITION_ID)
48       Integer partition,
49       @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)
50       String user,
51       @Payload
52       MessageCalculateSum message)
53   {
54     AdderResult result = state.get(partition).calculate(user);
55     log.info("{} - New result for {}: {}", id, user, result);
56     results.addResults(partition, user, result);
57     throttle();
58   }
59
60   private void throttle()
61   {
62     if (throttle.isPresent())
63     {
64       try
65       {
66         Thread.sleep(throttle.get().toMillis());
67       }
68       catch (InterruptedException e)
69       {
70         log.warn("{} - Intrerrupted while throttling: {}", id, e);
71       }
72     }
73   }
74
75   protected void addPartition(Integer partition, Map<String, AdderResult> state)
76   {
77     this.state.put(partition, new AdderBusinessLogic(state));
78   }
79
80   protected Map<String, AdderResult> removePartition(Integer partition)
81   {
82     return this.state.remove(partition).getState();
83   }
84 }