X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRecordHandler.java;h=bc18d59b6596507d9ebc03678ceb2b3f487512e4;hb=refs%2Ftags%2Fsumup-adder--drop-duplicates---lvm-2-tage;hp=51d524fc682beb6a8998075a55bf037c7c25ee51;hpb=80228c42df19f61f868de9d72afb655a3e6c0576;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index 51d524f..bc18d59 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -19,6 +19,7 @@ public class ApplicationRecordHandler implements RecordHandler private final String id; private final Map state = new HashMap<>(); + private final Map next = new HashMap<>(); @Override @@ -28,6 +29,16 @@ public class ApplicationRecordHandler implements RecordHandler String user = record.key(); String message = record.value(); + if (record.offset() < next.get(partition)) + { + log.warn( + "{}- Dropping duplicate message: offset={} < next={}", + id, + record.offset(), + next.get(partition)); + return; + } + if (message.equals("CALCULATE")) { AdderResult result = state.get(partition).calculate(user); @@ -39,6 +50,8 @@ public class ApplicationRecordHandler implements RecordHandler state.get(partition).addToSum(user, Integer.parseInt(message)); } + next.put(partition, record.offset() + 1); + if (throttle.isPresent()) { try @@ -52,14 +65,18 @@ public class ApplicationRecordHandler implements RecordHandler } } - protected void addPartition(Integer partition, Map state) + protected void addPartition(Integer partition, Map state, Long offset) { this.state.put(partition, new AdderBusinessLogic(state)); + this.next.put(partition, offset); } - protected Map removePartition(Integer partition) + protected ApplicationState removePartition(Integer partition) { - return this.state.remove(partition).getState(); + ApplicationState state = getState(partition); + this.next.remove(partition); + this.state.remove(partition); + return state; } @@ -68,8 +85,11 @@ public class ApplicationRecordHandler implements RecordHandler return state; } - public AdderBusinessLogic getState(Integer partition) + public ApplicationState getState(Integer partition) { - return state.get(partition); + return + new ApplicationState( + this.next.get(partition), + this.state.get(partition).getState()); } }