d4ec38faf0d0c733076bff6db4a3d597099e883a
[demos/kafka/training] / src / main / java / de / juplo / kafka / SumRecordHandler.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.ConsumerRecord;
5
6 import java.util.HashMap;
7 import java.util.List;
8 import java.util.Map;
9
10 @Slf4j
11 public class SumRecordHandler implements RecordHandler<String, String>
12 {
13   private final Map<Integer, Map<String, List<Long>>> seen = new HashMap<>();
14   private final Map<Integer, SumBusinessLogic> state = new HashMap<>();
15
16
17   @Override
18   public void accept(ConsumerRecord<String, String> record)
19   {
20     Integer partition = record.partition();
21     String user = record.key();
22     String message = record.value();
23     switch (message)
24     {
25       case "START":
26         state.get(partition).startSum(user);
27         return;
28
29       case "END":
30         Long result = state.get(partition).endSum(user);
31         log.info("New result for {}: {}", user, result);
32         return;
33
34       default:
35         state.get(partition).addToSum(user, Integer.parseInt(message));
36         return;
37     }
38   }
39
40   protected void addPartition(Integer partition, StateDocument document)
41   {
42     this.seen.put(partition, document.seen);
43     this.state.put(partition, new SumBusinessLogic(document.state));
44   }
45
46   protected StateDocument removePartition(Integer partition)
47   {
48     return new StateDocument(
49         partition,
50         this.state.remove(partition).getState(),
51         this.seen.remove(partition));
52   }
53
54
55   public Map<Integer, Map<String, List<Long>>> getSeen()
56   {
57     return seen;
58   }
59 }