From 1e023b432920692f99a4e5b9d50369b8651956c5 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 6 May 2025 20:40:27 +0200 Subject: [PATCH] Separater ConsumerRunner --- pom.xml | 36 ++++--------- src/main/java/de/juplo/kafka/Application.java | 14 +++++ .../juplo/kafka/ApplicationConfiguration.java | 21 ++++++++ .../de/juplo/kafka/ApplicationProperties.java | 17 ++++++ .../java/de/juplo/kafka/ConsumerRunner.java | 40 ++++++++++++++ .../java/de/juplo/kafka/ExampleConsumer.java | 53 ++----------------- src/main/resources/application.yml | 7 +++ 7 files changed, 115 insertions(+), 73 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/Application.java create mode 100644 src/main/java/de/juplo/kafka/ApplicationConfiguration.java create mode 100644 src/main/java/de/juplo/kafka/ApplicationProperties.java create mode 100644 src/main/java/de/juplo/kafka/ConsumerRunner.java create mode 100644 src/main/resources/application.yml diff --git a/pom.xml b/pom.xml index a100717..9c40eb5 100644 --- a/pom.xml +++ b/pom.xml @@ -22,6 +22,14 @@ + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-actuator + org.apache.kafka kafka-clients @@ -31,43 +39,21 @@ lombok compile - - ch.qos.logback - logback-classic - - org.apache.maven.plugins - maven-dependency-plugin + org.springframework.boot + spring-boot-maven-plugin - copy-dependencies - package - copy-dependencies + build-info - - ${project.build.directory}/libs - - - org.apache.maven.plugins - maven-jar-plugin - - - - true - libs/ - de.juplo.kafka.ExampleConsumer - - - - pl.project13.maven git-commit-id-plugin diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java new file mode 100644 index 0000000..cc1d580 --- /dev/null +++ b/src/main/java/de/juplo/kafka/Application.java @@ -0,0 +1,14 @@ +package de.juplo.kafka; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + + +@SpringBootApplication +public class Application +{ + public static void main(String args[]) + { + SpringApplication.run(Application.class, args); + } +} diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java new file mode 100644 index 0000000..76659d4 --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -0,0 +1,21 @@ +package de.juplo.kafka; + +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + + +@Configuration +@EnableConfigurationProperties(ApplicationProperties.class) +public class ApplicationConfiguration +{ + @Bean + public ExampleConsumer exampleConsumer(ApplicationProperties properties) + { + return new ExampleConsumer( + properties.getBroker(), + properties.getTopic(), + properties.getGroupId(), + properties.getClientId()); + } +} diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java new file mode 100644 index 0000000..5743b77 --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -0,0 +1,17 @@ +package de.juplo.kafka; + +import lombok.Getter; +import lombok.Setter; +import org.springframework.boot.context.properties.ConfigurationProperties; + + +@ConfigurationProperties(prefix = "juplo.consumer") +@Getter +@Setter +public class ApplicationProperties +{ + private String broker; + private String topic; + private String groupId; + private String clientId; +} diff --git a/src/main/java/de/juplo/kafka/ConsumerRunner.java b/src/main/java/de/juplo/kafka/ConsumerRunner.java new file mode 100644 index 0000000..b9a077a --- /dev/null +++ b/src/main/java/de/juplo/kafka/ConsumerRunner.java @@ -0,0 +1,40 @@ +package de.juplo.kafka; + +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.time.Duration; + + +@Component +@Slf4j +public class ConsumerRunner +{ + private final ExampleConsumer exampleConsumer; + private final Thread worker; + + public ConsumerRunner(ExampleConsumer exampleConsumer) + { + this.exampleConsumer = exampleConsumer; + this.worker = new Thread(exampleConsumer, "ConsumerRunner-" + exampleConsumer); + log.info("Starting consumer: {}", exampleConsumer); + this.worker.start(); + } + + @PreDestroy + public void close() + { + log.info("Stopping: {}", exampleConsumer); + exampleConsumer.consumer.wakeup(); + try + { + worker.join(Duration.ofSeconds(30)); + } + catch (InterruptedException e) + { + log.error("Fehler: {} - {}", exampleConsumer, e.toString()); + } + log.info("Done! {}", exampleConsumer); + } +} diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 7be827e..beaec07 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -1,5 +1,6 @@ package de.juplo.kafka; +import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -14,13 +15,13 @@ import java.util.Properties; @Slf4j -public class ExampleConsumer +@ToString(of = "id") +public class ExampleConsumer implements Runnable { private final String id; private final String topic; - private final Consumer consumer; + final Consumer consumer; - private volatile boolean running = false; private long consumed = 0; public ExampleConsumer( @@ -42,13 +43,13 @@ public class ExampleConsumer } + @Override public void run() { try { log.info("{} - Subscribing to topic {}", id, topic); consumer.subscribe(Arrays.asList(topic)); - running = true; while (true) { @@ -80,7 +81,6 @@ public class ExampleConsumer log.info("{} - Closing the KafkaConsumer", id); consumer.close(); log.info("{}: Consumed {} messages in total, exiting!", id, consumed); - running = false; } } @@ -94,48 +94,5 @@ public class ExampleConsumer consumed++; log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); } - - - public static void main(String[] args) throws Exception - { - if (args.length != 4) - { - log.error("Four arguments required!"); - log.error("args[0]: Broker-Address"); - log.error("args[1]: Topic"); - log.error("args[2]: Group-ID"); - log.error("args[3]: Unique Client-ID"); - System.exit(1); - return; - } - - - log.info( - "Running ExampleConsumer: broker={}, topic={}, group-id={}, client-id={}", - args[0], - args[1], - args[2], - args[3]); - - ExampleConsumer instance = new ExampleConsumer(args[0], args[1], args[2], args[3]); - - Runtime.getRuntime().addShutdownHook(new Thread(() -> - { - instance.consumer.wakeup(); - - while (instance.running) - { - log.info("Waiting for main-thread..."); - try - { - Thread.sleep(1000); - } - catch (InterruptedException e) {} - } - log.info("Shutdown completed."); - })); - - instance.run(); - } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..3ce636e --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,7 @@ +juplo: + consumer: + broker: localhost:9092 + topic: test + group-id: my-group + client-id: IntelliJ + -- 2.20.1