From 95fb3f249f3a4f4c67c99a85af1a1b567f9a2f3c Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 18 Jan 2025 10:21:10 +0100 Subject: [PATCH] Umbau des `spring-producer` in den `supersimple-producer` --- README.sh | 2 +- docker/docker-compose.yml | 20 +--- pom.xml | 17 +-- src/main/java/de/juplo/kafka/Application.java | 14 --- .../juplo/kafka/ApplicationConfiguration.java | 55 --------- .../de/juplo/kafka/ApplicationProperties.java | 52 --------- .../java/de/juplo/kafka/ExampleConsumer.java | 107 ++---------------- src/main/resources/application.yml | 20 +--- .../java/de/juplo/kafka/ApplicationTests.java | 6 +- 9 files changed, 27 insertions(+), 266 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/Application.java delete mode 100644 src/main/java/de/juplo/kafka/ApplicationConfiguration.java delete mode 100644 src/main/java/de/juplo/kafka/ApplicationProperties.java diff --git a/README.sh b/README.sh index 142e2cc3..8c599bfd 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-consumer:1.1-SNAPSHOT +IMAGE=juplo/supersimple-consumer:1.0-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 4fa2eade..20fe4f33 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -145,25 +145,9 @@ services: juplo.producer.throttle-ms: 100 consumer: - image: juplo/spring-consumer:1.1-SNAPSHOT + image: juplo/supersimple-consumer:1.0-SNAPSHOT environment: - juplo.bootstrap-server: kafka:9092 - juplo.client-id: consumer - juplo.consumer.topic: test - - peter: - image: juplo/spring-consumer:1.1-SNAPSHOT - environment: - juplo.bootstrap-server: kafka:9092 - juplo.client-id: peter - juplo.consumer.topic: test - - ute: - image: juplo/spring-consumer:1.1-SNAPSHOT - environment: - juplo.bootstrap-server: kafka:9092 - juplo.client-id: ute - juplo.consumer.topic: test + spring.kafka.bootstrap-servers: kafka:9092 volumes: zookeeper-data: diff --git a/pom.xml b/pom.xml index 7a03bfa1..a6d3d4c0 100644 --- a/pom.xml +++ b/pom.xml @@ -12,10 +12,10 @@ de.juplo.kafka - spring-consumer - Spring Consumer - Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka - 1.1-SNAPSHOT + supersimple-consumer + Supersimple Consumer-Group + Most minimal Consumer-Group ever! + 1.0-SNAPSHOT 21 @@ -40,8 +40,8 @@ spring-boot-starter-validation - org.apache.kafka - kafka-clients + org.springframework.kafka + spring-kafka org.projectlombok @@ -52,11 +52,6 @@ spring-boot-starter-test test - - org.springframework.kafka - spring-kafka - test - org.springframework.kafka spring-kafka-test diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java deleted file mode 100644 index 0069257f..00000000 --- a/src/main/java/de/juplo/kafka/Application.java +++ /dev/null @@ -1,14 +0,0 @@ -package de.juplo.kafka; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; - - -@SpringBootApplication -public class Application -{ - public static void main(String[] args) - { - SpringApplication.run(Application.class, args); - } -} diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java deleted file mode 100644 index a4856a64..00000000 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ /dev/null @@ -1,55 +0,0 @@ -package de.juplo.kafka; - -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.StickyAssignor; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -import java.util.Properties; - - -@Configuration -@EnableConfigurationProperties(ApplicationProperties.class) -public class ApplicationConfiguration -{ - @Bean - public ExampleConsumer exampleConsumer( - Consumer kafkaConsumer, - ApplicationProperties properties, - ConfigurableApplicationContext applicationContext) - { - return - new ExampleConsumer( - properties.getClientId(), - properties.getConsumerProperties().getTopic(), - kafkaConsumer, - () -> applicationContext.close()); - } - - @Bean(destroyMethod = "") - public KafkaConsumer kafkaConsumer(ApplicationProperties properties) - { - Properties props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); - props.put("client.id", properties.getClientId()); - props.put("group.id", properties.getConsumerProperties().getGroupId()); - if (properties.getConsumerProperties().getAutoOffsetReset() != null) - { - props.put("auto.offset.reset", properties.getConsumerProperties().getAutoOffsetReset().name()); - } - if (properties.getConsumerProperties().getAutoCommitInterval() != null) - { - props.put("auto.commit.interval", properties.getConsumerProperties().getAutoCommitInterval()); - } - props.put("metadata.maxage.ms", 5000); // 5 Sekunden - props.put("partition.assignment.strategy", StickyAssignor.class.getName()); - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); - - return new KafkaConsumer<>(props); - } -} diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java deleted file mode 100644 index c8193c9f..00000000 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ /dev/null @@ -1,52 +0,0 @@ -package de.juplo.kafka; - -import jakarta.validation.constraints.NotEmpty; -import jakarta.validation.constraints.NotNull; -import lombok.Getter; -import lombok.Setter; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.validation.annotation.Validated; - -import java.time.Duration; - - -@ConfigurationProperties(prefix = "juplo") -@Validated -@Getter -@Setter -public class ApplicationProperties -{ - @NotNull - @NotEmpty - private String bootstrapServer; - @NotNull - @NotEmpty - private String clientId; - - @NotNull - private ConsumerProperties consumer; - - - public ConsumerProperties getConsumerProperties() - { - return consumer; - } - - - @Validated - @Getter - @Setter - static class ConsumerProperties - { - @NotNull - @NotEmpty - private String groupId; - @NotNull - @NotEmpty - private String topic; - private OffsetReset autoOffsetReset; - private Duration autoCommitInterval; - - enum OffsetReset { latest, earliest, none } - } -} diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 79382ef9..d04aa61f 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -1,107 +1,24 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.common.errors.WakeupException; - -import java.time.Duration; -import java.util.Arrays; - +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.kafka.annotation.KafkaListener; +// tag::supersimple[] +@SpringBootApplication @Slf4j -public class ExampleConsumer implements Runnable +public class ExampleConsumer { - private final String id; - private final String topic; - private final Consumer consumer; - private final Thread workerThread; - private final Runnable closeCallback; - - private volatile boolean running = false; - private long consumed = 0; - - - public ExampleConsumer( - String clientId, - String topic, - Consumer consumer, - Runnable closeCallback) + @KafkaListener(id = "supersimple", topics = "test") + public void recieve(String message) { - this.id = clientId; - this.topic = topic; - this.consumer = consumer; - - workerThread = new Thread(this, "ExampleConsumer Worker-Thread"); - workerThread.start(); - - this.closeCallback = closeCallback; + log.info("Recieved message: {}", message); } - - @Override - public void run() - { - try - { - log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic)); - running = true; - - while (running) - { - ConsumerRecords records = - consumer.poll(Duration.ofSeconds(1)); - - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) - { - handleRecord( - record.topic(), - record.partition(), - record.offset(), - record.key(), - record.value()); - } - } - } - catch(WakeupException e) - { - log.info("{} - Consumer was signaled to finish its work", id); - } - catch(Exception e) - { - log.error("{} - Unexpected error, unsubscribing!", id, e); - consumer.unsubscribe(); - log.info("{} - Triggering exit of application!", id); - new Thread(closeCallback).start(); - } - finally - { - log.info("{} - Closing the KafkaConsumer", id); - consumer.close(); - log.info("{}: Consumed {} messages in total, exiting!", id, consumed); - } - } - - private void handleRecord( - String topic, - Integer partition, - Long offset, - String key, - String value) - { - consumed++; - log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); - } - - - public void shutdown() throws InterruptedException + public static void main(String[] args) { - log.info("{} joining the worker-thread...", id); - running = false; - consumer.wakeup(); - workerThread.join(); + SpringApplication.run(ExampleConsumer.class, args); } } +// end::supersimple[] diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 7a06731c..7bf98f71 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,11 +1,6 @@ -juplo: - bootstrap-server: :9092 - client-id: DEV - consumer: - group-id: my-group - topic: test - auto-offset-reset: earliest - auto-commit-interval: 5s +spring: + kafka: + bootstrap-servers: :9092 management: endpoint: shutdown: @@ -19,15 +14,6 @@ management: enabled: true java: enabled: true -info: - kafka: - bootstrap-server: ${juplo.bootstrap-server} - client-id: ${juplo.client-id} - consumer: - group-id: ${juplo.consumer.group-id} - topic: ${juplo.consumer.topic} - auto-offset-reset: ${juplo.consumer.auto-offset-reset} - auto-commit-interval: ${juplo.consumer.auto-commit-interval} logging: level: root: INFO diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 092660b4..6fada8e9 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -19,13 +19,13 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. @SpringBootTest( properties = { - "juplo.bootstrap-server=${spring.embedded.kafka.brokers}", - "juplo.consumer.topic=" + TOPIC }) + "spring.kafka.consumer.auto-offset-reset=earliest", + }) @AutoConfigureMockMvc @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) public class ApplicationTests { - static final String TOPIC = "FOO"; + static final String TOPIC = "test"; static final int PARTITIONS = 10; @Autowired -- 2.20.1