`EndlessConsumer` auf `@KafkaHandler` umgestellt
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRecordHandler.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5
6 import java.time.Duration;
7 import java.util.HashMap;
8 import java.util.Map;
9 import java.util.Optional;
10
11
12 @RequiredArgsConstructor
13 @Slf4j
14 public class ApplicationRecordHandler implements RecordHandler
15 {
16   private final AdderResults results;
17   private final Optional<Duration> throttle;
18   private final String id;
19
20   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
21
22
23   @Override
24   public void addNumber(
25       String topic,
26       Integer partition,
27       Long offset,
28       String user,
29       MessageAddNumber message)
30   {
31     state.get(partition).addToSum(user, message.getNext());
32     throttle();
33   }
34
35   @Override
36   public void calculateSum(
37       String topic,
38       Integer partition,
39       Long offset,
40       String user,
41       MessageCalculateSum message)
42   {
43     AdderResult result = state.get(partition).calculate(user);
44     log.info("{} - New result for {}: {}", id, user, result);
45     results.addResults(partition, user, result);
46     throttle();
47   }
48
49   private void throttle()
50   {
51     if (throttle.isPresent())
52     {
53       try
54       {
55         Thread.sleep(throttle.get().toMillis());
56       }
57       catch (InterruptedException e)
58       {
59         log.warn("{} - Intrerrupted while throttling: {}", id, e);
60       }
61     }
62   }
63
64   protected void addPartition(Integer partition, Map<String, AdderResult> state)
65   {
66     this.state.put(partition, new AdderBusinessLogic(state));
67   }
68
69   protected Map<String, AdderResult> removePartition(Integer partition)
70   {
71     return this.state.remove(partition).getState();
72   }
73
74
75   public Map<Integer, AdderBusinessLogic> getState()
76   {
77     return state;
78   }
79
80   public AdderBusinessLogic getState(Integer partition)
81   {
82     return state.get(partition);
83   }
84 }