Vorlage
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRecordHandler.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6
7 import java.time.Duration;
8 import java.util.HashMap;
9 import java.util.Map;
10 import java.util.Optional;
11
12
13 @RequiredArgsConstructor
14 @Slf4j
15 public class ApplicationRecordHandler implements RecordHandler<String, Message>
16 {
17   private final AdderResults results;
18   private final Optional<Duration> throttle;
19   private final String id;
20
21   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
22
23
24   public void addNumber(
25       Integer partition,
26       String user,
27       MessageAddNumber message)
28   {
29     state.get(partition).addToSum(user, message.getNext());
30   }
31
32   public void calculateSum(
33       Integer partition,
34       String user,
35       MessageCalculateSum message)
36   {
37     AdderResult result = state.get(partition).calculate(user);
38     log.info("{} - New result for {}: {}", id, user, result);
39     results.addResults(partition, user, result);
40   }
41
42   @Override
43   public void accept(ConsumerRecord<String, Message> record)
44   {
45     Integer partition = record.partition();
46     String user = record.key();
47     Message message = record.value();
48
49     // TODO: JSON-Nachrichten verarbeiten
50
51     if (throttle.isPresent())
52     {
53       try
54       {
55         Thread.sleep(throttle.get().toMillis());
56       }
57       catch (InterruptedException e)
58       {
59         log.warn("{} - Intrerrupted while throttling: {}", id, e);
60       }
61     }
62   }
63
64   protected void addPartition(Integer partition, Map<String, AdderResult> state)
65   {
66     this.state.put(partition, new AdderBusinessLogic(state));
67   }
68
69   protected Map<String, AdderResult> removePartition(Integer partition)
70   {
71     return this.state.remove(partition).getState();
72   }
73
74
75   public Map<Integer, AdderBusinessLogic> getState()
76   {
77     return state;
78   }
79
80   public AdderBusinessLogic getState(Integer partition)
81   {
82     return state.get(partition);
83   }
84 }