1 package de.juplo.kafka;
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
7 import java.time.Duration;
8 import java.util.HashMap;
10 import java.util.Optional;
13 @RequiredArgsConstructor
15 public class ApplicationRecordHandler implements RecordHandler<String, String>
17 private final AdderResults results;
18 private final Optional<Duration> throttle;
19 private final String id;
21 private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
22 private final Map<Integer, Long> next = new HashMap<>();
26 public void accept(ConsumerRecord<String, String> record)
28 Integer partition = record.partition();
29 String user = record.key();
30 String message = record.value();
32 if (record.offset() < next.get(partition))
35 "{}- Dropping duplicate message: offset={} < next={}",
42 if (message.equals("CALCULATE"))
44 AdderResult result = state.get(partition).calculate(user);
45 log.info("{} - New result for {}: {}", id, user, result);
46 results.addResults(partition, user, result);
50 state.get(partition).addToSum(user, Integer.parseInt(message));
53 next.put(partition, record.offset() + 1);
55 if (throttle.isPresent())
59 Thread.sleep(throttle.get().toMillis());
61 catch (InterruptedException e)
63 log.warn("{} - Intrerrupted while throttling: {}", id, e);
68 protected void addPartition(Integer partition, Map<String, AdderResult> state, Long offset)
70 this.state.put(partition, new AdderBusinessLogic(state));
71 this.next.put(partition, offset);
74 protected ApplicationState removePartition(Integer partition)
76 ApplicationState state = getState(partition);
77 this.next.remove(partition);
78 this.state.remove(partition);
83 public Map<Integer, AdderBusinessLogic> getState()
88 public ApplicationState getState(Integer partition)
92 this.next.get(partition),
93 this.state.get(partition).getState());