Die Implementierung speichert Zustand & Offsets vor _jedem_ `poll()`
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRecordHandler.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6
7 import java.time.Duration;
8 import java.util.HashMap;
9 import java.util.Map;
10 import java.util.Optional;
11
12
13 @RequiredArgsConstructor
14 @Slf4j
15 public class ApplicationRecordHandler implements RecordHandler<String, String>
16 {
17   private final AdderResults results;
18   private final Optional<Duration> throttle;
19   private final String id;
20
21   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
22   private final Map<Integer, Long> next = new HashMap<>();
23
24
25   @Override
26   public void accept(ConsumerRecord<String, String> record)
27   {
28     Integer partition = record.partition();
29     String user = record.key();
30     String message = record.value();
31
32     if (record.offset() < next.get(partition))
33     {
34       log.warn(
35         "{}- Dropping duplicate message: offset={} < next={}",
36         id,
37         record.offset(),
38         next.get(partition));
39       return;
40     }
41
42     if (message.equals("CALCULATE"))
43     {
44       AdderResult result = state.get(partition).calculate(user);
45       log.info("{} - New result for {}: {}", id, user, result);
46       results.addResults(partition, user, result);
47     }
48     else
49     {
50       state.get(partition).addToSum(user, Integer.parseInt(message));
51     }
52
53     next.put(partition, record.offset() + 1);
54
55     if (throttle.isPresent())
56     {
57       try
58       {
59         Thread.sleep(throttle.get().toMillis());
60       }
61       catch (InterruptedException e)
62       {
63         log.warn("{} - Intrerrupted while throttling: {}", id, e);
64       }
65     }
66   }
67
68   protected void addPartition(Integer partition, Map<String, AdderResult> state, Long offset)
69   {
70     this.state.put(partition, new AdderBusinessLogic(state));
71     this.next.put(partition, offset);
72   }
73
74   protected ApplicationState removePartition(Integer partition)
75   {
76     ApplicationState state = getState(partition);
77     this.next.remove(partition);
78     this.state.remove(partition);
79     return state;
80   }
81
82
83   public Map<Integer, AdderBusinessLogic> getState()
84   {
85     return state;
86   }
87
88   public ApplicationState getState(Integer partition)
89   {
90     return
91       new ApplicationState(
92         this.next.get(partition),
93         this.state.get(partition).getState());
94   }
95 }