From: Kai Moritz Date: Mon, 7 Nov 2022 20:28:28 +0000 (+0100) Subject: Spring-Boot Version des Simple-Consumer X-Git-Tag: spring-consumer--topicpartition-DEPRECATED~7 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=5b9f4cd21a87b03cb1c432e9965fb0082ab05dd3;p=demos%2Fkafka%2Ftraining Spring-Boot Version des Simple-Consumer --- diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..1ad9963 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,2 @@ +* +!target/*.jar diff --git a/.maven-dockerexclude b/.maven-dockerexclude new file mode 100644 index 0000000..72e8ffc --- /dev/null +++ b/.maven-dockerexclude @@ -0,0 +1 @@ +* diff --git a/.maven-dockerinclude b/.maven-dockerinclude new file mode 100644 index 0000000..fd6cecd --- /dev/null +++ b/.maven-dockerinclude @@ -0,0 +1 @@ +target/*.jar diff --git a/Dockerfile b/Dockerfile index 73b568e..16ee25e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,5 @@ FROM openjdk:11-jre VOLUME /tmp COPY target/*.jar /opt/app.jar -COPY target/libs /opt/libs ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ] -CMD [ "DCKR" ] +CMD [] diff --git a/README.sh b/README.sh index a85ad39..53d1aa7 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/simple-consumer:1.0-SNAPSHOT +IMAGE=juplo/spring-consumer:1.0-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/pom.xml b/pom.xml index 2217d9b..c73251c 100644 --- a/pom.xml +++ b/pom.xml @@ -12,12 +12,33 @@ de.juplo.kafka - simple-consumer + spring-consumer 1.0-SNAPSHOT - Simple Consumer-Group - Super Simple Consumer-Group, that is implemented as a plain Java-program + Spring Consumer + Super Simple Consumer-Group, that is implemented as a Spring-Boot application + + + 11 + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-validation + + + org.springframework.boot + spring-boot-starter-actuator + + + org.springframework.boot + spring-boot-configuration-processor + true + org.apache.kafka kafka-clients @@ -27,45 +48,33 @@ lombok - ch.qos.logback - logback-classic + org.springframework.boot + spring-boot-starter-test + test + + + org.springframework.kafka + spring-kafka-test + test - pl.project13.maven - git-commit-id-plugin - - - org.apache.maven.plugins - maven-dependency-plugin + org.springframework.boot + spring-boot-maven-plugin - copy-dependencies - package - copy-dependencies + build-info - - ${project.build.directory}/libs - - org.apache.maven.plugins - maven-jar-plugin - - - - true - libs/ - de.juplo.kafka.SimpleConsumer - - - + pl.project13.maven + git-commit-id-plugin io.fabric8 @@ -88,8 +97,10 @@ + + maven-failsafe-plugin + - diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java new file mode 100644 index 0000000..3828b1d --- /dev/null +++ b/src/main/java/de/juplo/kafka/Application.java @@ -0,0 +1,52 @@ +package de.juplo.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import javax.annotation.PreDestroy; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + + +@SpringBootApplication +@Slf4j +public class Application implements ApplicationRunner +{ + @Autowired + ThreadPoolTaskExecutor taskExecutor; + @Autowired + Consumer kafkaConsumer; + @Autowired + SimpleConsumer simpleConsumer; + + Future consumerJob; + + @Override + public void run(ApplicationArguments args) throws Exception + { + log.info("Starting SimpleConsumer"); + consumerJob = taskExecutor.submit(simpleConsumer); + } + + @PreDestroy + public void shutdown() throws ExecutionException, InterruptedException + { + log.info("Signaling SimpleConsumer to quit its work"); + kafkaConsumer.wakeup(); + log.info("Waiting for SimpleConsumer to finish its work"); + consumerJob.get(); + log.info("SimpleConsumer finished its work"); + } + + + public static void main(String[] args) + { + SpringApplication.run(Application.class, args); + } +} diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java new file mode 100644 index 0000000..46bb667 --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -0,0 +1,43 @@ +package de.juplo.kafka; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.Properties; + + +@Configuration +@EnableConfigurationProperties({ ApplicationProperties.class }) +public class ApplicationConfiguration +{ + @Bean + public SimpleConsumer simpleConsumer( + Consumer kafkaConsumer, + ApplicationProperties applicationProperties) + { + return + new SimpleConsumer( + applicationProperties.getClientId(), + applicationProperties.getTopic(), + kafkaConsumer); + } + + @Bean + public Consumer kafkaConsumer(ApplicationProperties properties) + { + Properties props = new Properties(); + props.put("bootstrap.servers", properties.getBootstrapServers()); + props.put("group.id", properties.getGroupId()); // ID für die Offset-Commits + props.put("client.id", properties.getClientId()); // Nur zur Wiedererkennung + props.put("auto.offset.reset", properties.getAutoOffsetReset()); + props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"); + props.put("key.deserializer", StringDeserializer.class.getName()); + props.put("value.deserializer", StringDeserializer.class.getName()); + + return new KafkaConsumer<>(props); + } +} diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java new file mode 100644 index 0000000..5675db7 --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -0,0 +1,33 @@ +package de.juplo.kafka; + +import lombok.Getter; +import lombok.Setter; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.validation.annotation.Validated; + +import javax.validation.constraints.NotEmpty; +import javax.validation.constraints.NotNull; + + +@ConfigurationProperties(prefix = "simple.consumer") +@Validated +@Getter +@Setter +public class ApplicationProperties +{ + @NotNull + @NotEmpty + private String bootstrapServers; + @NotNull + @NotEmpty + private String groupId; + @NotNull + @NotEmpty + private String clientId; + @NotNull + @NotEmpty + private String topic; + @NotNull + @NotEmpty + private String autoOffsetReset; +} diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java index 9741e67..0e37686 100644 --- a/src/main/java/de/juplo/kafka/SimpleConsumer.java +++ b/src/main/java/de/juplo/kafka/SimpleConsumer.java @@ -1,52 +1,34 @@ package de.juplo.kafka; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Arrays; -import java.util.Properties; @Slf4j -public class SimpleConsumer +@RequiredArgsConstructor +public class SimpleConsumer implements Runnable { private final String id; private final String topic; - private final KafkaConsumer consumer; + private final Consumer consumer; - private volatile boolean running = false; private long consumed = 0; - public SimpleConsumer(String broker, String topic, String groupId, String clientId) - { - Properties props = new Properties(); - props.put("bootstrap.servers", broker); - props.put("group.id", groupId); // ID für die Offset-Commits - props.put("client.id", clientId); // Nur zur Wiedererkennung - props.put("auto.offset.reset", "earliest"); // Von Beginn an lesen - props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"); - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); - - consumer = new KafkaConsumer<>(props); - - this.topic = topic; - this.id = clientId; - } - + @Override public void run() { try { log.info("{} - Subscribing to topic {}", id, topic); consumer.subscribe(Arrays.asList(topic)); - running = true; while (true) { @@ -80,58 +62,9 @@ public class SimpleConsumer } finally { - running = false; log.info("{} - Closing the KafkaConsumer", id); consumer.close(); log.info("{}: Consumed {} messages in total, exiting!", id, consumed); } } - - - public static void main(String[] args) throws Exception - { - String broker = ":9092"; - String topic = "test"; - String groupId = "my-group"; - String clientId = "DEV"; - - switch (args.length) - { - case 4: - clientId = args[3]; - case 3: - groupId = args[2]; - case 2: - topic = args[1]; - case 1: - broker = args[0]; - } - - - SimpleConsumer instance = new SimpleConsumer(broker, topic, groupId, clientId); - - Runtime.getRuntime().addShutdownHook(new Thread(() -> - { - instance.consumer.wakeup(); - - while (instance.running) - { - log.info("Waiting for main-thread..."); - try - { - Thread.sleep(1000); - } - catch (InterruptedException e) {} - } - log.info("Shutdown completed."); - })); - - log.info( - "Running SimpleConsumer: broker={}, topic={}, group-id={}, client-id={}", - broker, - topic, - groupId, - clientId); - instance.run(); - } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..7089338 --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,33 @@ +simple: + consumer: + bootstrap-servers: ":9092" + group-id: "my-group" + client-id: "DEV" + topic: test + auto-offset-reset: earliest +management: + endpoint: + shutdown: + enabled: true + endpoints: + web: + exposure: + include: "*" + info: + env: + enabled: true + java: + enabled: true +info: + kafka: + bootstrap-server: ${simple.consumer.bootstrap-servers} + client-id: ${simple.consumer.client-id} + group-id: ${simple.consumer.group-id} + topic: ${simple.consumer.topic} + auto-offset-reset: ${simple.consumer.auto-offset-reset} +logging: + level: + root: INFO + de.juplo: DEBUG +server: + port: 8881