@Autowired
Consumer<?, ?> kafkaConsumer;
@Autowired
- SimpleConsumer simpleConsumer;
+ ExampleConsumer exampleConsumer;
@Autowired
ConfigurableApplicationContext context;
@Override
public void run(ApplicationArguments args) throws Exception
{
- log.info("Starting SimpleConsumer");
- consumerJob = taskExecutor.submitListenable(simpleConsumer);
+ log.info("Starting ExampleConsumer");
+ consumerJob = taskExecutor.submitListenable(exampleConsumer);
consumerJob.addCallback(
exitStatus ->
{
- log.info("SimpleConsumer exited normally, exit-status: {}", exitStatus);
+ log.info("ExampleConsumer exited normally, exit-status: {}", exitStatus);
SpringApplication.exit(context, () -> exitStatus);
},
t ->
{
- log.error("SimpleConsumer exited abnormally!", t);
+ log.error("ExampleConsumer exited abnormally!", t);
SpringApplication.exit(context, () -> 2);
});
}
@PreDestroy
public void shutdown() throws ExecutionException, InterruptedException
{
- log.info("Signaling SimpleConsumer to quit its work");
+ log.info("Signaling ExampleConsumer to quit its work");
kafkaConsumer.wakeup();
- log.info("Waiting for SimpleConsumer to finish its work");
+ log.info("Waiting for ExampleConsumer to finish its work");
consumerJob.get();
- log.info("SimpleConsumer finished its work");
+ log.info("ExampleConsumer finished its work");
}
public class ApplicationConfiguration
{
@Bean
- public SimpleConsumer simpleConsumer(
+ public ExampleConsumer exampleConsumer(
Consumer<String, String> kafkaConsumer,
KafkaProperties kafkaProperties,
ApplicationProperties applicationProperties)
{
return
- new SimpleConsumer(
+ new ExampleConsumer(
kafkaProperties.getClientId(),
applicationProperties.getTopics(),
kafkaConsumer);