51d524fc682beb6a8998075a55bf037c7c25ee51
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRecordHandler.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6
7 import java.time.Duration;
8 import java.util.HashMap;
9 import java.util.Map;
10 import java.util.Optional;
11
12
13 @RequiredArgsConstructor
14 @Slf4j
15 public class ApplicationRecordHandler implements RecordHandler<String, String>
16 {
17   private final AdderResults results;
18   private final Optional<Duration> throttle;
19   private final String id;
20
21   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
22
23
24   @Override
25   public void accept(ConsumerRecord<String, String> record)
26   {
27     Integer partition = record.partition();
28     String user = record.key();
29     String message = record.value();
30
31     if (message.equals("CALCULATE"))
32     {
33       AdderResult result = state.get(partition).calculate(user);
34       log.info("{} - New result for {}: {}", id, user, result);
35       results.addResults(partition, user, result);
36     }
37     else
38     {
39       state.get(partition).addToSum(user, Integer.parseInt(message));
40     }
41
42     if (throttle.isPresent())
43     {
44       try
45       {
46         Thread.sleep(throttle.get().toMillis());
47       }
48       catch (InterruptedException e)
49       {
50         log.warn("{} - Intrerrupted while throttling: {}", id, e);
51       }
52     }
53   }
54
55   protected void addPartition(Integer partition, Map<String, AdderResult> state)
56   {
57     this.state.put(partition, new AdderBusinessLogic(state));
58   }
59
60   protected Map<String, AdderResult> removePartition(Integer partition)
61   {
62     return this.state.remove(partition).getState();
63   }
64
65
66   public Map<Integer, AdderBusinessLogic> getState()
67   {
68     return state;
69   }
70
71   public AdderBusinessLogic getState(Integer partition)
72   {
73     return state.get(partition);
74   }
75 }