From: Kai Moritz Date: Sat, 9 Apr 2022 16:50:43 +0000 (+0200) Subject: Fehlerbehandlung in EndlessConsumer.destroy() korrigiert X-Git-Tag: deserialization-synchroner-test~17^2~1 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=abc2ffdb4d2829cd2c4263df9f6cec10b2a60c03;p=demos%2Fkafka%2Ftraining Fehlerbehandlung in EndlessConsumer.destroy() korrigiert --- diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index b3dd446..f25e93c 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -110,7 +110,7 @@ public class EndlessConsumer implements Runnable { boolean stateChanged = running.compareAndSet(false, true); if (!stateChanged) - throw new RuntimeException("Consumer instance " + id + " is already running!"); + throw new IllegalStateException("Consumer instance " + id + " is already running!"); log.info("{} - Starting - consumed {} messages before", id, consumed); future = executor.submit(this); @@ -120,7 +120,7 @@ public class EndlessConsumer implements Runnable { boolean stateChanged = running.compareAndSet(true, false); if (!stateChanged) - throw new RuntimeException("Consumer instance " + id + " is not running!"); + throw new IllegalStateException("Consumer instance " + id + " is not running!"); log.info("{} - Stopping", id); consumer.wakeup(); @@ -140,6 +140,10 @@ public class EndlessConsumer implements Runnable { log.info("{} - Was already stopped", id); } + catch (Exception e) + { + log.error("{} - Unexpected exception while trying to stop the consumer", id, e); + } finally { log.info("{}: Consumed {} messages in total, exiting!", id, consumed);