Logging-Meldung des `BackOffState` vereinfacht und verbessert
authorKai Moritz <kai@juplo.de>
Fri, 10 Jan 2025 23:21:06 +0000 (00:21 +0100)
committerKai Moritz <kai@juplo.de>
Thu, 6 Feb 2025 17:04:39 +0000 (18:04 +0100)
src/main/java/de/juplo/kafka/BackOffState.java
src/main/java/de/juplo/kafka/ExampleConsumer.java

index 37bef37..c6b3e8c 100644 (file)
@@ -44,7 +44,11 @@ class BackOffState
     this.startTime = clock.instant();
     this.timeNextRetryIsDue = this.startTime;
 
-    log.debug("{} - Back-Off requested for {} at {}", id, topicPartition, startTime);
+    log.info(
+      "{} - Back-Off requested for offset={} in {}",
+      id,
+      offset,
+      topicPartition);
     backOffExecution = backOffStrategy.start();
     initializeNextBackOff();
   }
@@ -60,14 +64,24 @@ class BackOffState
     Duration remaining = Duration.between(now, timeNextRetryIsDue);
     if (remaining.isNegative())
     {
-      numRetries++;
-      log.info("{} - {}. retry for {}, lateness: {}", id, numRetries, topicPartition, remaining.abs());
+      log.info(
+        "{} - {}. retry for offset={} in {}, lateness: {}",
+        id,
+        numRetries,
+        offset,
+        topicPartition,
+        remaining.abs());
       initializeNextBackOff();
       return false;
     }
     else
     {
-      log.info("{} - Next retry for {} is due in {}", id, topicPartition, remaining);
+      log.info(
+        "{} - Next retry for offset={} in {} is due in {}",
+        id,
+        offset,
+        topicPartition,
+        remaining);
       return true;
     }
   }
@@ -82,26 +96,17 @@ class BackOffState
     return backOffExecution == null && timeNextRetryIsDue != null;
   }
 
-  int getNumRetries()
-  {
-    return numRetries;
-  }
-
-  Duration getDurationSpendRetrying()
-  {
-    return Duration.between(startTime, timeNextRetryIsDue);
-  }
-
   void markAsCompleted()
   {
     if (backOffExecution != null)
     {
       log.info(
-        "{} - retry #{} for {} succeeded after {}",
+        "{} - {}. retry for offset={} in {} succeeded after {}",
         id,
         numRetries,
+        offset,
         topicPartition,
-        Duration.between(startTime, clock.instant()));
+        Duration.between(startTime, timeNextRetryIsDue));
 
       backOffExecution = null;
       timeNextRetryIsDue = null;
@@ -118,6 +123,7 @@ class BackOffState
     }
     else
     {
+      numRetries++;
       timeNextRetryIsDue = clock.instant().plusMillis(backOffMillis);
     }
   }
index 6465a58..b48baaa 100644 (file)
@@ -148,11 +148,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable
                 int partition = topicPartition.partition();
                 if (!backOffState[partition].isStarted(record.offset()))
                 {
-                  log.info(
-                    "{} - First occurrence of a retryable error for offset={} in partition {} - Initializing retry!",
-                    id,
-                    record.offset(),
-                    partition);
+                  log.info("{} - First occurrence of a retryable error: {}", id, e.toString());
                   backOffState[partition] = new BackOffState(id, backOffStrategy, clock, topicPartition, record.offset());
                   consumer.seek(topicPartition, record.offset());
                   break;
@@ -161,19 +157,16 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable
                 {
                   if (backOffState[partition].isCompleted())
                   {
-                    log.warn(
-                      "{} - Ignoring retryable error after {} attempts and {}",
-                      id,
-                      backOffState[partition].getNumRetries(),
-                      backOffState[partition].getDurationSpendRetrying());
+                    log.warn("{} - Ignoring retryable error: {}", id, e.toString());
                   }
                   else
                   {
                     log.info(
-                      "{} - Retry in progress for offset={} in partition {}",
+                      "{} - Retry in progress for offset={} in {}, error: {}",
                       id,
                       record.offset(),
-                      partition);
+                      partition,
+                      e.toString());
                     consumer.seek(topicPartition, record.offset());
                     break;
                   }
@@ -182,7 +175,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable
               catch (NonRetriableErrorException e)
               {
                 // Just ignore, to skip
-                log.warn("{} - Ignoring non-retryable error!", id, e);
+                log.warn("{} - Ignoring non-retryable error: {}", id, e.toString());
               }
 
               backOffState[topicPartition.partition()].markAsCompleted();