WIP
[demos/kafka/training] / src / main / java / de / juplo / kafka / SumRecordHandler.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.ConsumerRecord;
5
6 import java.util.HashMap;
7 import java.util.List;
8 import java.util.Map;
9 import java.util.stream.Collectors;
10
11
12 @Slf4j
13 public class SumRecordHandler implements RecordHandler<String, String>
14 {
15   private final Map<Integer, SumBusinessLogic> state = new HashMap<>();
16
17
18   @Override
19   public void accept(ConsumerRecord<String, String> record)
20   {
21     Integer partition = record.partition();
22     String user = record.key();
23     String message = record.value();
24     switch (message)
25     {
26       case "START":
27         state.get(partition).startSum(user);
28         break;
29
30       case "END":
31         Long result = state.get(partition).endSum(user);
32         log.info("New result for {}: {}", user, result);
33         break;
34
35       default:
36         state.get(partition).addToSum(user, Integer.parseInt(message));
37         break;
38     }
39   }
40
41   protected void addPartition(Integer partition, Map<String, Long> state)
42   {
43     this.state.put(partition, new SumBusinessLogic(state));
44   }
45
46   protected Map<String, Long> removePartition(Integer partition)
47   {
48     return this.state.remove(partition).getState();
49   }
50
51
52   public Map<Integer, SumBusinessLogic> getState()
53   {
54     return state;
55   }
56 }