{
private final String id;
private final KafkaListenerEndpointRegistry registry;
- private final ApplicationErrorHandler errorHandler;
private final RecordHandler recordHandler;
private long consumed = 0;
throw new IllegalStateException("Consumer instance " + id + " is already running!");
log.info("{} - Starting - consumed {} messages before", id, consumed);
- errorHandler.clearState();
registry.getListenerContainer(id).start();
}
{
return registry.getListenerContainer(id).isRunning();
}
-
- public Optional<Exception> exitStatus()
- {
- if (running())
- throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
-
- return errorHandler.getException();
- }
}