+ recordHandler.addNumber(topic, partition, offset, key, message);
+
+ consumed++;
+ }
+
+ @KafkaHandler
+ public void calculateSum(
+ @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
+ @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
+ @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
+ @Header(KafkaHeaders.OFFSET) Long offset,
+ @Payload MessageCalculateSum message)
+ {
+ log.info(
+ "{} - {}: {}/{} - {}={}",
+ id,
+ offset,
+ topic,
+ partition,
+ key,
+ message
+ );
+
+ recordHandler.calculateSum(topic, partition, offset, key, message);