From f36f6a033156d7bfcd94443817c5aca0bf80155d Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 18 Jan 2025 10:21:10 +0100 Subject: [PATCH] Umbau des `spring-consumer` in den `supersimple-consumer` --- README.sh | 2 +- build.gradle | 5 +- docker/docker-compose.yml | 21 ++-- pom.xml | 17 +-- settings.gradle | 2 +- src/main/java/de/juplo/kafka/Application.java | 14 --- .../juplo/kafka/ApplicationConfiguration.java | 55 --------- .../de/juplo/kafka/ApplicationProperties.java | 52 --------- .../java/de/juplo/kafka/ExampleConsumer.java | 106 ++---------------- src/main/resources/application.yml | 20 +--- .../java/de/juplo/kafka/ApplicationTests.java | 6 +- 11 files changed, 37 insertions(+), 263 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/Application.java delete mode 100644 src/main/java/de/juplo/kafka/ApplicationConfiguration.java delete mode 100644 src/main/java/de/juplo/kafka/ApplicationProperties.java diff --git a/README.sh b/README.sh index b46e2350..9cb5416e 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-consumer:1.1-SNAPSHOT +IMAGE=juplo/supersimple-consumer:1.0-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/build.gradle b/build.gradle index a8614fdf..1a3f05f9 100644 --- a/build.gradle +++ b/build.gradle @@ -8,7 +8,7 @@ plugins { } group = 'de.juplo.kafka' -version = '1.1-SNAPSHOT' +version = '1.0-SNAPSHOT' java { toolchain { @@ -27,7 +27,7 @@ repositories { } dependencies { - implementation 'org.apache.kafka:kafka-clients' + implementation 'org.springframework.kafka:spring-kafka' implementation 'org.springframework.boot:spring-boot-starter-actuator' implementation 'org.springframework.boot:spring-boot-starter-validation' implementation 'org.springframework.boot:spring-boot-starter-web' @@ -36,7 +36,6 @@ dependencies { annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor' annotationProcessor 'org.projectlombok:lombok' testImplementation 'org.springframework.boot:spring-boot-starter-test' - testImplementation 'org.springframework.kafka:spring-kafka' testImplementation 'org.springframework.kafka:spring-kafka-test' testCompileOnly 'org.projectlombok:lombok' testAnnotationProcessor 'org.projectlombok:lombok' diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 4fa2eade..159db294 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -145,25 +145,22 @@ services: juplo.producer.throttle-ms: 100 consumer: - image: juplo/spring-consumer:1.1-SNAPSHOT + image: juplo/supersimple-consumer:1.0-SNAPSHOT environment: - juplo.bootstrap-server: kafka:9092 - juplo.client-id: consumer - juplo.consumer.topic: test + spring.kafka.bootstrap-servers: kafka:9092 + spring.kafka.client-id: consumer peter: - image: juplo/spring-consumer:1.1-SNAPSHOT + image: juplo/supersimple-peter:1.0-SNAPSHOT environment: - juplo.bootstrap-server: kafka:9092 - juplo.client-id: peter - juplo.consumer.topic: test + spring.kafka.bootstrap-servers: kafka:9092 + spring.kafka.client-id: peter ute: - image: juplo/spring-consumer:1.1-SNAPSHOT + image: juplo/supersimple-ute:1.0-SNAPSHOT environment: - juplo.bootstrap-server: kafka:9092 - juplo.client-id: ute - juplo.consumer.topic: test + spring.kafka.bootstrap-servers: kafka:9092 + spring.kafka.client-id: ute volumes: zookeeper-data: diff --git a/pom.xml b/pom.xml index dd96d00f..b5e66417 100644 --- a/pom.xml +++ b/pom.xml @@ -12,10 +12,10 @@ de.juplo.kafka - spring-consumer - Spring Consumer - Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka - 1.1-SNAPSHOT + supersimple-consumer + Supersimple Consumer-Group + Most minimal Consumer-Group ever! + 1.0-SNAPSHOT 21 @@ -40,8 +40,8 @@ spring-boot-starter-validation - org.apache.kafka - kafka-clients + org.springframework.kafka + spring-kafka org.projectlombok @@ -53,11 +53,6 @@ spring-boot-starter-test test - - org.springframework.kafka - spring-kafka - test - org.springframework.kafka spring-kafka-test diff --git a/settings.gradle b/settings.gradle index cacb2d5a..8995580a 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1 +1 @@ -rootProject.name = 'spring-consumer' +rootProject.name = 'supersimple-consumer' diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java deleted file mode 100644 index 0069257f..00000000 --- a/src/main/java/de/juplo/kafka/Application.java +++ /dev/null @@ -1,14 +0,0 @@ -package de.juplo.kafka; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; - - -@SpringBootApplication -public class Application -{ - public static void main(String[] args) - { - SpringApplication.run(Application.class, args); - } -} diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java deleted file mode 100644 index b98c401d..00000000 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ /dev/null @@ -1,55 +0,0 @@ -package de.juplo.kafka; - -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.StickyAssignor; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -import java.util.Properties; - - -@Configuration -@EnableConfigurationProperties(ApplicationProperties.class) -public class ApplicationConfiguration -{ - @Bean - public ExampleConsumer exampleConsumer( - Consumer kafkaConsumer, - ApplicationProperties properties, - ConfigurableApplicationContext applicationContext) - { - return - new ExampleConsumer( - properties.getClientId(), - properties.getConsumerProperties().getTopic(), - kafkaConsumer, - () -> applicationContext.close()); - } - - @Bean(destroyMethod = "") - public KafkaConsumer kafkaConsumer(ApplicationProperties properties) - { - Properties props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); - props.put("client.id", properties.getClientId()); - props.put("group.id", properties.getConsumerProperties().getGroupId()); - if (properties.getConsumerProperties().getAutoOffsetReset() != null) - { - props.put("auto.offset.reset", properties.getConsumerProperties().getAutoOffsetReset().name()); - } - if (properties.getConsumerProperties().getAutoCommitInterval() != null) - { - props.put("auto.commit.interval", properties.getConsumerProperties().getAutoCommitInterval()); - } - props.put("metadata.maxage.ms", 5000); // 5 Sekunden - props.put("partition.assignment.strategy", StickyAssignor.class.getName()); - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); - - return new KafkaConsumer<>(props); - } -} diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java deleted file mode 100644 index c8193c9f..00000000 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ /dev/null @@ -1,52 +0,0 @@ -package de.juplo.kafka; - -import jakarta.validation.constraints.NotEmpty; -import jakarta.validation.constraints.NotNull; -import lombok.Getter; -import lombok.Setter; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.validation.annotation.Validated; - -import java.time.Duration; - - -@ConfigurationProperties(prefix = "juplo") -@Validated -@Getter -@Setter -public class ApplicationProperties -{ - @NotNull - @NotEmpty - private String bootstrapServer; - @NotNull - @NotEmpty - private String clientId; - - @NotNull - private ConsumerProperties consumer; - - - public ConsumerProperties getConsumerProperties() - { - return consumer; - } - - - @Validated - @Getter - @Setter - static class ConsumerProperties - { - @NotNull - @NotEmpty - private String groupId; - @NotNull - @NotEmpty - private String topic; - private OffsetReset autoOffsetReset; - private Duration autoCommitInterval; - - enum OffsetReset { latest, earliest, none } - } -} diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 4f7151fb..d04aa61f 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -1,106 +1,24 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.common.errors.WakeupException; - -import java.time.Duration; -import java.util.Arrays; - +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.kafka.annotation.KafkaListener; +// tag::supersimple[] +@SpringBootApplication @Slf4j -public class ExampleConsumer implements Runnable +public class ExampleConsumer { - private final String id; - private final String topic; - private final Consumer consumer; - private final Thread workerThread; - private final Runnable closeCallback; - - private volatile boolean running = false; - private long consumed = 0; - - - public ExampleConsumer( - String clientId, - String topic, - Consumer consumer, - Runnable closeCallback) + @KafkaListener(id = "supersimple", topics = "test") + public void recieve(String message) { - this.id = clientId; - this.topic = topic; - this.consumer = consumer; - - workerThread = new Thread(this, "ExampleConsumer Worker-Thread"); - workerThread.start(); - - this.closeCallback = closeCallback; + log.info("Recieved message: {}", message); } - - @Override - public void run() - { - try - { - log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic)); - running = true; - - while (running) - { - ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); - - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) - { - handleRecord( - record.topic(), - record.partition(), - record.offset(), - record.key(), - record.value()); - } - } - } - catch(WakeupException e) - { - log.info("{} - Consumer was signaled to finish its work", id); - } - catch(Exception e) - { - log.error("{} - Unexpected error, unsubscribing!", id, e); - consumer.unsubscribe(); - log.info("{} - Triggering exit of application!", id); - new Thread(closeCallback).start(); - } - finally - { - log.info("{} - Closing the KafkaConsumer", id); - consumer.close(); - log.info("{}: Consumed {} messages in total, exiting!", id, consumed); - } - } - - private void handleRecord( - String topic, - Integer partition, - Long offset, - String key, - String value) - { - consumed++; - log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); - } - - - public void shutdown() throws InterruptedException + public static void main(String[] args) { - log.info("{} joining the worker-thread...", id); - running = false; - consumer.wakeup(); - workerThread.join(); + SpringApplication.run(ExampleConsumer.class, args); } } +// end::supersimple[] diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 7a06731c..7bf98f71 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,11 +1,6 @@ -juplo: - bootstrap-server: :9092 - client-id: DEV - consumer: - group-id: my-group - topic: test - auto-offset-reset: earliest - auto-commit-interval: 5s +spring: + kafka: + bootstrap-servers: :9092 management: endpoint: shutdown: @@ -19,15 +14,6 @@ management: enabled: true java: enabled: true -info: - kafka: - bootstrap-server: ${juplo.bootstrap-server} - client-id: ${juplo.client-id} - consumer: - group-id: ${juplo.consumer.group-id} - topic: ${juplo.consumer.topic} - auto-offset-reset: ${juplo.consumer.auto-offset-reset} - auto-commit-interval: ${juplo.consumer.auto-commit-interval} logging: level: root: INFO diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index ae119bff..50f28c91 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -19,13 +19,13 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. @SpringBootTest( properties = { - "juplo.bootstrap-server=${spring.embedded.kafka.brokers}", - "juplo.consumer.topic=" + TOPIC }) + "spring.kafka.consumer.auto-offset-reset=earliest", + }) @AutoConfigureMockMvc @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) public class ApplicationTests { - static final String TOPIC = "FOO"; + static final String TOPIC = "test"; static final int PARTITIONS = 10; @Autowired -- 2.20.1