import org.springframework.util.backoff.FixedBackOff;
import java.util.Map;
-import java.util.Optional;
@SpringBootApplication
@Bean
public ApplicationRecordHandler applicationRecordHandler(
AdderResults adderResults,
- KafkaProperties kafkaProperties,
- ApplicationProperties applicationProperties)
+ KafkaProperties kafkaProperties)
{
return new ApplicationRecordHandler(
adderResults,
- Optional.ofNullable(applicationProperties.getThrottle()),
kafkaProperties.getConsumer().getGroupId());
}
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
-import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
-import java.util.Optional;
@RequiredArgsConstructor
public class ApplicationRecordHandler
{
private final AdderResults results;
- private final Optional<Duration> throttle;
private final String id;
private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
{
log.debug("{} - Received {} for {} on {}", id, message, user, partition);
state.get(partition).addToSum(user, message.getNext());
- throttle();
}
@KafkaHandler
AdderResult result = state.get(partition).calculate(user);
log.info("{} - New result for {}: {}", id, user, result);
results.addResults(partition, user, result);
- throttle();
}
- private void throttle()
- {
- if (throttle.isPresent())
- {
- try
- {
- Thread.sleep(throttle.get().toMillis());
- }
- catch (InterruptedException e)
- {
- log.warn("{} - Intrerrupted while throttling: {}", id, e);
- }
- }
- }
protected void addPartition(Integer partition, Map<String, AdderResult> state)
{