#!/bin/bash
-IMAGE=juplo/spring-consumer:1.1-generics-SNAPSHOT
+IMAGE=juplo/spring-consumer:1.1-record-handler-SNAPSHOT
if [ "$1" = "cleanup" ]
then
}
group = 'de.juplo.kafka'
-version = '1.1-generics-SNAPSHOT'
+version = '1.1-record-handler-SNAPSHOT'
java {
toolchain {
command: kafka:9092 test producer
consumer:
- image: juplo/spring-consumer:1.1-generics-SNAPSHOT
+ image: juplo/spring-consumer:1.1-record-handler-SNAPSHOT
environment:
spring.kafka.bootstrap-servers: kafka:9092
spring.kafka.client-id: consumer
juplo.consumer.topic: test
peter:
- image: juplo/spring-consumer:1.1-generics-SNAPSHOT
+ image: juplo/spring-consumer:1.1-record-handler-SNAPSHOT
environment:
spring.kafka.bootstrap-servers: kafka:9092
spring.kafka.client-id: consumer
juplo.consumer.topic: test
ute:
- image: juplo/spring-consumer:1.1-generics-SNAPSHOT
+ image: juplo/spring-consumer:1.1-record-handler-SNAPSHOT
environment:
spring.kafka.bootstrap-servers: kafka:9092
spring.kafka.client-id: consumer
<artifactId>spring-consumer</artifactId>
<name>Spring Consumer</name>
<description>Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka</description>
- <version>1.1-generics-SNAPSHOT</version>
+ <version>1.1-record-handler-SNAPSHOT</version>
<properties>
<java.version>21</java.version>
package de.juplo.kafka;
+import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
@Configuration
@EnableConfigurationProperties(ApplicationProperties.class)
+@Slf4j
public class ApplicationConfiguration
{
@Bean
public ExampleConsumer<String, String> exampleConsumer(
Consumer<String, String> kafkaConsumer,
+ RecordHandler<String, String> recordHandler,
ApplicationProperties properties,
KafkaProperties kafkaProperties,
ConfigurableApplicationContext applicationContext)
kafkaProperties.getClientId(),
properties.getConsumerProperties().getTopic(),
kafkaConsumer,
+ recordHandler,
() -> applicationContext.close());
}
+ @Bean
+ public RecordHandler<String, String> recordHandler()
+ {
+ return (topic, partition, offset, key, value) -> log.info("No-Ops Handler called for {}={}", key, value);
+ }
+
@Bean(destroyMethod = "")
public Consumer<?, ?> kafkaConsumer(ConsumerFactory<?, ?> consumerFactory)
{
private final String id;
private final String topic;
private final Consumer<K, V> consumer;
+ private final RecordHandler<K, V> recordHandler;
private final Thread workerThread;
private final Runnable closeCallback;
String clientId,
String topic,
Consumer<K, V> consumer,
+ RecordHandler<K, V> recordHandler,
Runnable closeCallback)
{
this.id = clientId;
this.topic = topic;
this.consumer = consumer;
+ this.recordHandler = recordHandler;
workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
workerThread.start();
{
consumed++;
log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value);
+ recordHandler.handleRecord(topic, partition, offset, key, value);
}
--- /dev/null
+package de.juplo.kafka;
+
+public interface RecordHandler<K, V>
+{
+ void handleRecord(
+ String topic,
+ Integer partition,
+ Long offset,
+ K key,
+ V value);
+}