import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
@RequiredArgsConstructor
@Slf4j
-@KafkaListener(
- id = "${spring.kafka.consumer.group-id}",
- topics = "${sumup.adder.topic}")
public class ApplicationRecordHandler
{
private final AdderResults results;
private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
- @KafkaHandler
public void addNumber(
- @Header(KafkaHeaders.RECEIVED_PARTITION_ID)
Integer partition,
- @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)
String user,
- @Payload
MessageAddNumber message)
{
log.debug("{} - Received {} for {} on {}", id, message, user, partition);
state.get(partition).addToSum(user, message.getNext());
}
- @KafkaHandler
public void calculateSum(
- @Header(KafkaHeaders.RECEIVED_PARTITION_ID)
Integer partition,
- @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)
String user,
- @Payload
MessageCalculateSum message)
{
AdderResult result = state.get(partition).calculate(user);
results.addResults(partition, user, result);
}
+ @KafkaListener(
+ id = "${spring.kafka.consumer.group-id}",
+ topics = "${sumup.adder.topic}")
+ public void accept(
+ @Header(KafkaHeaders.RECEIVED_PARTITION_ID)
+ Integer partition,
+ @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)
+ String user,
+ @Payload
+ Message message)
+ {
+ switch(message.getType())
+ {
+ case ADD:
+ addNumber(partition, user, (MessageAddNumber) message);
+ break;
+
+ case CALC:
+ calculateSum(partition, user, (MessageCalculateSum) message);
+ break;
+ }
+ }
protected void addPartition(Integer partition, Map<String, AdderResult> state)
{