From: Kai Moritz Date: Mon, 28 Oct 2024 07:12:15 +0000 (+0100) Subject: Fix: Fehler müssen wie bestätigte Nachrichten behandelt werden X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=f108bf96a03e08acce5748a29608191bc4551ac2;p=demos%2Fkafka%2Ftraining Fix: Fehler müssen wie bestätigte Nachrichten behandelt werden * Ohne explizite Fehlerbehandlung müssen auch die nicht bestätigten Nachrichten als `acked` gezählt werden. * Ansonsten würde die Verarbeitung in einem ``poll()``-Durchlauf mit Fehler hängen bleiben, da niemals alles "gesehenen" Nachrichten auch als "bestätigt" gezählt würden. * Dabei: Producer-Code an den aus `producer/spring-producer` angeglichen. --- diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 7d8e199..6d72456 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -145,24 +145,61 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener void sendCounterState(int partition, String key, Long counter) { seen[partition]++; - ProducerRecord record = new ProducerRecord<>(stateTopic, key, counter.toString()); - producer.send(record, ((metadata, exception) -> + + final long time = System.currentTimeMillis(); + + final ProducerRecord record = new ProducerRecord<>( + stateTopic, // Topic + key, // Key + counter.toString() // Value + ); + + producer.send(record, (metadata, e) -> { - if (exception == null) + long now = System.currentTimeMillis(); + if (e == null) { - acked[partition]++; - if (done[partition] && !(acked[partition] < seen[partition])) - { - phaser.arrive(); - } + // HANDLE SUCCESS + log.debug( + "{} - Sent message {}={}, partition={}:{}, timestamp={}, latency={}ms", + id, + record.key(), + record.value(), + metadata.partition(), + metadata.offset(), + metadata.timestamp(), + now - time + ); } else { - // Errors are ignored (for now): - // The next occurrence of the key will issue a new update of the counter state - log.error("{} - {}", id, exception.toString()); + // HANDLE ERROR + log.error( + "{} - ERROR for message {}={}, timestamp={}, latency={}ms: {}", + id, + record.key(), + record.value(), + metadata == null ? -1 : metadata.timestamp(), + now - time, + e.toString() + ); + } + + acked[partition]++; + if (done[partition] && !(acked[partition] < seen[partition])) + { + phaser.arrive(); } - })); + }); + + long now = System.currentTimeMillis(); + log.trace( + "{} - Queued message {}={}, latency={}ms", + id, + record.key(), + record.value(), + now - time + ); } @Override