Verbesserungen & Korrekturen aus simple-consumer übernommen
[demos/kafka/training] / src / main / java / de / juplo / kafka / Application.java
index 76c2520..e3219c1 100644 (file)
@@ -1,78 +1,24 @@
 package de.juplo.kafka;
 
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.kafka.annotation.KafkaListener;
 
-import javax.annotation.PreDestroy;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-
+// tag::supersimple[]
 @SpringBootApplication
 @Slf4j
-public class Application implements ApplicationRunner
+public class Application
 {
-  @Autowired
-  EndlessConsumer endlessConsumer;
-  @Autowired
-  ExecutorService executor;
-
-
-  @Override
-  public void run(ApplicationArguments args) throws Exception
+  @KafkaListener(id = "supersimple", topics = "test")
+  public void recieve(String message)
   {
-    log.info("Starting EndlessConsumer");
-    endlessConsumer.start();
+    log.info("Recieved message: {}", message);
   }
 
-  @PreDestroy
-  public void shutdown()
-  {
-    try
-    {
-      log.info("Stopping EndlessConsumer");
-      endlessConsumer.stop();
-    }
-    catch (IllegalStateException e)
-    {
-      log.info("Was already stopped: {}", e.toString());
-    }
-    catch (Exception e)
-    {
-      log.error("Unexpected exception while stopping EndlessConsumer: {}", e);
-    }
-
-    try
-    {
-      log.info("Shutting down the ExecutorService.");
-      executor.shutdown();
-      log.info("Waiting 5 seconds for the ExecutorService to terminate...");
-      executor.awaitTermination(5, TimeUnit.SECONDS);
-    }
-    catch (InterruptedException e)
-    {
-      log.error("Exception while waiting for the termination of the ExecutorService: {}", e);
-    }
-    finally
-    {
-      if (!executor.isTerminated())
-      {
-        log.warn("Forcing shutdown of ExecutorService!");
-        executor
-            .shutdownNow()
-            .forEach(runnable -> log.warn("Unprocessed task: {}", runnable.getClass().getSimpleName()));
-      }
-      log.info("Shutdow of ExecutorService finished");
-    }
-  }
-
-
   public static void main(String[] args)
   {
     SpringApplication.run(Application.class, args);
   }
 }
+// end::supersimple[]