From 2021bf4059da168382e3d4b8a8db05989011477c Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 15 Mar 2025 18:34:15 +0100 Subject: [PATCH] 1. Schritt Live-Umbau: Spring Boot --- pom.xml | 36 +++------ src/main/java/de/juplo/kafka/Application.java | 14 ++++ .../juplo/kafka/ApplicationConfiguration.java | 41 ++++++++++ .../de/juplo/kafka/ApplicationProperties.java | 17 +++++ .../java/de/juplo/kafka/ExampleConsumer.java | 74 +++---------------- src/main/resources/application.yml | 5 ++ 6 files changed, 99 insertions(+), 88 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/Application.java create mode 100644 src/main/java/de/juplo/kafka/ApplicationConfiguration.java create mode 100644 src/main/java/de/juplo/kafka/ApplicationProperties.java create mode 100644 src/main/resources/application.yml diff --git a/pom.xml b/pom.xml index a1007175..9c40eb54 100644 --- a/pom.xml +++ b/pom.xml @@ -22,6 +22,14 @@ + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-actuator + org.apache.kafka kafka-clients @@ -31,43 +39,21 @@ lombok compile - - ch.qos.logback - logback-classic - - org.apache.maven.plugins - maven-dependency-plugin + org.springframework.boot + spring-boot-maven-plugin - copy-dependencies - package - copy-dependencies + build-info - - ${project.build.directory}/libs - - - org.apache.maven.plugins - maven-jar-plugin - - - - true - libs/ - de.juplo.kafka.ExampleConsumer - - - - pl.project13.maven git-commit-id-plugin diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java new file mode 100644 index 00000000..0069257f --- /dev/null +++ b/src/main/java/de/juplo/kafka/Application.java @@ -0,0 +1,14 @@ +package de.juplo.kafka; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + + +@SpringBootApplication +public class Application +{ + public static void main(String[] args) + { + SpringApplication.run(Application.class, args); + } +} diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java new file mode 100644 index 00000000..55c42f61 --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -0,0 +1,41 @@ +package de.juplo.kafka; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.Properties; + + +@Configuration +@EnableConfigurationProperties(ApplicationProperties.class) +public class ApplicationConfiguration +{ + @Bean + public ExampleConsumer exampleConsumer( + Consumer kafkaConsumer, + ApplicationProperties properties) + { + return + new ExampleConsumer( + properties.getClientId(), + properties.getTopic(), + kafkaConsumer); + } + + @Bean + public KafkaConsumer kafkaConsumer(ApplicationProperties properties) + { + Properties props = new Properties(); + props.put("bootstrap.servers", properties.getBootstrapServer()); + props.put("client.id", properties.getClientId()); + props.put("group.id", properties.getGroupId()); + props.put("key.deserializer", StringDeserializer.class.getName()); + props.put("value.deserializer", StringDeserializer.class.getName()); + + return new KafkaConsumer<>(props); + } +} diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java new file mode 100644 index 00000000..c6fe30b7 --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -0,0 +1,17 @@ +package de.juplo.kafka; + +import lombok.Getter; +import lombok.Setter; +import org.springframework.boot.context.properties.ConfigurationProperties; + + +@ConfigurationProperties(prefix = "juplo") +@Getter +@Setter +public class ApplicationProperties +{ + private String bootstrapServer; + private String groupId; + private String clientId; + private String topic; +} diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 865e5cbe..1d9453b3 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -1,54 +1,46 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Arrays; -import java.util.Properties; @Slf4j -public class ExampleConsumer +public class ExampleConsumer implements Runnable { private final String id; private final String topic; private final Consumer consumer; + private final Thread workerThread; - private volatile boolean running = false; private long consumed = 0; public ExampleConsumer( - String broker, + String clientId, String topic, - String groupId, - String clientId) + Consumer consumer) { - Properties props = new Properties(); - props.put("bootstrap.servers", broker); - props.put("group.id", groupId); // ID für die Offset-Commits - props.put("client.id", clientId); // Nur zur Wiedererkennung - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); - this.id = clientId; this.topic = topic; - consumer = new KafkaConsumer<>(props); + this.consumer = consumer; + + workerThread = new Thread(this, "ExampleConsumer Worker-Thread"); + workerThread.start(); } + @Override public void run() { try { log.info("{} - Subscribing to topic {}", id, topic); consumer.subscribe(Arrays.asList(topic)); - running = true; while (true) { @@ -77,7 +69,6 @@ public class ExampleConsumer } finally { - running = false; log.info("{} - Closing the KafkaConsumer", id); consumer.close(); log.info("{}: Consumed {} messages in total, exiting!", id, consumed); @@ -94,48 +85,5 @@ public class ExampleConsumer consumed++; log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); } - - - public static void main(String[] args) throws Exception - { - if (args.length != 4) - { - log.error("Four arguments required!"); - log.error("args[0]: Broker-Address"); - log.error("args[1]: Topic"); - log.error("args[2]: Group-ID"); - log.error("args[3]: Unique Client-ID"); - System.exit(1); - return; - } - - - log.info( - "Running ExampleConsumer: broker={}, topic={}, group-id={}, client-id={}", - args[0], - args[1], - args[2], - args[3]); - - ExampleConsumer instance = new ExampleConsumer(args[0], args[1], args[2], args[3]); - - Runtime.getRuntime().addShutdownHook(new Thread(() -> - { - instance.consumer.wakeup(); - - while (instance.running) - { - log.info("Waiting for main-thread..."); - try - { - Thread.sleep(1000); - } - catch (InterruptedException e) {} - } - log.info("Shutdown completed."); - })); - - instance.run(); - } } - + diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 00000000..6b7340b2 --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,5 @@ +juplo: + bootstrap-server: localhost:9092 + group-id: my-group + client-id: DEV + topic: test -- 2.20.1