import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.time.Duration;
import java.util.HashMap;
@RequiredArgsConstructor
@Slf4j
-public class ApplicationRecordHandler implements RecordHandler<String, Message>
+public class ApplicationRecordHandler implements RecordHandler
{
private final AdderResults results;
private final Optional<Duration> throttle;
private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
+ @Override
public void addNumber(
+ String topic,
Integer partition,
+ Long offset,
String user,
MessageAddNumber message)
{
state.get(partition).addToSum(user, message.getNext());
+ throttle();
}
+ @Override
public void calculateSum(
+ String topic,
Integer partition,
+ Long offset,
String user,
MessageCalculateSum message)
{
AdderResult result = state.get(partition).calculate(user);
log.info("{} - New result for {}: {}", id, user, result);
results.addResults(partition, user, result);
+ throttle();
}
- @Override
- public void accept(ConsumerRecord<String, Message> record)
+ private void throttle()
{
- Integer partition = record.partition();
- String user = record.key();
- Message message = record.value();
-
- switch(message.getType())
- {
- case ADD:
- addNumber(partition, user, (MessageAddNumber) message);
- break;
-
- case CALC:
- calculateSum(partition, user, (MessageCalculateSum) message);
- break;
- }
-
if (throttle.isPresent())
{
try