GRÜN: Implementierung der Erwartungen inkl. Anpassungen an der Anwendung
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRecordHandler.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.ConsumerRecord;
5
6 import java.util.HashMap;
7 import java.util.Map;
8
9
10 @Slf4j
11 public class ApplicationRecordHandler implements RecordHandler<String, String>
12 {
13   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
14
15
16   @Override
17   public void accept(ConsumerRecord<String, String> record)
18   {
19     Integer partition = record.partition();
20     String user = record.key();
21     String message = record.value();
22
23     if (message.equals("CALCULATE"))
24     {
25       AdderResult result = state.get(partition).calculate(user);
26       log.info("New result for {}: {}", user, result);
27       return;
28     }
29
30     state.get(partition).addToSum(user, Integer.parseInt(message));
31   }
32
33   protected void addPartition(Integer partition, Map<String, AdderResult> state)
34   {
35     this.state.put(partition, new AdderBusinessLogic(state));
36   }
37
38   protected Map<String, AdderResult> removePartition(Integer partition)
39   {
40     return this.state.remove(partition).getState();
41   }
42
43
44   public Map<Integer, AdderBusinessLogic> getState()
45   {
46     return state;
47   }
48 }