1 package de.juplo.kafka;
3 import lombok.RequiredArgsConstructor;
4 import lombok.extern.slf4j.Slf4j;
5 import org.springframework.kafka.annotation.KafkaListener;
6 import org.springframework.kafka.support.KafkaHeaders;
7 import org.springframework.messaging.handler.annotation.Header;
8 import org.springframework.messaging.handler.annotation.Payload;
10 import java.util.HashMap;
14 @RequiredArgsConstructor
16 public class ApplicationRecordHandler
18 private final AdderResults results;
19 private final String id;
21 private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
24 public void addNumber(
27 MessageAddNumber message)
29 log.debug("{} - Received {} for {} on {}", id, message, user, partition);
30 state.get(partition).addToSum(user, message.getNext());
33 public void calculateSum(
36 MessageCalculateSum message)
38 AdderResult result = state.get(partition).calculate(user);
39 log.info("{} - New result for {}: {}", id, user, result);
40 results.addResults(partition, user, result);
48 switch(message.getType())
51 addNumber(partition, user, (MessageAddNumber) message);
55 calculateSum(partition, user, (MessageCalculateSum) message);
60 protected void addPartition(Integer partition, Map<String, AdderResult> state)
62 this.state.put(partition, new AdderBusinessLogic(state));
65 protected Map<String, AdderResult> removePartition(Integer partition)
67 return this.state.remove(partition).getState();