X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRecordHandler.java;h=f4d3671ae6da4208a551271f3b19750213e07ce8;hb=66ff7d205e66616de8aaca94503dbbcd7d281f6d;hp=0f5b982ccdda3e7d2cc84154ee8bb62754d2a03d;hpb=ad4ed61abeb48124f4db65687ede71f5bc943f27;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index 0f5b982..f4d3671 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -2,7 +2,6 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; import java.time.Duration; import java.util.HashMap; @@ -12,32 +11,43 @@ import java.util.Optional; @RequiredArgsConstructor @Slf4j -public class ApplicationRecordHandler implements RecordHandler +public class ApplicationRecordHandler implements RecordHandler { private final AdderResults results; private final Optional throttle; + private final String id; private final Map state = new HashMap<>(); @Override - public void accept(ConsumerRecord record) + public void addNumber( + String topic, + Integer partition, + Long offset, + String user, + MessageAddNumber message) { - Integer partition = record.partition(); - String user = record.key(); - String message = record.value(); + state.get(partition).addToSum(user, message.getNext()); + throttle(); + } - if (message.equals("CALCULATE")) - { - AdderResult result = state.get(partition).calculate(user); - log.info("New result for {}: {}", user, result); - results.addResults(partition, user, result); - } - else - { - state.get(partition).addToSum(user, Integer.parseInt(message)); - } + @Override + public void calculateSum( + String topic, + Integer partition, + Long offset, + String user, + MessageCalculateSum message) + { + AdderResult result = state.get(partition).calculate(user); + log.info("{} - New result for {}: {}", id, user, result); + results.addResults(partition, user, result); + throttle(); + } + private void throttle() + { if (throttle.isPresent()) { try @@ -46,7 +56,7 @@ public class ApplicationRecordHandler implements RecordHandler } catch (InterruptedException e) { - log.warn("Intrerrupted while throttling: {}", e); + log.warn("{} - Intrerrupted while throttling: {}", id, e); } } }