Der Adder verarbeitet zwei Typen von JSON-Nachrichten anstatt String
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRecordHandler.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6
7 import java.time.Duration;
8 import java.util.HashMap;
9 import java.util.Map;
10 import java.util.Optional;
11
12
13 @RequiredArgsConstructor
14 @Slf4j
15 public class ApplicationRecordHandler implements RecordHandler<String, Message>
16 {
17   private final AdderResults results;
18   private final Optional<Duration> throttle;
19   private final String id;
20
21   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
22
23
24   @Override
25   public void accept(ConsumerRecord<String, Message> record)
26   {
27     Integer partition = record.partition();
28     String user = record.key();
29     Message message = record.value();
30
31     switch(message.getType())
32     {
33       case ADD:
34         MessageAddNumber addNumber = (MessageAddNumber)message;
35         state.get(partition).addToSum(user, addNumber.getNext());
36         break;
37
38       case CALC:
39         AdderResult result = state.get(partition).calculate(user);
40         log.info("{} - New result for {}: {}", id, user, result);
41         results.addResults(partition, user, result);
42         break;
43     }
44
45     if (throttle.isPresent())
46     {
47       try
48       {
49         Thread.sleep(throttle.get().toMillis());
50       }
51       catch (InterruptedException e)
52       {
53         log.warn("{} - Intrerrupted while throttling: {}", id, e);
54       }
55     }
56   }
57
58   protected void addPartition(Integer partition, Map<String, AdderResult> state)
59   {
60     this.state.put(partition, new AdderBusinessLogic(state));
61   }
62
63   protected Map<String, AdderResult> removePartition(Integer partition)
64   {
65     return this.state.remove(partition).getState();
66   }
67
68
69   public Map<Integer, AdderBusinessLogic> getState()
70   {
71     return state;
72   }
73
74   public AdderBusinessLogic getState(Integer partition)
75   {
76     return state.get(partition);
77   }
78 }