- EndlessConsumer consumer =
- new EndlessConsumer(
- Executors.newFixedThreadPool(1),
- repository,
- properties.getBootstrapServer(),
- properties.getGroupId(),
- properties.getClientId(),
- properties.getTopic(),
- properties.getAutoOffsetReset());
-
- consumer.start();
-
- return consumer;
+ log.info("Signaling SimpleConsumer to quit its work");
+ consumer.wakeup();
+ log.info("Waiting for SimpleConsumer to finish its work");
+ consumerJob.get();
+ log.info("SimpleConsumer finished its work");