package de.juplo.kafka;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;
@Configuration
@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
+@Slf4j
public class ApplicationConfiguration
{
@Bean
- public Consumer<ConsumerRecord<String, ClientMessage>> consumer()
+ public Consumer<ClientMessage> messageHandler()
{
- return (record) ->
+ return (message) ->
{
- // Handle record
+ log.info("Received ClientMessage: {}", message);
+ };
+ }
+
+ @Bean
+ public Consumer<Greeting> greetingsHandler()
+ {
+ return (greeting) ->
+ {
+ log.info("Received Greeting: {}", greeting);
};
}
new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(
byte[].class, new ByteArraySerializer(),
- ClientMessage.class, new JsonSerializer<>())));
+ Object.class, new JsonSerializer<>())));
}
@Bean
}
@Bean(destroyMethod = "close")
- public org.apache.kafka.clients.consumer.Consumer<String, ClientMessage> kafkaConsumer(ConsumerFactory<String, ClientMessage> factory)
+ public org.apache.kafka.clients.consumer.Consumer<String, Object> kafkaConsumer(ConsumerFactory<String, Object> factory)
{
return factory.createConsumer();
}
@RequiredArgsConstructor
public class ApplicationHealthIndicator implements HealthIndicator
{
- private final EndlessConsumer<String, Long> consumer;
+ private final EndlessConsumer consumer;
@Override
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
+import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
@Component
@Slf4j
@RequiredArgsConstructor
-public class EndlessConsumer<K, V> implements ConsumerAwareRebalanceListener
+@KafkaListener(
+ id = "${spring.kafka.consumer.client-id}",
+ idIsGroup = false,
+ topics = "${consumer.topic}",
+ autoStartup = "false")
+public class EndlessConsumer implements ConsumerAwareRebalanceListener
{
@Autowired
private KafkaListenerEndpointRegistry registry;
@Value("${spring.kafka.consumer.client-id}")
String id;
@Autowired
- Consumer<ConsumerRecord<K, V>> handler;
+ Consumer<ClientMessage> messageHandler;
+ @Autowired
+ Consumer<Greeting> greetingsHandler;
private long consumed = 0;
}
- @KafkaListener(
- id = "${spring.kafka.consumer.client-id}",
- idIsGroup = false,
- topics = "${consumer.topic}",
- autoStartup = "false")
- public void receive(ConsumerRecord<K, V> record)
+ @KafkaHandler
+ public void receiveGreeting(Greeting greeting)
{
- log.info(
- "{} - {}: {}/{} - {}={}",
- id,
- record.offset(),
- record.topic(),
- record.partition(),
- record.key(),
- record.value()
- );
-
- handler.accept(record);
+ greetingsHandler.accept(greeting);
+ consumed++;
+ }
+ @KafkaHandler
+ public void receiveMessage(ClientMessage message)
+ {
+ messageHandler.accept(message);
consumed++;
}
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.Data;
+
+import java.time.LocalDateTime;
+
+
+@Data
+public class Greeting
+{
+ String name;
+ LocalDateTime when;
+}
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
properties:
spring.deserializer.value.delegate.class: "org.springframework.kafka.support.serializer.JsonDeserializer"
- spring.json.type.mapping: "message:de.juplo.kafka.ClientMessage"
+ spring.json.type.mapping: "message:de.juplo.kafka.ClientMessage,greeting:de.juplo.kafka.Greeting"
spring.json.trusted.packages: "de.juplo.kafka"
producer:
bootstrap-servers: :9092
@Autowired
KafkaProducer<String, Bytes> kafkaProducer;
@Autowired
- org.apache.kafka.clients.consumer.Consumer<String, ClientMessage> kafkaConsumer;
- @Autowired
KafkaConsumer<Bytes, Bytes> offsetConsumer;
@Autowired
ApplicationProperties applicationProperties;