package de.juplo.kafka;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaOperations;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
+import org.springframework.kafka.listener.DefaultErrorHandler;
+import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.util.backoff.FixedBackOff;
-import javax.annotation.PreDestroy;
+import java.util.Map;
+import java.util.Optional;
@SpringBootApplication
@Slf4j
-public class Application implements ApplicationRunner
+@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
+@EnableKafka
+public class Application
{
- @Autowired
- EndlessConsumer endlessConsumer;
+ @Bean
+ public ApplicationRecordHandler applicationRecordHandler(
+ AdderResults adderResults,
+ KafkaProperties kafkaProperties,
+ ApplicationProperties applicationProperties)
+ {
+ return new ApplicationRecordHandler(
+ adderResults,
+ Optional.ofNullable(applicationProperties.getThrottle()),
+ kafkaProperties.getConsumer().getGroupId());
+ }
+
+ @Bean
+ public AdderResults adderResults()
+ {
+ return new AdderResults();
+ }
+
+ @Bean
+ public ApplicationRebalanceListener rebalanceListener(
+ ApplicationRecordHandler recordHandler,
+ AdderResults adderResults,
+ StateRepository stateRepository,
+ KafkaProperties kafkaProperties)
+ {
+ return new ApplicationRebalanceListener(
+ recordHandler,
+ adderResults,
+ stateRepository,
+ kafkaProperties.getConsumer().getGroupId());
+ }
+ @Bean
+ ApplicationHealthIndicator applicationHealthIndicator(
+ KafkaListenerEndpointRegistry registry,
+ KafkaProperties properties)
+ {
+ return new ApplicationHealthIndicator(
+ properties.getConsumer().getGroupId(),
+ registry);
+ }
+
+ @Bean
+ public ProducerFactory<String, Object> producerFactory(
+ KafkaProperties properties)
+ {
+ return new DefaultKafkaProducerFactory<>(
+ properties.getProducer().buildProperties(),
+ new StringSerializer(),
+ new DelegatingByTypeSerializer(
+ Map.of(
+ byte[].class, new ByteArraySerializer(),
+ MessageAddNumber.class, new JsonSerializer<>(),
+ MessageCalculateSum.class, new JsonSerializer<>())));
+ }
+
+ @Bean
+ public KafkaTemplate<String, Object> kafkaTemplate(
+ ProducerFactory<String, Object> producerFactory)
+ {
+ return new KafkaTemplate<>(producerFactory);
+ }
- @Override
- public void run(ApplicationArguments args) throws Exception
+ @Bean
+ public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
+ KafkaOperations<?, ?> kafkaTemplate)
{
- log.info("Starting EndlessConsumer");
- endlessConsumer.start();
+ return new DeadLetterPublishingRecoverer(kafkaTemplate);
}
- @PreDestroy
- public void shutdown()
+ @Bean
+ public DefaultErrorHandler errorHandler(
+ DeadLetterPublishingRecoverer recoverer)
{
- try
- {
- log.info("Stopping EndlessConsumer");
- endlessConsumer.stop();
- }
- catch (IllegalStateException e)
- {
- log.info("Was already stopped: {}", e.toString());
- }
- catch (Exception e)
- {
- log.error("Unexpected exception while stopping EndlessConsumer: {}", e);
- }
+ return new DefaultErrorHandler(
+ recoverer,
+ new FixedBackOff(0l, 0l));
}