X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRecordHandler.java;h=ef38357ede2d8a334d2dfea6942c6e7ffee6c3af;hb=9ac94aed684ce23a186792a3275a574d5adfa836;hp=596f3da256d9d1e9a052ea43fe2d21c674db94cd;hpb=a4be0eac8d9f234cd53ae1917a084f65d91d0460;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index 596f3da..ef38357 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -4,8 +4,10 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; +import java.time.Duration; import java.util.HashMap; import java.util.Map; +import java.util.Optional; @RequiredArgsConstructor @@ -13,8 +15,11 @@ import java.util.Map; public class ApplicationRecordHandler implements RecordHandler { private final AdderResults results; + private final Optional throttle; + private final String id; private final Map state = new HashMap<>(); + private final Map next = new HashMap<>(); @Override @@ -24,25 +29,58 @@ public class ApplicationRecordHandler implements RecordHandler String user = record.key(); String message = record.value(); + if (record.offset() < next.get(partition)) + { + log.warn( + "{}- Dropping duplicate message: offset={} < next={}", + id, + record.offset(), + next.get(partition)); + return; + } + if (message.equals("CALCULATE")) { AdderResult result = state.get(partition).calculate(user); - log.info("New result for {}: {}", user, result); + log.info("{} - New result for {}: {}", id, user, result); results.addResults(partition, user, result); - return; + } + else + { + state.get(partition).addToSum(user, Integer.parseInt(message)); } - state.get(partition).addToSum(user, Integer.parseInt(message)); + next.put(partition, record.offset() + 1); + + if (throttle.isPresent()) + { + try + { + Thread.sleep(throttle.get().toMillis()); + } + catch (InterruptedException e) + { + log.warn("{} - Intrerrupted while throttling: {}", id, e); + } + } } - protected void addPartition(Integer partition, Map state) + protected void addPartition(Integer partition, Map state, Long offset) { this.state.put(partition, new AdderBusinessLogic(state)); + this.next.put(partition, offset); } - protected Map removePartition(Integer partition) + protected ApplicationState removePartition(Integer partition) { - return this.state.remove(partition).getState(); + ApplicationState state = + new ApplicationState( + this.next.get(partition), + this.state.remove(partition).getState()); + + this.next.remove(partition); + + return state; }