Merge branch 'endless-stream-consumer' into rebalance-listener
authorKai Moritz <kai@juplo.de>
Sun, 10 Apr 2022 17:03:38 +0000 (19:03 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 10 Apr 2022 17:03:38 +0000 (19:03 +0200)
1  2 
pom.xml
src/main/java/de/juplo/kafka/EndlessConsumer.java

diff --cc pom.xml
Simple merge
@@@ -11,11 -9,13 +11,12 @@@ import org.apache.kafka.common.serializ
  
  import javax.annotation.PreDestroy;
  import java.time.Duration;
 -import java.util.Arrays;
 -import java.util.Properties;
 +import java.util.*;
  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Future;
- import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.locks.Condition;
+ import java.util.concurrent.locks.Lock;
+ import java.util.concurrent.locks.ReentrantLock;
  
  
  @Slf4j
@@@ -28,14 -28,13 +29,16 @@@ public class EndlessConsumer implement
    private final String topic;
    private final String autoOffsetReset;
  
-   private AtomicBoolean running = new AtomicBoolean();
+   private final Lock lock = new ReentrantLock();
+   private final Condition condition = lock.newCondition();
+   private boolean running = false;
    private long consumed = 0;
    private KafkaConsumer<String, String> consumer = null;
-   private Future<?> future = null;
  
 +  private final Map<Integer, Map<String, Integer>> seen = new HashMap<>();
 +
 +
    public EndlessConsumer(
        ExecutorService executor,
        String bootstrapServer,
      }
    }
  
-   public synchronized void start()
+   private void shutdown()
+   {
+     lock.lock();
+     try
+     {
+       running = false;
+       condition.signal();
+     }
+     finally
+     {
+       lock.unlock();
+     }
+   }
 +  public Map<Integer, Map<String, Integer>> getSeen()
 +  {
 +    return seen;
 +  }
 +
+   public void start()
    {
-     boolean stateChanged = running.compareAndSet(false, true);
-     if (!stateChanged)
-       throw new RuntimeException("Consumer instance " + id + " is already running!");
+     lock.lock();
+     try
+     {
+       if (running)
+         throw new IllegalStateException("Consumer instance " + id + " is already running!");
  
-     log.info("{} - Starting - consumed {} messages before", id, consumed);
-     future = executor.submit(this);
+       log.info("{} - Starting - consumed {} messages before", id, consumed);
+       running = true;
+       executor.submit(this);
+     }
+     finally
+     {
+       lock.unlock();
+     }
    }
  
    public synchronized void stop() throws ExecutionException, InterruptedException