From: Kai Moritz Date: Sun, 27 Oct 2024 21:08:53 +0000 (+0100) Subject: `ExampleConsumer` in eine Spring-Boot App umgebaut (ohne Spring Kafka) X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=d308b2573f43fb41262758cd440bcf27082c93f6;p=demos%2Fkafka%2Ftraining `ExampleConsumer` in eine Spring-Boot App umgebaut (ohne Spring Kafka) --- diff --git a/.dockerignore b/.dockerignore index 49f82a9..1ad9963 100644 --- a/.dockerignore +++ b/.dockerignore @@ -1,3 +1,2 @@ * !target/*.jar -!target/libs/*.jar diff --git a/.maven-dockerinclude b/.maven-dockerinclude index a00c65f..fd6cecd 100644 --- a/.maven-dockerinclude +++ b/.maven-dockerinclude @@ -1,2 +1 @@ target/*.jar -target/libs/*.jar diff --git a/Dockerfile b/Dockerfile index 22819af..9e196ff 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,5 @@ FROM eclipse-temurin:21-jre VOLUME /tmp COPY target/*.jar /opt/app.jar -COPY target/libs /opt/libs ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ] -CMD [ "kafka:9092", "test", "my-group", "DCKR" ] +CMD [] diff --git a/README.sh b/README.sh index 85b8f96..b46e235 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/simple-consumer:1.0-SNAPSHOT +IMAGE=juplo/spring-consumer:1.1-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index cba608b..2366bc2 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -194,8 +194,11 @@ services: command: kafka:9092 test producer consumer: - image: juplo/simple-consumer:1.0-SNAPSHOT - command: kafka:9092 test my-group consumer + image: juplo/spring-consumer:1.1-SNAPSHOT + environment: + juplo.consumer.bootstrap-server: kafka:9092 + juplo.consumer.client-id: consumer + juplo.consumer.topic: test volumes: zookeeper-data: diff --git a/pom.xml b/pom.xml index 2d81d24..98a0a36 100644 --- a/pom.xml +++ b/pom.xml @@ -12,16 +12,33 @@ de.juplo.kafka - simple-consumer - Simple Consumer-Group - Super Simple Consumer-Group, that is implemented as a plain Java-program - 1.0-SNAPSHOT + spring-consumer + Spring Consumer + Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka + 1.1-SNAPSHOT 21 + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-actuator + + + org.springframework.boot + spring-boot-configuration-processor + true + + + org.springframework.boot + spring-boot-starter-validation + org.apache.kafka kafka-clients @@ -31,42 +48,35 @@ lombok - ch.qos.logback - logback-classic + org.springframework.boot + spring-boot-starter-test + test + + + org.springframework.kafka + spring-kafka + test + + + org.springframework.kafka + spring-kafka-test + test - org.apache.maven.plugins - maven-dependency-plugin + org.springframework.boot + spring-boot-maven-plugin - copy-dependencies - package - copy-dependencies + build-info - - ${project.build.directory}/libs - - - org.apache.maven.plugins - maven-jar-plugin - - - - true - libs/ - de.juplo.kafka.ExampleConsumer - - - - pl.project13.maven git-commit-id-plugin diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java new file mode 100644 index 0000000..0069257 --- /dev/null +++ b/src/main/java/de/juplo/kafka/Application.java @@ -0,0 +1,14 @@ +package de.juplo.kafka; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + + +@SpringBootApplication +public class Application +{ + public static void main(String[] args) + { + SpringApplication.run(Application.class, args); + } +} diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java new file mode 100644 index 0000000..a3f3835 --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -0,0 +1,52 @@ +package de.juplo.kafka; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.StickyAssignor; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.Properties; + + +@Configuration +@EnableConfigurationProperties(ApplicationProperties.class) +public class ApplicationConfiguration +{ + @Bean + public ExampleConsumer exampleConsumer( + Consumer kafkaConsumer, + ApplicationProperties properties) + { + return + new ExampleConsumer( + properties.getClientId(), + properties.getTopic(), + kafkaConsumer); + } + + @Bean + public KafkaConsumer kafkaConsumer(ApplicationProperties properties) + { + Properties props = new Properties(); + props.put("bootstrap.servers", properties.getBootstrapServer()); + props.put("client.id", properties.getClientId()); + props.put("group.id", properties.getGroupId()); + if (properties.getAutoOffsetReset() != null) + { + props.put("auto.offset.reset", properties.getAutoOffsetReset().name()); + } + if (properties.autoCommitInterval != null) + { + props.put("auto.commit.interval", properties.getAutoCommitInterval()); + } + props.put("metadata.maxage.ms", 5000); // 5 Sekunden + props.put("partition.assignment.strategy", StickyAssignor.class.getName()); + props.put("key.deserializer", StringDeserializer.class.getName()); + props.put("value.deserializer", StringDeserializer.class.getName()); + + return new KafkaConsumer<>(props); + } +} diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java new file mode 100644 index 0000000..f7134fb --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -0,0 +1,35 @@ +package de.juplo.kafka; + +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import lombok.Getter; +import lombok.Setter; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.validation.annotation.Validated; + +import java.time.Duration; + + +@ConfigurationProperties(prefix = "juplo.consumer") +@Validated +@Getter +@Setter +public class ApplicationProperties +{ + @NotNull + @NotEmpty + private String bootstrapServer; + @NotNull + @NotEmpty + private String clientId; + @NotNull + @NotEmpty + private String groupId; + @NotNull + @NotEmpty + private String topic; + ApplicationProperties.OffsetReset autoOffsetReset; + Duration autoCommitInterval; + + enum OffsetReset { latest, earliest, none} +} diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 9bdab7c..772643f 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -1,50 +1,42 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Arrays; -import java.util.Properties; @Slf4j -public class ExampleConsumer +public class ExampleConsumer implements Runnable { private final String id; private final String topic; private final Consumer consumer; + private final Thread workerThread; private volatile boolean running = false; private long consumed = 0; + public ExampleConsumer( - String broker, + String clientId, String topic, - String groupId, - String clientId) + Consumer consumer) { - Properties props = new Properties(); - props.put("bootstrap.servers", broker); - props.put("group.id", groupId); // ID für die Offset-Commits - props.put("client.id", clientId); // Nur zur Wiedererkennung - props.put("auto.offset.reset", "earliest"); // Von Beginn an lesen - props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"); - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); - props.put("metadata.maxage.ms", 5000); - this.id = clientId; this.topic = topic; - consumer = new KafkaConsumer<>(props); + this.consumer = consumer; + + workerThread = new Thread(this, "ExampleConsumer Worker-Thread"); + workerThread.start(); } + @Override public void run() { try @@ -53,7 +45,7 @@ public class ExampleConsumer consumer.subscribe(Arrays.asList(topic)); running = true; - while (true) + while (running) { ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); @@ -81,9 +73,6 @@ public class ExampleConsumer } finally { - running = false; - log.info("{} - Closing the KafkaConsumer", id); - consumer.close(); log.info("{}: Consumed {} messages in total, exiting!", id, consumed); } } @@ -100,50 +89,11 @@ public class ExampleConsumer } - public static void main(String[] args) throws Exception + public void shutdown() throws InterruptedException { - String broker = ":9092"; - String topic = "test"; - String groupId = "my-group"; - String clientId = "DEV"; - - switch (args.length) - { - case 4: - clientId = args[3]; - case 3: - groupId = args[2]; - case 2: - topic = args[1]; - case 1: - broker = args[0]; - } - - - ExampleConsumer instance = new ExampleConsumer(broker, topic, groupId, clientId); - - Runtime.getRuntime().addShutdownHook(new Thread(() -> - { - instance.consumer.wakeup(); - - while (instance.running) - { - log.info("Waiting for main-thread..."); - try - { - Thread.sleep(1000); - } - catch (InterruptedException e) {} - } - log.info("Shutdown completed."); - })); - - log.info( - "Running ExampleConsumer: broker={}, topic={}, group-id={}, client-id={}", - broker, - topic, - groupId, - clientId); - instance.run(); + log.info("{} joining the worker-thread...", id); + running = false; + consumer.wakeup(); + workerThread.join(); } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..5a89ee5 --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,36 @@ +juplo: + consumer: + bootstrap-server: :9092 + client-id: DEV + group-id: my-group + topic: test + auto-offset-reset: earliest + auto-commit-interval: 5s +management: + endpoint: + shutdown: + enabled: true + endpoints: + web: + exposure: + include: "*" + info: + env: + enabled: true + java: + enabled: true +info: + kafka: + bootstrap-server: ${juplo.consumer.bootstrap-server} + client-id: ${juplo.consumer.client-id} + group-id: ${juplo.consumer.group-id} + topic: ${juplo.consumer.topic} + auto-offset-reset: ${juplo.consumer.auto-offset-reset} + auto-commit-interval: ${juplo.consumer.auto-commit-interval} +logging: + level: + root: INFO + de.juplo: DEBUG + org.springframework.kafka: INFO +server: + port: 8881 diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml index 7a25e76..9c7af76 100644 --- a/src/main/resources/logback.xml +++ b/src/main/resources/logback.xml @@ -7,8 +7,6 @@ - - diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java new file mode 100644 index 0000000..e4b97a4 --- /dev/null +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -0,0 +1,47 @@ +package de.juplo.kafka; + +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.web.servlet.MockMvc; + +import java.time.Duration; + +import static de.juplo.kafka.ApplicationTests.PARTITIONS; +import static de.juplo.kafka.ApplicationTests.TOPIC; +import static org.awaitility.Awaitility.await; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + + +@SpringBootTest( + properties = { + "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", + "spring.kafka.consumer.auto-offset-reset=earliest", + "juplo.consumer.topic=" + TOPIC }) +@AutoConfigureMockMvc +@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) +public class ApplicationTests +{ + static final String TOPIC = "FOO"; + static final int PARTITIONS = 10; + + @Autowired + MockMvc mockMvc; + + + + @Test + public void testApplicationStartup() + { + await("Application is healthy") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> mockMvc + .perform(get("/actuator/health")) + .andExpect(status().isOk()) + .andExpect(jsonPath("status").value("UP"))); + } +}