Benennung vereinheitlicht und projektunabhängig gemacht
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRecordHandler.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.ConsumerRecord;
5
6 import java.util.HashMap;
7 import java.util.Map;
8
9
10 @Slf4j
11 public class ApplicationRecordHandler implements RecordHandler<String, String>
12 {
13   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
14
15
16   @Override
17   public void accept(ConsumerRecord<String, String> record)
18   {
19     Integer partition = record.partition();
20     String user = record.key();
21     String message = record.value();
22     switch (message)
23     {
24       case "START":
25         state.get(partition).startSum(user);
26         break;
27
28       case "END":
29         Long result = state.get(partition).endSum(user);
30         log.info("New result for {}: {}", user, result);
31         break;
32
33       default:
34         state.get(partition).addToSum(user, Integer.parseInt(message));
35         break;
36     }
37   }
38
39   protected void addPartition(Integer partition, Map<String, Long> state)
40   {
41     this.state.put(partition, new AdderBusinessLogic(state));
42   }
43
44   protected Map<String, Long> removePartition(Integer partition)
45   {
46     return this.state.remove(partition).getState();
47   }
48
49
50   public Map<Integer, AdderBusinessLogic> getState()
51   {
52     return state;
53   }
54 }