WIP:kafkahandler
authorKai Moritz <kai@juplo.de>
Sat, 10 Sep 2022 09:13:03 +0000 (11:13 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 10 Sep 2022 09:13:03 +0000 (11:13 +0200)
src/main/java/de/juplo/kafka/ApplicationRecordHandler.java

index 521414f..61b265b 100644 (file)
@@ -3,7 +3,13 @@ package de.juplo.kafka;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.annotation.KafkaHandler;
 import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.handler.annotation.Headers;
+import org.springframework.messaging.handler.annotation.Payload;
 
 import java.time.Duration;
 import java.util.HashMap;
@@ -17,7 +23,6 @@ import java.util.Optional;
   id = "${spring.kafka.client-id}",
   idIsGroup = false,
   topics = "${sumup.adder.topic}",
-  batch = "true",
   autoStartup = "false")
 public class ApplicationRecordHandler implements RecordHandler<String, Message>
 {
@@ -28,6 +33,26 @@ public class ApplicationRecordHandler implements RecordHandler<String, Message>
   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
 
 
+  @KafkaHandler
+  public void addNumber(
+    @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String user,
+    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
+    @Payload MessageAddNumber message)
+  {
+    state.get(partition).addToSum(user, message.getNext());
+  }
+
+  @KafkaHandler
+  public void calcSum(
+    @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String user,
+    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
+    @Payload MessageCalculateSum message)
+  {
+    AdderResult result = state.get(partition).calculate(user);
+    log.info("{} - New result for {}: {}", id, user, result);
+    results.addResults(partition, user, result);
+  }
+
   @Override
   public void accept(ConsumerRecord<String, Message> record)
   {