From: Kai Moritz Date: Tue, 6 May 2025 19:16:21 +0000 (+0200) Subject: Code an den Branch `spring/spring-consumer` angepasst X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=f3be8287ae93436ce6164343da02a931b585c1f9;p=demos%2Fkafka%2Ftraining Code an den Branch `spring/spring-consumer` angepasst --- diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index cc1d580..0069257 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -7,7 +7,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class Application { - public static void main(String args[]) + public static void main(String[] args) { SpringApplication.run(Application.class, args); } diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 0c356f1..ad5757d 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -19,10 +19,11 @@ public class ApplicationConfiguration Consumer kafkaConsumer, ApplicationProperties properties) { - return new ExampleConsumer( - kafkaConsumer, - properties.getTopic(), - properties.getClientId()); + return + new ExampleConsumer( + kafkaConsumer, + properties.getTopic(), + properties.getClientId()); } @Bean(destroyMethod = "") @@ -30,9 +31,9 @@ public class ApplicationConfiguration { Properties props = new Properties(); - props.put("bootstrap.servers", properties.getBroker()); - props.put("group.id", properties.getGroupId()); // ID für die Offset-Commits - props.put("client.id", properties.getClientId()); // Nur zur Wiedererkennung + props.put("bootstrap.servers", properties.getBootstrapServer()); + props.put("group.id", properties.getGroupId()); + props.put("client.id", properties.getClientId()); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 5743b77..2e57125 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -10,7 +10,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties; @Setter public class ApplicationProperties { - private String broker; + private String bootstrapServer; private String topic; private String groupId; private String clientId; diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 8861ccf..76aff96 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -16,7 +16,7 @@ public class ExampleConsumer implements Runnable private final String id; private final String topic; private final Consumer consumer; - private final Thread worker; + private final Thread workerThread; private long consumed = 0; @@ -29,9 +29,9 @@ public class ExampleConsumer implements Runnable this.id = clientId; this.topic = topic; - this.worker = new Thread(this, "ConsumerRunner-" + id); - log.info("{} - Starting worker-thread", id); - this.worker.start(); + log.info("{} - Starting worker thread", id); + this.workerThread = new Thread(this, "ConsumerRunner-" + id); + this.workerThread.start(); } @@ -87,20 +87,13 @@ public class ExampleConsumer implements Runnable log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); } - public void shutdown() + + public void shutdown() throws InterruptedException { log.info("{} - Waking up the consumer", id); consumer.wakeup(); - try - { - log.info("{} - Joining the worker-thread", id); - worker.join(Duration.ofSeconds(30)); - } - catch (InterruptedException e) - { - log.error("{} - Joining was interrupted: {}", id, e.toString()); - } + log.info("{} - Joining the worker thread", id); + workerThread.join(); log.info("{} - Shutdown completed!", id); } } -