WIP:kafkahandler
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRecordHandler.java
index 61b265b..1804951 100644 (file)
@@ -34,46 +34,35 @@ public class ApplicationRecordHandler implements RecordHandler<String, Message>
 
 
   @KafkaHandler
+  @Override
   public void addNumber(
     @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String user,
+    @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
     @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
+    @Header(KafkaHeaders.OFFSET) Long offset,
     @Payload MessageAddNumber message)
   {
     state.get(partition).addToSum(user, message.getNext());
+    throttle();
   }
 
   @KafkaHandler
+  @Override
   public void calcSum(
     @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String user,
+    @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
     @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
+    @Header(KafkaHeaders.OFFSET) Long offset,
     @Payload MessageCalculateSum message)
   {
     AdderResult result = state.get(partition).calculate(user);
     log.info("{} - New result for {}: {}", id, user, result);
     results.addResults(partition, user, result);
+    throttle();
   }
 
-  @Override
-  public void accept(ConsumerRecord<String, Message> record)
+  private void throttle()
   {
-    Integer partition = record.partition();
-    String user = record.key();
-    Message message = record.value();
-
-    switch(message.getType())
-    {
-      case ADD:
-        MessageAddNumber addNumber = (MessageAddNumber)message;
-        state.get(partition).addToSum(user, addNumber.getNext());
-        break;
-
-      case CALC:
-        AdderResult result = state.get(partition).calculate(user);
-        log.info("{} - New result for {}: {}", id, user, result);
-        results.addResults(partition, user, result);
-        break;
-    }
-
     if (throttle.isPresent())
     {
       try