Code an die Version aus 'sumup-adder--springified' angepasst
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRecordHandler.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6
7 import java.time.Duration;
8 import java.util.HashMap;
9 import java.util.Map;
10 import java.util.Optional;
11
12
13 @RequiredArgsConstructor
14 @Slf4j
15 public class ApplicationRecordHandler implements RecordHandler<String, Message>
16 {
17   private final AdderResults results;
18   private final Optional<Duration> throttle;
19   private final String id;
20
21   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
22
23
24   public void addNumber(
25       Integer partition,
26       String user,
27       MessageAddNumber message)
28   {
29     state.get(partition).addToSum(user, message.getNext());
30   }
31
32   public void calculateSum(
33       Integer partition,
34       String user,
35       MessageCalculateSum message)
36   {
37     AdderResult result = state.get(partition).calculate(user);
38     log.info("{} - New result for {}: {}", id, user, result);
39     results.addResults(partition, user, result);
40   }
41
42   @Override
43   public void accept(ConsumerRecord<String, Message> record)
44   {
45     Integer partition = record.partition();
46     String user = record.key();
47     Message message = record.value();
48
49     switch(message.getType())
50     {
51       case ADD:
52         addNumber(partition, user, (MessageAddNumber) message);
53         break;
54
55       case CALC:
56         calculateSum(partition, user, (MessageCalculateSum) message);
57         break;
58     }
59
60     if (throttle.isPresent())
61     {
62       try
63       {
64         Thread.sleep(throttle.get().toMillis());
65       }
66       catch (InterruptedException e)
67       {
68         log.warn("{} - Intrerrupted while throttling: {}", id, e);
69       }
70     }
71   }
72
73   protected void addPartition(Integer partition, Map<String, AdderResult> state)
74   {
75     this.state.put(partition, new AdderBusinessLogic(state));
76   }
77
78   protected Map<String, AdderResult> removePartition(Integer partition)
79   {
80     return this.state.remove(partition).getState();
81   }
82
83
84   public Map<Integer, AdderBusinessLogic> getState()
85   {
86     return state;
87   }
88
89   public AdderBusinessLogic getState(Integer partition)
90   {
91     return state.get(partition);
92   }
93 }