@KafkaHandler
+ @Override
public void addNumber(
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String user,
+ @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
+ @Header(KafkaHeaders.OFFSET) Long offset,
@Payload MessageAddNumber message)
{
state.get(partition).addToSum(user, message.getNext());
+ throttle();
}
@KafkaHandler
+ @Override
public void calcSum(
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String user,
+ @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
+ @Header(KafkaHeaders.OFFSET) Long offset,
@Payload MessageCalculateSum message)
{
AdderResult result = state.get(partition).calculate(user);
log.info("{} - New result for {}: {}", id, user, result);
results.addResults(partition, user, result);
+ throttle();
}
- @Override
- public void accept(ConsumerRecord<String, Message> record)
+ private void throttle()
{
- Integer partition = record.partition();
- String user = record.key();
- Message message = record.value();
-
- switch(message.getType())
- {
- case ADD:
- MessageAddNumber addNumber = (MessageAddNumber)message;
- state.get(partition).addToSum(user, addNumber.getNext());
- break;
-
- case CALC:
- AdderResult result = state.get(partition).calculate(user);
- log.info("{} - New result for {}: {}", id, user, result);
- results.addResults(partition, user, result);
- break;
- }
-
if (throttle.isPresent())
{
try