From 7083f2ec840ec8819a7f86b5df12da5743ea395f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 26 Aug 2022 10:25:55 +0200 Subject: [PATCH] =?utf8?q?Bereits=20gesehene=20Nachrichten=20werden=20?= =?utf8?q?=C3=BCbersprungen?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Der `ApplicationRecordHandler` merkt sich für jede Partition den Offset der zuletzt erfolgreich verarbeiteten Nachrichten. * Diese Offsets werden in der Mongo-DB mitgespeichert und daraus wieder hergestellt. * Wenn der Handler eine Nachricht erhält, deren Offset nicht größer als der gemerkte Offset ist, wird diese mit einer Warnung verworfen. * Vorführ-Skript so verschärft, dass deutlich wird, dass der Zustand durch die Verbesserung auch bei außerordentlichen Fehlern korrekt erhalten bleibt. * Conflicts: ** `+src/main/java/de/juplo/kafka/ApplicationRecordHandler.java+` --- .../kafka/ApplicationRebalanceListener.java | 25 ++++++++-- .../juplo/kafka/ApplicationRecordHandler.java | 48 +++++++++++++++++-- .../java/de/juplo/kafka/ApplicationState.java | 15 ++++++ .../java/de/juplo/kafka/StateDocument.java | 5 +- 4 files changed, 84 insertions(+), 9 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/ApplicationState.java diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index ba15227..2874125 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -32,7 +32,12 @@ public class ApplicationRebalanceListener implements ConsumerAwareRebalanceListe stateRepository .findById(Integer.toString(partition)) .orElse(new StateDocument(partition)); - recordHandler.addPartition(partition, document.state); + log.info( + "{} - Offset of next unseen message for partition {}: {}", + id, + partition, + document.offset); + recordHandler.addPartition(partition, document.state, document.offset); for (String user : document.state.keySet()) { log.info( @@ -54,18 +59,28 @@ public class ApplicationRebalanceListener implements ConsumerAwareRebalanceListe Integer partition = tp.partition(); log.info("{} - removing partition: {}", id, partition); this.partitions.remove(partition); - Map state = recordHandler.removePartition(partition); - for (String user : state.keySet()) + ApplicationState state = recordHandler.removePartition(partition); + log.info( + "{} - offset of next unseen message for partition {} is {}", + id, + partition, + state.getOffset()); + for (String user : state.getAdderState().keySet()) { log.info( "{} - Saved state for partition={}|user={}: {}", id, partition, user, - state.get(user)); + state.getAdderState().get(user)); } Map> results = adderResults.removePartition(partition); - stateRepository.save(new StateDocument(partition, state, results)); + stateRepository.save( + new StateDocument( + partition, + state.getAdderState(), + results, + state.getOffset())); }); } } diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index 2075781..5fe7591 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -26,19 +26,25 @@ public class ApplicationRecordHandler private final String id; private final Map state = new HashMap<>(); + private final Map next = new HashMap<>(); @KafkaHandler public void addNumber( @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, + @Header(KafkaHeaders.OFFSET) + Long offset, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String user, @Payload MessageAddNumber message) { + if (alreadySeen(partition, offset)) + return; log.debug("{} - Received {} for {} on {}", id, message, user, partition); state.get(partition).addToSum(user, message.getNext()); + rememberMessage(partition, offset); throttle(); } @@ -46,17 +52,45 @@ public class ApplicationRecordHandler public void calculateSum( @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition, + @Header(KafkaHeaders.OFFSET) + Long offset, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String user, @Payload MessageCalculateSum message) { + if (alreadySeen(partition, offset)) + return; AdderResult result = state.get(partition).calculate(user); log.info("{} - New result for {}: {}", id, user, result); results.addResults(partition, user, result); + rememberMessage(partition, offset); throttle(); } + private boolean alreadySeen(Integer partition, Long offset) + { + if (offset < next.get(partition)) + { + log.warn( + "{}- Dropping duplicate message: offset={} < next={}", + id, + offset, + next.get(partition)); + + return true; + } + else + { + return false; + } + } + + private void rememberMessage(Integer partition, Long offset) + { + next.put(partition, offset + 1); + } + private void throttle() { if (throttle.isPresent()) @@ -72,13 +106,21 @@ public class ApplicationRecordHandler } } - protected void addPartition(Integer partition, Map state) + protected void addPartition(Integer partition, Map state, Long offset) { this.state.put(partition, new AdderBusinessLogic(state)); + this.next.put(partition, offset); } - protected Map removePartition(Integer partition) + protected ApplicationState removePartition(Integer partition) { - return this.state.remove(partition).getState(); + ApplicationState state = + new ApplicationState( + this.next.get(partition), + this.state.remove(partition).getState()); + + this.next.remove(partition); + + return state; } } diff --git a/src/main/java/de/juplo/kafka/ApplicationState.java b/src/main/java/de/juplo/kafka/ApplicationState.java new file mode 100644 index 0000000..120e43a --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationState.java @@ -0,0 +1,15 @@ +package de.juplo.kafka; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +import java.util.Map; + + +@RequiredArgsConstructor +@Getter +public class ApplicationState +{ + private final Long offset; + private final Map adderState; +} diff --git a/src/main/java/de/juplo/kafka/StateDocument.java b/src/main/java/de/juplo/kafka/StateDocument.java index ae8eb51..5c4ca22 100644 --- a/src/main/java/de/juplo/kafka/StateDocument.java +++ b/src/main/java/de/juplo/kafka/StateDocument.java @@ -15,6 +15,7 @@ public class StateDocument { @Id public String id; + public long offset = 0l; public Map state; public Map> results; @@ -32,10 +33,12 @@ public class StateDocument public StateDocument( Integer partition, Map state, - Map> results) + Map> results, + long offset) { this.id = Integer.toString(partition); this.state = state; this.results = results; + this.offset = offset; } } -- 2.20.1