package de.juplo.kafka;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.util.concurrent.ListenableFuture;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaOperations;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
+import org.springframework.kafka.listener.DefaultErrorHandler;
+import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.util.backoff.FixedBackOff;
-import javax.annotation.PreDestroy;
-import java.util.concurrent.ExecutionException;
+import java.util.Map;
@SpringBootApplication
-@Slf4j
-public class Application implements ApplicationRunner
+@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
+public class Application
{
- @Autowired
- ThreadPoolTaskExecutor taskExecutor;
- @Autowired
- Consumer<?, ?> kafkaConsumer;
- @Autowired
- SimpleConsumer simpleConsumer;
- @Autowired
- ConfigurableApplicationContext context;
+ @Bean
+ public ProducerFactory<String, Object> producerFactory(
+ KafkaProperties properties)
+ {
+ Map<String, Object> map = Map.of(
+ ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
+ return new DefaultKafkaProducerFactory<>(
+ map,
+ new StringSerializer(),
+ new DelegatingByTypeSerializer(
+ Map.of(
+ byte[].class, new ByteArraySerializer(),
+ MessageAddNumber.class, new JsonSerializer<>(),
+ MessageCalculateSum.class, new JsonSerializer<>())));
+ }
- ListenableFuture<Integer> consumerJob;
+ @Bean
+ public KafkaTemplate<String, Object> kafkaTemplate(
+ ProducerFactory<String, Object> producerFactory)
+ {
+ return new KafkaTemplate<>(producerFactory);
+ }
- @Override
- public void run(ApplicationArguments args) throws Exception
+ @Bean
+ public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
+ KafkaOperations<?, ?> kafkaTemplate)
{
- log.info("Starting SimpleConsumer");
- consumerJob = taskExecutor.submitListenable(simpleConsumer);
- consumerJob.addCallback(
- exitStatus ->
- {
- log.info("SimpleConsumer exited normally, exit-status: {}", exitStatus);
- SpringApplication.exit(context, () -> exitStatus);
- },
- t ->
- {
- log.error("SimpleConsumer exited abnormally!", t);
- SpringApplication.exit(context, () -> 2);
- });
+ return new DeadLetterPublishingRecoverer(kafkaTemplate);
}
- @PreDestroy
- public void shutdown() throws ExecutionException, InterruptedException
+ @Bean
+ public DefaultErrorHandler errorHandler(
+ DeadLetterPublishingRecoverer recoverer)
{
- log.info("Signaling SimpleConsumer to quit its work");
- kafkaConsumer.wakeup();
- log.info("Waiting for SimpleConsumer to finish its work");
- consumerJob.get();
- log.info("SimpleConsumer finished its work");
+ return new DefaultErrorHandler(
+ recoverer,
+ new FixedBackOff(0l, 0l));
}