Springify: Merge der Umstellung des Payloads auf JSON
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
index ea899cc..15e1b4e 100644 (file)
@@ -9,8 +9,7 @@ import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
 import org.springframework.stereotype.Component;
 
-import javax.annotation.PreDestroy;
-import java.util.List;
+import java.util.Optional;
 import java.util.function.Consumer;
 
 
@@ -25,6 +24,8 @@ public class EndlessConsumer<K, V>
   String id;
   @Autowired
   Consumer<ConsumerRecord<K, V>> handler;
+  @Autowired
+  ApplicationErrorHandler errorHandler;
 
   private long consumed = 0;
 
@@ -32,48 +33,50 @@ public class EndlessConsumer<K, V>
       id = "${consumer.client-id}",
       idIsGroup = false,
       topics = "${consumer.topic}",
-      containerFactory = "batchFactory",
       autoStartup = "false")
-  public void receive(List<ConsumerRecord<K, V>> records)
+  public void receive(ConsumerRecord<K, V> record)
   {
-    // Do something with the data...
-    log.info("{} - Received {} messages", id, records.size());
-    for (ConsumerRecord<K, V> record : records)
-    {
-      log.info(
-          "{} - {}: {}/{} - {}={}",
-          id,
-          record.offset(),
-          record.topic(),
-          record.partition(),
-          record.key(),
-          record.value()
-      );
+    log.info(
+        "{} - {}: {}/{} - {}={}",
+        id,
+        record.offset(),
+        record.topic(),
+        record.partition(),
+        record.key(),
+        record.value()
+    );
 
-      handler.accept(record);
+    handler.accept(record);
 
-      consumed++;
-    }
+    consumed++;
   }
 
 
   public synchronized void start()
   {
+    if (registry.getListenerContainer(id).isChildRunning())
+      throw new IllegalStateException("Consumer instance " + id + " is already running!");
+
     log.info("{} - Starting - consumed {} messages before", id, consumed);
+    errorHandler.clearException();
     registry.getListenerContainer(id).start();
   }
 
   public synchronized void stop()
   {
+    if (!registry.getListenerContainer(id).isChildRunning())
+      throw new IllegalStateException("Consumer instance " + id + " is not running!");
+
     log.info("{} - Stopping", id);
     registry.getListenerContainer(id).stop();
     log.info("{} - Stopped - consumed {} messages so far", id, consumed);
   }
 
-  @PreDestroy
-  public void destroy()
+  public synchronized Optional<Exception> exitStatus()
   {
-    log.info("{} - Destroy!", id);
-    stop();
+    if (registry.getListenerContainer(id).isChildRunning())
+      throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
+
+    return errorHandler.getException();
   }
 }