* Conflicts:
** src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
{
return new ApplicationRecordHandler(
adderResults,
{
return new ApplicationRecordHandler(
adderResults,
- Optional.ofNullable(properties.getThrottle()));
+ Optional.ofNullable(properties.getThrottle()),
+ properties.getClientId());
{
private final AdderResults results;
private final Optional<Duration> throttle;
{
private final AdderResults results;
private final Optional<Duration> throttle;
+ private final String id;
private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
if (message.equals("CALCULATE"))
{
AdderResult result = state.get(partition).calculate(user);
if (message.equals("CALCULATE"))
{
AdderResult result = state.get(partition).calculate(user);
- log.info("New result for {}: {}", user, result);
+ log.info("{} - New result for {}: {}", id, user, result);
results.addResults(partition, user, result);
}
else
results.addResults(partition, user, result);
}
else
}
catch (InterruptedException e)
{
}
catch (InterruptedException e)
{
- log.warn("Intrerrupted while throttling: {}", e);
+ log.warn("{} - Intrerrupted while throttling: {}", id, e);