From: Kai Moritz Date: Fri, 26 Aug 2022 08:25:55 +0000 (+0200) Subject: Bereits gesehene Nachrichten werden übersprungen X-Git-Tag: sumup-adder--drop-duplicates---lvm-2-tage~1 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=9ac94aed684ce23a186792a3275a574d5adfa836;p=demos%2Fkafka%2Ftraining Bereits gesehene Nachrichten werden übersprungen * Der `ApplicationRecordHandler` merkt sich für jede Partition den Offset der zuletzt erfolgreich verarbeiteten Nachrichten. * Diese Offsets werden in der Mongo-DB mitgespeichert und daraus wieder hergestellt. * Wenn der Handler eine Nachricht erhält, deren Offset nicht größer als der gemerkte Offset ist, wird diese mit einer Warnung verworfen. * Vorführ-Skript so verschärft, dass deutlich wird, dass der Zustand durch die Verbesserung auch bei außerordentlichen Fehlern korrekt erhalten bleibt. --- diff --git a/README.sh b/README.sh index f337d5c..3292f5f 100755 --- a/README.sh +++ b/README.sh @@ -76,7 +76,7 @@ http :8092/results/klaus | jq .[].sum | uniq docker-compose stop adder-1 until [ $(http --check-status :8092/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-2..."; sleep 1; done -until [ $(http --check-status :8092/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-2..."; sleep 1; done +until [ $(http --check-status :8092/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for klaus to show up on adder-2..."; sleep 1; done echo "Resultate für adder-2" http -v --pretty none -S :8092/results @@ -87,4 +87,25 @@ http :8092/results/peter | jq .[].sum | uniq echo "Resultate für klaus von adder-2" http :8092/results/klaus | jq .[].sum | uniq +docker-compose kill -s 9 adder-2 +docker-compose start adder-1 docker-compose kill -s 9 peter klaus +while ! [[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done +until [ $(http --check-status :8091/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-1..."; sleep 1; done +until [ $(http --check-status :8091/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for klaus to show up on adder-1..."; sleep 1; done + +echo "Resultate für adder-1" +http -v --pretty none -S :8091/results +echo + +echo "Resultate für peter von adder-1" +http :8091/results/peter | jq .[].sum | uniq +echo "Resultate für klaus von adder-1" +http :8091/results/klaus | jq .[].sum | uniq + +sleep 5 + +echo "Resultate für peter von adder-1" +http :8091/results/peter | jq .[].sum | uniq +echo "Resultate für klaus von adder-1" +http :8091/results/klaus | jq .[].sum | uniq diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index 0bfee67..5e1a12c 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -31,7 +31,12 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener stateRepository .findById(Integer.toString(partition)) .orElse(new StateDocument(partition)); - recordHandler.addPartition(partition, document.state); + log.info( + "{} - Offset of next unseen message for partition {}: {}", + id, + partition, + document.offset); + recordHandler.addPartition(partition, document.state, document.offset); for (String user : document.state.keySet()) { log.info( @@ -53,18 +58,28 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener Integer partition = tp.partition(); log.info("{} - removing partition: {}", id, partition); this.partitions.remove(partition); - Map state = recordHandler.removePartition(partition); - for (String user : state.keySet()) + ApplicationState state = recordHandler.removePartition(partition); + log.info( + "{} - offset of next unseen message for partition {} is {}", + id, + partition, + state.getOffset()); + for (String user : state.getAdderState().keySet()) { log.info( "{} - Saved state for partition={}|user={}: {}", id, partition, user, - state.get(user)); + state.getAdderState().get(user)); } Map> results = adderResults.removePartition(partition); - stateRepository.save(new StateDocument(partition, state, results)); + stateRepository.save( + new StateDocument( + partition, + state.getAdderState(), + results, + state.getOffset())); }); } } diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index 51d524f..ef38357 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -19,6 +19,7 @@ public class ApplicationRecordHandler implements RecordHandler private final String id; private final Map state = new HashMap<>(); + private final Map next = new HashMap<>(); @Override @@ -28,6 +29,16 @@ public class ApplicationRecordHandler implements RecordHandler String user = record.key(); String message = record.value(); + if (record.offset() < next.get(partition)) + { + log.warn( + "{}- Dropping duplicate message: offset={} < next={}", + id, + record.offset(), + next.get(partition)); + return; + } + if (message.equals("CALCULATE")) { AdderResult result = state.get(partition).calculate(user); @@ -39,6 +50,8 @@ public class ApplicationRecordHandler implements RecordHandler state.get(partition).addToSum(user, Integer.parseInt(message)); } + next.put(partition, record.offset() + 1); + if (throttle.isPresent()) { try @@ -52,14 +65,22 @@ public class ApplicationRecordHandler implements RecordHandler } } - protected void addPartition(Integer partition, Map state) + protected void addPartition(Integer partition, Map state, Long offset) { this.state.put(partition, new AdderBusinessLogic(state)); + this.next.put(partition, offset); } - protected Map removePartition(Integer partition) + protected ApplicationState removePartition(Integer partition) { - return this.state.remove(partition).getState(); + ApplicationState state = + new ApplicationState( + this.next.get(partition), + this.state.remove(partition).getState()); + + this.next.remove(partition); + + return state; } diff --git a/src/main/java/de/juplo/kafka/ApplicationState.java b/src/main/java/de/juplo/kafka/ApplicationState.java new file mode 100644 index 0000000..120e43a --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationState.java @@ -0,0 +1,15 @@ +package de.juplo.kafka; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +import java.util.Map; + + +@RequiredArgsConstructor +@Getter +public class ApplicationState +{ + private final Long offset; + private final Map adderState; +} diff --git a/src/main/java/de/juplo/kafka/StateDocument.java b/src/main/java/de/juplo/kafka/StateDocument.java index ae8eb51..5c4ca22 100644 --- a/src/main/java/de/juplo/kafka/StateDocument.java +++ b/src/main/java/de/juplo/kafka/StateDocument.java @@ -15,6 +15,7 @@ public class StateDocument { @Id public String id; + public long offset = 0l; public Map state; public Map> results; @@ -32,10 +33,12 @@ public class StateDocument public StateDocument( Integer partition, Map state, - Map> results) + Map> results, + long offset) { this.id = Integer.toString(partition); this.state = state; this.results = results; + this.offset = offset; } }