package de.juplo.kafka;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.kafka.annotation.EnableKafka;
-import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
-import org.springframework.kafka.core.DefaultKafkaProducerFactory;
-import org.springframework.kafka.core.KafkaOperations;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.core.ProducerFactory;
-import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
-import org.springframework.kafka.listener.DefaultErrorHandler;
-import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer;
-import org.springframework.kafka.support.serializer.JsonSerializer;
-import org.springframework.util.backoff.FixedBackOff;
-
-import java.util.Map;
-import java.util.Optional;
-
+import org.springframework.kafka.annotation.KafkaListener;
+// tag::supersimple[]
@SpringBootApplication
@Slf4j
-@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
-@EnableKafka
public class Application
{
- @Bean
- public ApplicationRecordHandler applicationRecordHandler(
- AdderResults adderResults,
- KafkaProperties kafkaProperties,
- ApplicationProperties applicationProperties)
- {
- return new ApplicationRecordHandler(
- adderResults,
- Optional.ofNullable(applicationProperties.getThrottle()),
- kafkaProperties.getConsumer().getGroupId());
- }
-
- @Bean
- public AdderResults adderResults()
- {
- return new AdderResults();
- }
-
- @Bean
- public ApplicationRebalanceListener rebalanceListener(
- ApplicationRecordHandler recordHandler,
- AdderResults adderResults,
- StateRepository stateRepository,
- KafkaProperties kafkaProperties)
- {
- return new ApplicationRebalanceListener(
- recordHandler,
- adderResults,
- stateRepository,
- kafkaProperties.getConsumer().getGroupId());
- }
-
- @Bean
- ApplicationHealthIndicator applicationHealthIndicator(
- KafkaListenerEndpointRegistry registry,
- KafkaProperties properties)
- {
- return new ApplicationHealthIndicator(
- properties.getConsumer().getGroupId(),
- registry);
- }
-
- @Bean
- public ProducerFactory<String, Object> producerFactory(
- KafkaProperties properties)
- {
- return new DefaultKafkaProducerFactory<>(
- properties.getProducer().buildProperties(),
- new StringSerializer(),
- new DelegatingByTypeSerializer(
- Map.of(
- byte[].class, new ByteArraySerializer(),
- MessageAddNumber.class, new JsonSerializer<>(),
- MessageCalculateSum.class, new JsonSerializer<>())));
- }
-
- @Bean
- public KafkaTemplate<String, Object> kafkaTemplate(
- ProducerFactory<String, Object> producerFactory)
- {
- return new KafkaTemplate<>(producerFactory);
- }
-
- @Bean
- public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
- KafkaOperations<?, ?> kafkaTemplate)
- {
- return new DeadLetterPublishingRecoverer(kafkaTemplate);
- }
-
- @Bean
- public DefaultErrorHandler errorHandler(
- DeadLetterPublishingRecoverer recoverer)
+ @KafkaListener(id = "supersimple", topics = "test")
+ public void recieve(String message)
{
- return new DefaultErrorHandler(
- recoverer,
- new FixedBackOff(0l, 0l));
+ log.info("Recieved message: {}", message);
}
-
public static void main(String[] args)
{
SpringApplication.run(Application.class, args);
}
}
+// end::supersimple[]