From 285c3039fd1e2e15035f824e94e6686be0df47a3 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 14 Nov 2024 21:33:44 +0100 Subject: [PATCH] Umbau des `spring-consumer` auf den `@KafkaHandler` --- README.sh | 2 +- build.gradle | 2 +- docker/docker-compose.yml | 26 ++-- pom.xml | 2 +- .../juplo/kafka/ApplicationConfiguration.java | 57 --------- .../de/juplo/kafka/ApplicationProperties.java | 52 -------- .../java/de/juplo/kafka/ExampleConsumer.java | 112 ++---------------- src/main/resources/application.yml | 25 ++-- 8 files changed, 43 insertions(+), 235 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/ApplicationConfiguration.java delete mode 100644 src/main/java/de/juplo/kafka/ApplicationProperties.java diff --git a/README.sh b/README.sh index 7152ec9d..ca477732 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-consumer:1.1-json-SNAPSHOT +IMAGE=juplo/spring-consumer:1.1-kafkahandler-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/build.gradle b/build.gradle index 3ddca4bb..c771b9c0 100644 --- a/build.gradle +++ b/build.gradle @@ -8,7 +8,7 @@ plugins { } group = 'de.juplo.kafka' -version = '1.1-json-SNAPSHOT' +version = '1.1-kafkahandler-SNAPSHOT' java { toolchain { diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 9fb105f3..736a7048 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -92,7 +92,7 @@ services: cub kafka-ready -b kafka-1:9092,kafka-2:9092,kafka-3:9092 3 60 > /dev/null 2>&1 || exit 1 if [ -e INITIALIZED ] then - echo -n Bereits konfiguriert: + echo -n Bereits konfiguriert: cat INITIALIZED kafka-topics --bootstrap-server kafka:9092 --describe --topic test else @@ -145,24 +145,30 @@ services: juplo.producer.throttle-ms: 100 consumer: - image: juplo/spring-consumer:1.1-json-SNAPSHOT + image: juplo/spring-consumer:1.1-kafkahandler-SNAPSHOT environment: - juplo.bootstrap-server: kafka:9092 - juplo.client-id: consumer + spring.kafka.bootstrap-servers: 
kafka:9092 + spring.kafka.client-id: consumer + spring.kafka.consumer.auto-offset-reset: earliest + logging.level.org.apache.kafka.clients.consumer: INFO juplo.consumer.topic: test peter: - image: juplo/spring-consumer:1.1-json-SNAPSHOT + image: juplo/spring-consumer:1.1-kafkahandler-SNAPSHOT environment: - juplo.bootstrap-server: kafka:9092 - juplo.client-id: peter + spring.kafka.bootstrap-servers: kafka:9092 + spring.kafka.client-id: peter + spring.kafka.consumer.auto-offset-reset: earliest + logging.level.org.apache.kafka.clients.consumer: INFO juplo.consumer.topic: test ute: - image: juplo/spring-consumer:1.1-json-SNAPSHOT + image: juplo/spring-consumer:1.1-kafkahandler-SNAPSHOT environment: - juplo.bootstrap-server: kafka:9092 - juplo.client-id: ute + spring.kafka.bootstrap-servers: kafka:9092 + spring.kafka.client-id: ute + spring.kafka.consumer.auto-offset-reset: earliest + logging.level.org.apache.kafka.clients.consumer: INFO juplo.consumer.topic: test volumes: diff --git a/pom.xml b/pom.xml index 8dd99a57..284bb4a9 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-consumer Spring Consumer Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka - 1.1-json-SNAPSHOT + 1.1-kafkahandler-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java deleted file mode 100644 index 7aac916d..00000000 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ /dev/null @@ -1,57 +0,0 @@ -package de.juplo.kafka; - -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.StickyAssignor; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.springframework.kafka.support.serializer.JsonDeserializer; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import 
org.springframework.context.ConfigurableApplicationContext; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -import java.util.Properties; - - -@Configuration -@EnableConfigurationProperties(ApplicationProperties.class) -public class ApplicationConfiguration -{ - @Bean - public ExampleConsumer exampleConsumer( - Consumer kafkaConsumer, - ApplicationProperties properties, - ConfigurableApplicationContext applicationContext) - { - return - new ExampleConsumer<>( - properties.getClientId(), - properties.getConsumerProperties().getTopic(), - kafkaConsumer, - () -> applicationContext.close()); - } - - @Bean(destroyMethod = "") - public KafkaConsumer kafkaConsumer(ApplicationProperties properties) - { - Properties props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); - props.put("client.id", properties.getClientId()); - props.put("group.id", properties.getConsumerProperties().getGroupId()); - if (properties.getConsumerProperties().getAutoOffsetReset() != null) - { - props.put("auto.offset.reset", properties.getConsumerProperties().getAutoOffsetReset().name()); - } - if (properties.getConsumerProperties().getAutoCommitInterval() != null) - { - props.put("auto.commit.interval", properties.getConsumerProperties().getAutoCommitInterval()); - } - props.put("metadata.maxage.ms", 5000); // 5 Sekunden - props.put("partition.assignment.strategy", StickyAssignor.class.getName()); - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", JsonDeserializer.class.getName()); - props.put("spring.json.type.mapping", "ADD:de.juplo.kafka.MessageAddNumber,CALC:de.juplo.kafka.MessageCalculateSum"); - - return new KafkaConsumer<>(props); - } -} diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java deleted file mode 100644 index c8193c9f..00000000 --- 
a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ /dev/null @@ -1,52 +0,0 @@ -package de.juplo.kafka; - -import jakarta.validation.constraints.NotEmpty; -import jakarta.validation.constraints.NotNull; -import lombok.Getter; -import lombok.Setter; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.validation.annotation.Validated; - -import java.time.Duration; - - -@ConfigurationProperties(prefix = "juplo") -@Validated -@Getter -@Setter -public class ApplicationProperties -{ - @NotNull - @NotEmpty - private String bootstrapServer; - @NotNull - @NotEmpty - private String clientId; - - @NotNull - private ConsumerProperties consumer; - - - public ConsumerProperties getConsumerProperties() - { - return consumer; - } - - - @Validated - @Getter - @Setter - static class ConsumerProperties - { - @NotNull - @NotEmpty - private String groupId; - @NotNull - @NotEmpty - private String topic; - private OffsetReset autoOffsetReset; - private Duration autoCommitInterval; - - enum OffsetReset { latest, earliest, none } - } -} diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index d647aa97..639d82fd 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -1,121 +1,31 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.common.errors.WakeupException; - -import java.time.Duration; -import java.util.Arrays; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.kafka.annotation.KafkaHandler; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; @Slf4j -public class ExampleConsumer implements Runnable 
+@Component +@KafkaListener(topics = "${juplo.consumer.topic}") +public class ExampleConsumer { - private final String id; - private final String topic; - private final Consumer consumer; - private final Thread workerThread; - private final Runnable closeCallback; - - private volatile boolean running = false; + @Value("${spring.kafka.client-id}") + private String id; private long consumed = 0; - public ExampleConsumer( - String clientId, - String topic, - Consumer consumer, - Runnable closeCallback) - { - this.id = clientId; - this.topic = topic; - this.consumer = consumer; - - workerThread = new Thread(this, "ExampleConsumer Worker-Thread"); - workerThread.start(); - - this.closeCallback = closeCallback; - } - - - @Override - public void run() - { - try - { - log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic)); - running = true; - - while (running) - { - ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); - - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) - { - handleRecord( - record.topic(), - record.partition(), - record.offset(), - record.key(), - record.value()); - } - } - } - catch(WakeupException e) - { - log.info("{} - Consumer was signaled to finish its work", id); - } - catch(Exception e) - { - log.error("{} - Unexpected error, unsubscribing!", id, e); - consumer.unsubscribe(); - log.info("{} - Triggering exit of application!", id); - new Thread(closeCallback).start(); - } - finally - { - log.info("{} - Closing the KafkaConsumer", id); - consumer.close(); - log.info("{}: Consumed {} messages in total, exiting!", id, consumed); - } - } - - private void handleRecord( - String topic, - Integer partition, - Long offset, - K key, - V value) - { - consumed++; - log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); - switch (value.getType()) - { - case ADD -> addNumber((MessageAddNumber)value); - case CALC -> 
calcSum((MessageCalculateSum)value); - default -> log.error("{} - Ignoring message of unknown typ {}", id, value.getType()); - } - } - + @KafkaHandler private void addNumber(MessageAddNumber addNumber) { log.info("{} - Adding number {}", id, addNumber.getNext()); } + @KafkaHandler private void calcSum(MessageCalculateSum calculateSum) { log.info("{} - Calculating sum", id); } - - public void shutdown() throws InterruptedException - { - log.info("{} joining the worker-thread...", id); - running = false; - consumer.wakeup(); - workerThread.join(); - } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 7a06731c..b3e33585 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,11 +1,6 @@ juplo: - bootstrap-server: :9092 - client-id: DEV consumer: - group-id: my-group topic: test - auto-offset-reset: earliest - auto-commit-interval: 5s management: endpoint: shutdown: @@ -21,14 +16,20 @@ management: enabled: true info: kafka: - bootstrap-server: ${juplo.bootstrap-server} - client-id: ${juplo.client-id} + bootstrap-server: ${spring.kafka.bootstrap-servers} + client-id: ${spring.kafka.client-id} + group-id: ${spring.kafka.consumer.group-id} + topic: ${juplo.consumer.topic} + auto-offset-reset: ${spring.kafka.consumer.auto-offset-reset} +spring: + kafka: + bootstrap-servers: :9092 + client-id: DEV consumer: - group-id: ${juplo.consumer.group-id} - topic: ${juplo.consumer.topic} - auto-offset-reset: ${juplo.consumer.auto-offset-reset} - auto-commit-interval: ${juplo.consumer.auto-commit-interval} -logging: + group-id: my-group + value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + properties: + "[spring.json.type.mapping]": ADD:de.juplo.kafka.MessageAddNumber,CALC:de.juplo.kafka.MessageCalculateSum level: root: INFO de.juplo: DEBUG -- 2.20.1