-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.kafka.annotation.KafkaHandler;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.support.KafkaHeaders;
-import org.springframework.messaging.handler.annotation.Header;
-import org.springframework.messaging.handler.annotation.Payload;
-
-import java.time.Duration;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-
-@RequiredArgsConstructor
-@Slf4j
-@KafkaListener(
- id = "${spring.kafka.consumer.group-id}",
- topics = "${sumup.adder.topic}")
-public class ApplicationRecordHandler
-{
- private final AdderResults results;
- private final Optional<Duration> throttle;
- private final String id;
-
- private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
-
-
- @KafkaHandler
- public void addNumber(
- @Header(KafkaHeaders.RECEIVED_PARTITION_ID)
- Integer partition,
- @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)
- String user,
- @Payload
- MessageAddNumber message)
- {
- log.debug("{} - Received {} for {} on {}", id, message, user, partition);
- state.get(partition).addToSum(user, message.getNext());
- throttle();
- }
-
- @KafkaHandler
- public void calculateSum(
- @Header(KafkaHeaders.RECEIVED_PARTITION_ID)
- Integer partition,
- @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)
- String user,
- @Payload
- MessageCalculateSum message)
- {
- AdderResult result = state.get(partition).calculate(user);
- log.info("{} - New result for {}: {}", id, user, result);
- results.addResults(partition, user, result);
- throttle();
- }
-
- private void throttle()
- {
- if (throttle.isPresent())
- {
- try
- {
- Thread.sleep(throttle.get().toMillis());
- }
- catch (InterruptedException e)
- {
- log.warn("{} - Intrerrupted while throttling: {}", id, e);
- }
- }
- }
-
- protected void addPartition(Integer partition, Map<String, AdderResult> state)
- {
- this.state.put(partition, new AdderBusinessLogic(state));
- }
-
- protected Map<String, AdderResult> removePartition(Integer partition)
- {
- return this.state.remove(partition).getState();
- }
-}