import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.kafka.annotation.KafkaHandler;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.handler.annotation.Payload;
import java.time.Duration;
import java.util.HashMap;
@RequiredArgsConstructor
@Slf4j
-public class ApplicationRecordHandler implements RecordHandler
+@KafkaListener(
+ id = "${spring.kafka.consumer.group-id}",
+ topics = "${sumup.adder.topic}")
+public class ApplicationRecordHandler
{
private final AdderResults results;
private final Optional<Duration> throttle;
private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
- @Override
+ @KafkaHandler
public void addNumber(
- String topic,
+ @Header(KafkaHeaders.RECEIVED_PARTITION_ID)
Integer partition,
- Long offset,
+ @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)
String user,
+ @Payload
MessageAddNumber message)
{
+ log.debug("{} - Received {} for {} on {}", id, message, user, partition);
state.get(partition).addToSum(user, message.getNext());
throttle();
}
- @Override
+ @KafkaHandler
public void calculateSum(
- String topic,
+ @Header(KafkaHeaders.RECEIVED_PARTITION_ID)
Integer partition,
- Long offset,
+ @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)
String user,
+ @Payload
MessageCalculateSum message)
{
AdderResult result = state.get(partition).calculate(user);
{
return this.state.remove(partition).getState();
}
-
-
- public Map<Integer, AdderBusinessLogic> getState()
- {
- return state;
- }
-
- public AdderBusinessLogic getState(Integer partition)
- {
- return state.get(partition);
- }
}