Konfig-Parameter zum künstlichen Verzögern der Verabeitung eingebaut
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRecordHandler.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6
7 import java.time.Duration;
8 import java.util.HashMap;
9 import java.util.Map;
10 import java.util.Optional;
11
12
13 @RequiredArgsConstructor
14 @Slf4j
15 public class ApplicationRecordHandler implements RecordHandler<String, String>
16 {
17   private final AdderResults results;
18   private final Optional<Duration> throttle;
19
20   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
21
22
23   @Override
24   public void accept(ConsumerRecord<String, String> record)
25   {
26     Integer partition = record.partition();
27     String user = record.key();
28     String message = record.value();
29
30     if (message.equals("CALCULATE"))
31     {
32       AdderResult result = state.get(partition).calculate(user);
33       log.info("New result for {}: {}", user, result);
34       results.addResults(partition, user, result);
35     }
36     else
37     {
38       state.get(partition).addToSum(user, Integer.parseInt(message));
39     }
40
41     if (throttle.isPresent())
42     {
43       try
44       {
45         Thread.sleep(throttle.get().toMillis());
46       }
47       catch (InterruptedException e)
48       {
49         log.warn("Intrerrupted while throttling: {}", e);
50       }
51     }
52   }
53
54   protected void addPartition(Integer partition, Map<String, AdderResult> state)
55   {
56     this.state.put(partition, new AdderBusinessLogic(state));
57   }
58
59   protected Map<String, AdderResult> removePartition(Integer partition)
60   {
61     return this.state.remove(partition).getState();
62   }
63
64
65   public Map<Integer, AdderBusinessLogic> getState()
66   {
67     return state;
68   }
69
70   public AdderBusinessLogic getState(Integer partition)
71   {
72     return state.get(partition);
73   }
74 }