From 06cc18bec6b9223023291e5c10dd95c98324239e Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 28 Oct 2024 08:12:15 +0100 Subject: [PATCH] =?utf8?q?Fix:=20Fehler=20m=C3=BCssen=20wie=20best=C3=A4ti?= =?utf8?q?gte=20Nachrichten=20behandelt=20werden?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Ohne explizite Fehlerbehandlung müssen auch die nicht bestätigten Nachrichten als `acked` gezählt werden. * Ansonsten würde die Verarbeitung in einem ``poll()``-Durchlauf mit Fehler hängen bleiben, da niemals alles "gesehenen" Nachrichten auch als "bestätigt" gezählt würden. * Dabei: Producer-Code an den aus `producer/spring-producer` angeglichen. --- .../java/de/juplo/kafka/ExampleConsumer.java | 61 +++++++++++++++---- 1 file changed, 49 insertions(+), 12 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 7584a17..27a1bba 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -153,24 +153,61 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener void sendCounterState(int partition, String key, Long counter) { seen[partition]++; - ProducerRecord record = new ProducerRecord<>(stateTopic, key, counter.toString()); - producer.send(record, ((metadata, exception) -> + + final long time = System.currentTimeMillis(); + + final ProducerRecord record = new ProducerRecord<>( + stateTopic, // Topic + key, // Key + counter.toString() // Value + ); + + producer.send(record, (metadata, e) -> { - if (exception == null) + long now = System.currentTimeMillis(); + if (e == null) { - acked[partition]++; - if (done[partition] && !(acked[partition] < seen[partition])) - { - phaser.arrive(); - } + // HANDLE SUCCESS + log.debug( + "{} - Sent message {}={}, partition={}:{}, timestamp={}, latency={}ms", + id, + record.key(), + record.value(), + metadata.partition(), + metadata.offset(), + metadata.timestamp(), + now - time + ); } else { - // Errors are ignored (for now): - // The next occurrence of the key will issue a new update of the counter state - log.error("{} - {}", id, exception.toString()); + // HANDLE ERROR + log.error( + "{} - ERROR for message {}={}, timestamp={}, latency={}ms: {}", + id, + record.key(), + record.value(), + metadata == null ? -1 : metadata.timestamp(), + now - time, + e.toString() + ); + } + + acked[partition]++; + if (done[partition] && !(acked[partition] < seen[partition])) + { + phaser.arrive(); } - })); + }); + + long now = System.currentTimeMillis(); + log.trace( + "{} - Queued message {}={}, latency={}ms", + id, + record.key(), + record.value(), + now - time + ); } @Override -- 2.20.1