import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.stereotype.Component;
-import javax.annotation.PreDestroy;
import java.util.List;
import java.util.function.Consumer;
public synchronized void start()
{
+ if (registry.getListenerContainer(id).isChildRunning())
+ throw new IllegalStateException("Consumer instance " + id + " is already running!");
+
log.info("{} - Starting - consumed {} messages before", id, consumed);
registry.getListenerContainer(id).start();
}
public synchronized void stop()
{
+ if (!registry.getListenerContainer(id).isChildRunning())
+ throw new IllegalStateException("Consumer instance " + id + " is not running!");
+
log.info("{} - Stopping", id);
registry.getListenerContainer(id).stop();
log.info("{} - Stopped - consumed {} messages so far", id, consumed);
}
-
- @PreDestroy
- public void destroy()
- {
- log.info("{} - Destroy!", id);
- stop();
- }
}