Fix: Fehler müssen wie bestätigte Nachrichten behandelt werden
authorKai Moritz <kai@juplo.de>
Mon, 28 Oct 2024 07:12:15 +0000 (08:12 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 8 Nov 2024 17:21:16 +0000 (18:21 +0100)
* Ohne explizite Fehlerbehandlung müssen auch die nicht bestätigten
  Nachrichten als `acked` gezählt werden.
* Ansonsten würde die Verarbeitung in einem ``poll()``-Durchlauf mit Fehler
  hängen bleiben, da niemals alles "gesehenen" Nachrichten auch als
  "bestätigt" gezählt würden.
* Dabei: Producer-Code an den aus `producer/spring-producer` angeglichen.

src/main/java/de/juplo/kafka/ExampleConsumer.java

index 7584a17..27a1bba 100644 (file)
@@ -153,24 +153,61 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
   void sendCounterState(int partition, String key, Long counter)
   {
     seen[partition]++;
-    ProducerRecord<String, String> record = new ProducerRecord<>(stateTopic, key, counter.toString());
-    producer.send(record, ((metadata, exception) ->
+
+    final long time = System.currentTimeMillis();
+
+    final ProducerRecord<String, String> record = new ProducerRecord<>(
+        stateTopic,        // Topic
+        key,               // Key
+        counter.toString() // Value
+    );
+
+    producer.send(record, (metadata, e) ->
     {
-      if (exception == null)
+      long now = System.currentTimeMillis();
+      if (e == null)
       {
-        acked[partition]++;
-        if (done[partition] && !(acked[partition] < seen[partition]))
-        {
-          phaser.arrive();
-        }
+        // HANDLE SUCCESS
+        log.debug(
+            "{} - Sent message {}={}, partition={}:{}, timestamp={}, latency={}ms",
+            id,
+            record.key(),
+            record.value(),
+            metadata.partition(),
+            metadata.offset(),
+            metadata.timestamp(),
+            now - time
+        );
       }
       else
       {
-        // Errors are ignored (for now):
-        // The next occurrence of the key will issue a new update of the counter state
-        log.error("{} - {}", id, exception.toString());
+        // HANDLE ERROR
+        log.error(
+            "{} - ERROR for message {}={}, timestamp={}, latency={}ms: {}",
+            id,
+            record.key(),
+            record.value(),
+            metadata == null ? -1 : metadata.timestamp(),
+            now - time,
+            e.toString()
+        );
+      }
+
+      acked[partition]++;
+      if (done[partition] && !(acked[partition] < seen[partition]))
+      {
+        phaser.arrive();
       }
-    }));
+    });
+
+    long now = System.currentTimeMillis();
+    log.trace(
+        "{} - Queued message {}={}, latency={}ms",
+        id,
+        record.key(),
+        record.value(),
+        now - time
+    );
   }
 
   @Override