e5e40eda4627fdedb15b440fd768532f842b838f
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRecordHandler.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6
7 import java.time.Duration;
8 import java.util.HashMap;
9 import java.util.Map;
10 import java.util.Optional;
11
12
13 @RequiredArgsConstructor
14 @Slf4j
15 public class ApplicationRecordHandler implements RecordHandler<String, String>
16 {
17   private final AdderResults results;
18   private final Optional<Duration> throttle;
19   private final String id;
20
21   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
22
23
24   @Override
25   public void accept(ConsumerRecord<String, String> record)
26   {
27     Integer partition = record.partition();
28     String user = record.key();
29     String message = record.value();
30
31     if (!state.containsKey(partition))
32       state.put(partition, new AdderBusinessLogic());
33
34     if (message.equals("CALCULATE"))
35     {
36       AdderResult result = state.get(partition).calculate(user);
37       log.info("{} - New result for {}: {}", id, user, result);
38       results.addResults(partition, user, result);
39     }
40     else
41     {
42       state.get(partition).addToSum(user, Integer.parseInt(message));
43     }
44
45     if (throttle.isPresent())
46     {
47       try
48       {
49         Thread.sleep(throttle.get().toMillis());
50       }
51       catch (InterruptedException e)
52       {
53         log.warn("{} - Intrerrupted while throttling: {}", id, e);
54       }
55     }
56   }
57
58   protected void addPartition(Integer partition, Map<String, AdderResult> state)
59   {
60     this.state.put(partition, new AdderBusinessLogic(state));
61   }
62
63   protected Map<String, AdderResult> removePartition(Integer partition)
64   {
65     return this.state.remove(partition).getState();
66   }
67
68
69   public Map<Integer, AdderBusinessLogic> getState()
70   {
71     return state;
72   }
73
74   public AdderBusinessLogic getState(Integer partition)
75   {
76     return state.get(partition);
77   }
78 }