From 7d2d6a2c4eaa41dae75f6cdb7b52b2793e0ee4f3 Mon Sep 17 00:00:00 2001
From: Kai Moritz <kai@juplo.de>
Date: Sat, 11 Jan 2025 00:21:06 +0100
Subject: [PATCH] Logging-Meldung des `BackOffState` vereinfacht und verbessert

---
 .../java/de/juplo/kafka/BackOffState.java     | 38 +++++++++++--------
 .../java/de/juplo/kafka/ExampleConsumer.java  | 19 +++-------
 2 files changed, 28 insertions(+), 29 deletions(-)

diff --git a/src/main/java/de/juplo/kafka/BackOffState.java b/src/main/java/de/juplo/kafka/BackOffState.java
index 37bef374..c6b3e8c2 100644
--- a/src/main/java/de/juplo/kafka/BackOffState.java
+++ b/src/main/java/de/juplo/kafka/BackOffState.java
@@ -44,7 +44,11 @@ class BackOffState
     this.startTime = clock.instant();
     this.timeNextRetryIsDue = this.startTime;
 
-    log.debug("{} - Back-Off requested for {} at {}", id, topicPartition, startTime);
+    log.info(
+      "{} - Back-Off requested for offset={} in {}",
+      id,
+      offset,
+      topicPartition);
     backOffExecution = backOffStrategy.start();
     initializeNextBackOff();
   }
@@ -60,14 +64,24 @@ class BackOffState
     Duration remaining = Duration.between(now, timeNextRetryIsDue);
     if (remaining.isNegative())
     {
-      numRetries++;
-      log.info("{} - {}. retry for {}, lateness: {}", id, numRetries, topicPartition, remaining.abs());
+      log.info(
+        "{} - {}. retry for offset={} in {}, lateness: {}",
+        id,
+        numRetries,
+        offset,
+        topicPartition,
+        remaining.abs());
       initializeNextBackOff();
       return false;
     }
     else
     {
-      log.info("{} - Next retry for {} is due in {}", id, topicPartition, remaining);
+      log.info(
+        "{} - Next retry for offset={} in {} is due in {}",
+        id,
+        offset,
+        topicPartition,
+        remaining);
       return true;
     }
   }
@@ -82,26 +96,17 @@ class BackOffState
     return backOffExecution == null && timeNextRetryIsDue != null;
   }
 
-  int getNumRetries()
-  {
-    return numRetries;
-  }
-
-  Duration getDurationSpendRetrying()
-  {
-    return Duration.between(startTime, timeNextRetryIsDue);
-  }
-
   void markAsCompleted()
   {
     if (backOffExecution != null)
     {
       log.info(
-        "{} - retry #{} for {} succeeded after {}",
+        "{} - {}. retry for offset={} in {} succeeded after {}",
         id,
         numRetries,
+        offset,
         topicPartition,
-        Duration.between(startTime, clock.instant()));
+        Duration.between(startTime, timeNextRetryIsDue));
 
       backOffExecution = null;
       timeNextRetryIsDue = null;
@@ -118,6 +123,7 @@ class BackOffState
     }
     else
     {
+      numRetries++;
       timeNextRetryIsDue = clock.instant().plusMillis(backOffMillis);
     }
   }
diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java
index 6465a582..b48baaa4 100644
--- a/src/main/java/de/juplo/kafka/ExampleConsumer.java
+++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java
@@ -148,11 +148,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable
                 int partition = topicPartition.partition();
                 if (!backOffState[partition].isStarted(record.offset()))
                 {
-                  log.info(
-                    "{} - First occurrence of a retryable error for offset={} in partition {} - Initializing retry!",
-                    id,
-                    record.offset(),
-                    partition);
+                  log.info("{} - First occurrence of a retryable error: {}", id, e.toString());
                   backOffState[partition] = new BackOffState(id, backOffStrategy, clock, topicPartition, record.offset());
                   consumer.seek(topicPartition, record.offset());
                   break;
@@ -161,19 +157,16 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable
                 {
                   if (backOffState[partition].isCompleted())
                   {
-                    log.warn(
-                      "{} - Ignoring retryable error after {} attempts and {}",
-                      id,
-                      backOffState[partition].getNumRetries(),
-                      backOffState[partition].getDurationSpendRetrying());
+                    log.warn("{} - Ignoring retryable error: {}", id, e.toString());
                   }
                   else
                   {
                     log.info(
-                      "{} - Retry in progress for offset={} in partition {}",
+                      "{} - Retry in progress for offset={} in {}, error: {}",
                       id,
                       record.offset(),
-                      partition);
+                      partition,
+                      e.toString());
                     consumer.seek(topicPartition, record.offset());
                     break;
                   }
@@ -182,7 +175,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable
               catch (NonRetriableErrorException e)
               {
                 // Just ignore, to skip
-                log.warn("{} - Ignoring non-retryable error!", id, e);
+                log.warn("{} - Ignoring non-retryable error: {}", id, e.toString());
               }
 
               backOffState[topicPartition.partition()].markAsCompleted();
-- 
2.20.1