Vorlage
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRecordHandler.java
1 package de.juplo.kafka;
2
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.springframework.kafka.annotation.KafkaListener;
6 import org.springframework.kafka.support.KafkaHeaders;
7 import org.springframework.messaging.handler.annotation.Header;
8 import org.springframework.messaging.handler.annotation.Payload;
9
10 import java.util.HashMap;
11 import java.util.Map;
12
13
14 @RequiredArgsConstructor
15 @Slf4j
16 public class ApplicationRecordHandler
17 {
18   private final AdderResults results;
19   private final String id;
20
21   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
22
23
24   public void addNumber(
25       Integer partition,
26       String user,
27       MessageAddNumber message)
28   {
29     log.debug("{} - Received {} for {} on {}", id, message, user, partition);
30     state.get(partition).addToSum(user, message.getNext());
31   }
32
33   public void calculateSum(
34       Integer partition,
35       String user,
36       MessageCalculateSum message)
37   {
38     AdderResult result = state.get(partition).calculate(user);
39     log.info("{} - New result for {}: {}", id, user, result);
40     results.addResults(partition, user, result);
41   }
42
43   public void accept(
44       Integer partition,
45       String user,
46       Message message)
47   {
48     switch(message.getType())
49     {
50       case ADD:
51         addNumber(partition, user, (MessageAddNumber) message);
52         break;
53
54       case CALC:
55         calculateSum(partition, user, (MessageCalculateSum) message);
56         break;
57     }
58   }
59
60   protected void addPartition(Integer partition, Map<String, AdderResult> state)
61   {
62     this.state.put(partition, new AdderBusinessLogic(state));
63   }
64
65   protected Map<String, AdderResult> removePartition(Integer partition)
66   {
67     return this.state.remove(partition).getState();
68   }
69 }