WIP:start+stop
authorKai Moritz <kai@juplo.de>
Sun, 15 Dec 2024 08:46:58 +0000 (09:46 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 15 Dec 2024 11:19:09 +0000 (12:19 +0100)
src/main/java/de/juplo/kafka/ExampleConsumer.java

index 8b2611c..65a1b5d 100644 (file)
@@ -10,6 +10,8 @@ import org.apache.kafka.common.errors.WakeupException;
 
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 
 @Slf4j
@@ -20,9 +22,10 @@ public class ExampleConsumer implements Runnable
   private final Consumer<String, Long> consumer;
   private final RecordHandler<String, Long> recordHandler;
   private final ConsumerRebalanceListener rebalanceListener;
-  private final Thread workerThread;
   private final Runnable closeCallback;
+  private final Lock lock = new ReentrantLock(true);
 
+  private Thread workerThread;
   private volatile boolean running = false;
   private long consumed = 0;
 
@@ -40,11 +43,9 @@ public class ExampleConsumer implements Runnable
     this.consumer = consumer;
     this.recordHandler = recordHandler;
     this.rebalanceListener = rebalanceListener;
-
-    workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
-    workerThread.start();
-
     this.closeCallback = closeCallback;
+
+    start();
   }
 
 
@@ -59,27 +60,7 @@ public class ExampleConsumer implements Runnable
 
       while (running)
       {
-        try
-        {
-          ConsumerRecords<String, Long> records =
-            consumer.poll(Duration.ofSeconds(1));
-
-          log.info("{} - Received {} messages", id, records.count());
-          for (ConsumerRecord<String, Long> record : records)
-          {
-            handleRecord(
-              record.topic(),
-              record.partition(),
-              record.offset(),
-              record.key(),
-              record.value());
-          }
-        }
-        catch(RecordDeserializationException e)
-        {
-          log.error("{} - Deserialization Exception {}:{}", id, e.topicPartition(), e.offset());
-          consumer.seek(e.topicPartition(), e.offset() +1);
-        }
+        pollAndHandleRecords();
       }
     }
     catch(WakeupException e)
@@ -93,11 +74,38 @@ public class ExampleConsumer implements Runnable
       log.info("{} - Triggering exit of application!", id);
       new Thread(closeCallback).start();
     }
+  }
+
+  private void pollAndHandleRecords()
+  {
+    lock.lock();
+    try
+    {
+      ConsumerRecords<String, Long> records = consumer.poll(Duration.ofSeconds(1));
+
+      log.info("{} - Received {} messages", id, records.count());
+      for (ConsumerRecord<String, Long> record : records)
+      {
+        handleRecord(
+          record.topic(),
+          record.partition(),
+          record.offset(),
+          record.key(),
+          record.value());
+      }
+    }
+    catch(RecordDeserializationException e)
+    {
+      log.error("{} - Deserialization Exception {}:{}", id, e.topicPartition(), e.offset());
+      consumer.seek(e.topicPartition(), e.offset() +1);
+    }
+    catch(Exception e)
+    {
+      running = false;
+    }
     finally
     {
-      log.info("{} - Closing the KafkaConsumer", id);
-      consumer.close();
-      log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
+      lock.unlock();
     }
   }
 
@@ -114,11 +122,65 @@ public class ExampleConsumer implements Runnable
   }
 
 
-  public void shutdown() throws InterruptedException
+  public boolean isRunning()
+  {
+    return running;
+  }
+
+  public void start()
   {
-    log.info("{} joining the worker-thread...", id);
-    running = false;
-    consumer.wakeup();
+    lock.lock();
+    try
+    {
+      if (workerThread != null)
+      {
+        throw new RuntimeException(id
+          + " - Worker thread is already running: "
+          + workerThread.toString());
+      }
+
+      log.info("{} starting the worker-thread...", id);
+      workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
+      workerThread.start();
+    }
+    finally
+    {
+      lock.unlock();
+    }
+  }
+
+  public void stop() throws InterruptedException
+  {
+    lock.lock();
+    try
+    {
+      if (workerThread == null)
+      {
+        throw new RuntimeException(id
+          + " - Worker thread is already running: "
+          + workerThread.toString());
+      }
+
+      if (running)
+      {
+        log.info("{} - Signaling the KafkaConsumer to finish its work", id);
+        consumer.wakeup();
+      }
+    }
+    finally
+    {
+      lock.unlock();
+    }
+
+    log.info("{} - Joining the worker-thread...", id);
     workerThread.join();
+    workerThread = null;
+  }
+
+  public void shutdown() throws InterruptedException
+  {
+    log.info("{} - Closing the KafkaConsumer", id);
+    consumer.close();
+    log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
   }
 }