From: Kai Moritz Date: Fri, 10 Jan 2025 23:21:06 +0000 (+0100) Subject: Logging-Meldung des `BackOffState` vereinfacht und verbessert X-Git-Tag: consumer/spring-consumer--error-handling--2025-02-signal~10 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=7d2d6a2c4eaa41dae75f6cdb7b52b2793e0ee4f3;p=demos%2Fkafka%2Ftraining Logging-Meldung des `BackOffState` vereinfacht und verbessert --- diff --git a/src/main/java/de/juplo/kafka/BackOffState.java b/src/main/java/de/juplo/kafka/BackOffState.java index 37bef374..c6b3e8c2 100644 --- a/src/main/java/de/juplo/kafka/BackOffState.java +++ b/src/main/java/de/juplo/kafka/BackOffState.java @@ -44,7 +44,11 @@ class BackOffState this.startTime = clock.instant(); this.timeNextRetryIsDue = this.startTime; - log.debug("{} - Back-Off requested for {} at {}", id, topicPartition, startTime); + log.info( + "{} - Back-Off requested for offset={} in {}", + id, + offset, + topicPartition); backOffExecution = backOffStrategy.start(); initializeNextBackOff(); } @@ -60,14 +64,24 @@ class BackOffState Duration remaining = Duration.between(now, timeNextRetryIsDue); if (remaining.isNegative()) { - numRetries++; - log.info("{} - {}. retry for {}, lateness: {}", id, numRetries, topicPartition, remaining.abs()); + log.info( + "{} - {}. retry for offset={} in {}, lateness: {}", + id, + numRetries, + offset, + topicPartition, + remaining.abs()); initializeNextBackOff(); return false; } else { - log.info("{} - Next retry for {} is due in {}", id, topicPartition, remaining); + log.info( + "{} - Next retry for offset={} in {} is due in {}", + id, + offset, + topicPartition, + remaining); return true; } } @@ -82,26 +96,17 @@ class BackOffState return backOffExecution == null && timeNextRetryIsDue != null; } - int getNumRetries() - { - return numRetries; - } - - Duration getDurationSpendRetrying() - { - return Duration.between(startTime, timeNextRetryIsDue); - } - void markAsCompleted() { if (backOffExecution != null) { log.info( - "{} - retry #{} for {} succeeded after {}", + "{} - {}. retry for offset={} in {} succeeded after {}", id, numRetries, + offset, topicPartition, - Duration.between(startTime, clock.instant())); + Duration.between(startTime, timeNextRetryIsDue)); backOffExecution = null; timeNextRetryIsDue = null; @@ -118,6 +123,7 @@ class BackOffState } else { + numRetries++; timeNextRetryIsDue = clock.instant().plusMillis(backOffMillis); } } diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 6465a582..b48baaa4 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -148,11 +148,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable int partition = topicPartition.partition(); if (!backOffState[partition].isStarted(record.offset())) { - log.info( - "{} - First occurrence of a retryable error for offset={} in partition {} - Initializing retry!", - id, - record.offset(), - partition); + log.info("{} - First occurrence of a retryable error: {}", id, e.toString()); backOffState[partition] = new BackOffState(id, backOffStrategy, clock, topicPartition, record.offset()); consumer.seek(topicPartition, record.offset()); break; @@ -161,19 +157,16 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable { if (backOffState[partition].isCompleted()) { - log.warn( - "{} - Ignoring retryable error after {} attempts and {}", - id, - backOffState[partition].getNumRetries(), - backOffState[partition].getDurationSpendRetrying()); + log.warn("{} - Ignoring retryable error: {}", id, e.toString()); } else { log.info( - "{} - Retry in progress for offset={} in partition {}", + "{} - Retry in progress for offset={} in {}, error: {}", id, record.offset(), - partition); + partition, + e.toString()); consumer.seek(topicPartition, record.offset()); break; } @@ -182,7 +175,7 @@ public class ExampleConsumer implements ConsumerRebalanceListener, Runnable catch (NonRetriableErrorException e) { // Just ignore, to skip - log.warn("{} - Ignoring non-retryable error!", id, e); + log.warn("{} - Ignoring non-retryable error: {}", id, e.toString()); } backOffState[topicPartition.partition()].markAsCompleted();