projects
/
demos
/
kafka
/
training
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Service ergänzt, der das Dead-Letter-Topic ausliest
[demos/kafka/training]
/
src
/
main
/
java
/
de
/
juplo
/
kafka
/
EndlessConsumer.java
diff --git
a/src/main/java/de/juplo/kafka/EndlessConsumer.java
b/src/main/java/de/juplo/kafka/EndlessConsumer.java
index
655151a
..
27c1e44
100644
(file)
--- a/
src/main/java/de/juplo/kafka/EndlessConsumer.java
+++ b/
src/main/java/de/juplo/kafka/EndlessConsumer.java
@@
-25,7
+25,6
@@
public class EndlessConsumer
{
private final String id;
private final KafkaListenerEndpointRegistry registry;
{
private final String id;
private final KafkaListenerEndpointRegistry registry;
- private final ApplicationErrorHandler errorHandler;
private final RecordHandler recordHandler;
private long consumed = 0;
private final RecordHandler recordHandler;
private long consumed = 0;
@@
-83,7
+82,6
@@
public class EndlessConsumer
throw new IllegalStateException("Consumer instance " + id + " is already running!");
log.info("{} - Starting - consumed {} messages before", id, consumed);
throw new IllegalStateException("Consumer instance " + id + " is already running!");
log.info("{} - Starting - consumed {} messages before", id, consumed);
- errorHandler.clearState();
registry.getListenerContainer(id).start();
}
registry.getListenerContainer(id).start();
}
@@
-101,12
+99,4
@@
public class EndlessConsumer
{
return registry.getListenerContainer(id).isRunning();
}
{
return registry.getListenerContainer(id).isRunning();
}
-
- public Optional<Exception> exitStatus()
- {
- if (running())
- throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
-
- return errorHandler.getException();
- }
}
}