Code an den Branch `spring/spring-consumer` angepasst grundlagen/simple-consumer--threading-neu
authorKai Moritz <kai@juplo.de>
Tue, 6 May 2025 19:16:21 +0000 (21:16 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 6 May 2025 19:16:21 +0000 (21:16 +0200)
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/ExampleConsumer.java

index cc1d580..0069257 100644 (file)
@@ -7,7 +7,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
 @SpringBootApplication
 public class Application
 {
-  public static void main(String args[])
+  public static void main(String[] args)
   {
     SpringApplication.run(Application.class, args);
   }
index 0c356f1..ad5757d 100644 (file)
@@ -19,10 +19,11 @@ public class ApplicationConfiguration
     Consumer<String, String> kafkaConsumer,
     ApplicationProperties properties)
   {
-    return new ExampleConsumer(
-      kafkaConsumer,
-      properties.getTopic(),
-      properties.getClientId());
+    return
+      new ExampleConsumer(
+        kafkaConsumer,
+        properties.getTopic(),
+        properties.getClientId());
   }
 
   @Bean(destroyMethod = "")
@@ -30,9 +31,9 @@ public class ApplicationConfiguration
   {
     Properties props = new Properties();
 
-    props.put("bootstrap.servers", properties.getBroker());
-    props.put("group.id", properties.getGroupId()); // ID für die Offset-Commits
-    props.put("client.id", properties.getClientId()); // Nur zur Wiedererkennung
+    props.put("bootstrap.servers", properties.getBootstrapServer());
+    props.put("group.id", properties.getGroupId());
+    props.put("client.id", properties.getClientId());
     props.put("key.deserializer", StringDeserializer.class.getName());
     props.put("value.deserializer", StringDeserializer.class.getName());
 
index 5743b77..2e57125 100644 (file)
@@ -10,7 +10,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
 @Setter
 public class ApplicationProperties
 {
-  private String broker;
+  private String bootstrapServer;
   private String topic;
   private String groupId;
   private String clientId;
index 8861ccf..76aff96 100644 (file)
@@ -16,7 +16,7 @@ public class ExampleConsumer implements Runnable
   private final String id;
   private final String topic;
   private final Consumer<String, String> consumer;
-  private final Thread worker;
+  private final Thread workerThread;
 
   private long consumed = 0;
 
@@ -29,9 +29,9 @@ public class ExampleConsumer implements Runnable
     this.id = clientId;
     this.topic = topic;
 
-    this.worker = new Thread(this, "ConsumerRunner-" + id);
-    log.info("{} - Starting worker-thread", id);
-    this.worker.start();
+    log.info("{} - Starting worker thread", id);
+    this.workerThread = new Thread(this, "ConsumerRunner-" + id);
+    this.workerThread.start();
   }
 
 
@@ -87,20 +87,13 @@ public class ExampleConsumer implements Runnable
     log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value);
   }
 
-  public void shutdown()
+
+  public void shutdown() throws InterruptedException
   {
     log.info("{} - Waking up the consumer", id);
     consumer.wakeup();
-    try
-    {
-      log.info("{} - Joining the worker-thread", id);
-      worker.join(Duration.ofSeconds(30));
-    }
-    catch (InterruptedException e)
-    {
-      log.error("{} - Joining was interrupted: {}", id, e.toString());
-    }
+    log.info("{} - Joining the worker thread", id);
+    workerThread.join();
     log.info("{} - Shutdown completed!", id);
   }
 }
-