ROT: Merge der korrigierten Test-Logik deserialization -> into sumup-adder
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRecordHandler.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6
7 import java.util.HashMap;
8 import java.util.Map;
9
10
11 @RequiredArgsConstructor
12 @Slf4j
13 public class ApplicationRecordHandler implements RecordHandler<String, String>
14 {
15   private final AdderResults results;
16
17   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
18
19
20   @Override
21   public void accept(ConsumerRecord<String, String> record)
22   {
23     Integer partition = record.partition();
24     String user = record.key();
25     String message = record.value();
26
27     if (message.equals("CALCULATE"))
28     {
29       AdderResult result = state.get(partition).calculate(user);
30       log.info("New result for {}: {}", user, result);
31       results.addResults(partition, user, result);
32       return;
33     }
34
35     state.get(partition).addToSum(user, Integer.parseInt(message));
36   }
37
38   protected void addPartition(Integer partition, Map<String, AdderResult> state)
39   {
40     this.state.put(partition, new AdderBusinessLogic(state));
41   }
42
43   protected Map<String, AdderResult> removePartition(Integer partition)
44   {
45     return this.state.remove(partition).getState();
46   }
47
48
49   public Map<Integer, AdderBusinessLogic> getState()
50   {
51     return state;
52   }
53
54   public AdderBusinessLogic getState(Integer partition)
55   {
56     return state.get(partition);
57   }
58 }