@Autowired
ThreadPoolTaskExecutor taskExecutor;
@Autowired
- Producer<?, ?> kafkaProducer;
- @Autowired
ExampleProducer exampleProducer;
@Autowired
ConfigurableApplicationContext context;
ListenableFuture<Integer> consumerJob;
@Override
- public void run(ApplicationArguments args) throws Exception
+ public void run(ApplicationArguments args)
{
- log.info("Starting SimpleConsumer");
+ log.info("Starting ExampleProducer");
consumerJob = taskExecutor.submitListenable(exampleProducer);
consumerJob.addCallback(
exitStatus ->
{
- log.info("SimpleConsumer exited normally, exit-status: {}", exitStatus);
+ log.info("ExampleProducer exited normally, exit-status: {}", exitStatus);
SpringApplication.exit(context, () -> exitStatus);
},
t ->
{
- log.error("SimpleConsumer exited abnormally!", t);
+ log.error("ExampleProducer exited abnormally!", t);
SpringApplication.exit(context, () -> 2);
});
}
- @PreDestroy
- public void shutdown() throws ExecutionException, InterruptedException
- {
- log.info("Signaling ExampleProducer to quit its work");
- exampleProducer.shutdown();
- log.info("Waiting for ExampleProducer to finish its work");
- consumerJob.get();
- log.info("ExampleProducer finished its work");
- }
-
public static void main(String[] args)
{
package de.juplo.kafka;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.context.event.ContextClosedEvent;
import java.util.Properties;
@Configuration
@EnableConfigurationProperties(ApplicationProperties.class)
+@Slf4j
public class ApplicationConfiguration
{
@Bean
- public ExampleProducer exampleProducer(
- ApplicationProperties properties,
- KafkaProducer<String, String> kafkaProducer)
- {
- return
- new ExampleProducer(
- properties.getClientId(),
- properties.getTopic(),
- kafkaProducer);
- }
-
- @Bean(destroyMethod = "close")
- public KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
+ public ExampleProducer exampleProducer(ApplicationProperties properties)
{
Properties props = new Properties();
props.put("bootstrap.servers", properties.getBootstrapServer());
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
- return new KafkaProducer<>(props);
+ return
+ new ExampleProducer(
+ properties.getClientId(),
+ properties.getTopic(),
+ new KafkaProducer<>(props));
+ }
+
+ @Bean
+ public ApplicationListener<ContextClosedEvent> shutdownHandler(ExampleProducer exampleProducer)
+ {
+ return event -> {
+ log.info("Shutdown is in progress... Signalling ExampleProducer to quit its work!");
+ exampleProducer.shutdown();
+ };
}
}
log.error("{} - Unexpected error: {}! Produced {} messages", id, e.toString(), produced);
return 1;
}
+ finally
+ {
+ log.info("{}: Closing the KafkaProducer", id);
+ producer.close();
+ log.info("{}: Produced {} messages in total, exiting!", id, produced);
+ }
- log.info("{}: Produced {} messages in total, exiting!", id, produced);
return 0;
}
compression-type: ${producer.compression-type}
logging:
level:
- root: INFO
- de.juplo: DEBUG
+ root: TRACE
server:
port: 8880