Springify: Merge der Umstellung des Payloads auf JSON
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
index a5a5ce6..15e1b4e 100644 (file)
@@ -9,8 +9,7 @@ import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
 import org.springframework.stereotype.Component;
 
-import javax.annotation.PreDestroy;
-import java.util.List;
+import java.util.Optional;
 import java.util.function.Consumer;
 
 
@@ -25,6 +24,8 @@ public class EndlessConsumer<K, V>
   String id;
   @Autowired
   Consumer<ConsumerRecord<K, V>> handler;
+  @Autowired
+  ApplicationErrorHandler errorHandler;
 
   private long consumed = 0;
 
@@ -32,28 +33,22 @@ public class EndlessConsumer<K, V>
       id = "${consumer.client-id}",
       idIsGroup = false,
       topics = "${consumer.topic}",
-      containerFactory = "batchFactory",
       autoStartup = "false")
-  public void receive(List<ConsumerRecord<K, V>> records)
+  public void receive(ConsumerRecord<K, V> record)
   {
-    // Do something with the data...
-    log.info("{} - Received {} messages", id, records.size());
-    for (ConsumerRecord<K, V> record : records)
-    {
-      log.info(
-          "{} - {}: {}/{} - {}={}",
-          id,
-          record.offset(),
-          record.topic(),
-          record.partition(),
-          record.key(),
-          record.value()
-      );
+    log.info(
+        "{} - {}: {}/{} - {}={}",
+        id,
+        record.offset(),
+        record.topic(),
+        record.partition(),
+        record.key(),
+        record.value()
+    );
 
-      handler.accept(record);
+    handler.accept(record);
 
-      consumed++;
-    }
+    consumed++;
   }
 
 
@@ -63,6 +58,7 @@ public class EndlessConsumer<K, V>
       throw new IllegalStateException("Consumer instance " + id + " is already running!");
 
     log.info("{} - Starting - consumed {} messages before", id, consumed);
+    errorHandler.clearException();
     registry.getListenerContainer(id).start();
   }
 
@@ -76,25 +72,11 @@ public class EndlessConsumer<K, V>
     log.info("{} - Stopped - consumed {} messages so far", id, consumed);
   }
 
-  @PreDestroy
-  public void destroy()
+  public synchronized Optional<Exception> exitStatus()
   {
-    log.info("{} - Destroy!", id);
-    try
-    {
-      stop();
-    }
-    catch (IllegalStateException e)
-    {
-      log.info("{} - Was already stopped", id);
-    }
-    catch (Exception e)
-    {
-      log.error("{} - Unexpected exception while trying to stop the consumer", id, e);
-    }
-    finally
-    {
-      log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
-    }
+    if (registry.getListenerContainer(id).isChildRunning())
+      throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
+
+    return errorHandler.getException();
   }
 }