Merge branch 'endless-stream-consumer' into rebalance-listener
authorKai Moritz <kai@juplo.de>
Sun, 10 Apr 2022 17:03:38 +0000 (19:03 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 10 Apr 2022 17:03:38 +0000 (19:03 +0200)
1  2 
pom.xml
src/main/java/de/juplo/kafka/EndlessConsumer.java

diff --combined pom.xml
+++ b/pom.xml
    <groupId>de.juplo.kafka</groupId>
    <artifactId>endless-consumer</artifactId>
    <version>1.0-SNAPSHOT</version>
 -  <name>Endless Consumer: a Simple Consumer-Group that reads and print the topic</name>
 +  <name>Endless Consumer: a Simple Consumer-Group that reads and prints the topic and counts the received messages for each key by topic</name>
  
    <dependencies>
      <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
      </dependency>
+     <dependency>
+       <groupId>org.springframework.boot</groupId>
+       <artifactId>spring-boot-starter-validation</artifactId>
+     </dependency>
      <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
@@@ -1,21 -1,21 +1,22 @@@
  package de.juplo.kafka;
  
  import lombok.extern.slf4j.Slf4j;
 +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;
 +import org.apache.kafka.common.TopicPartition;
  import org.apache.kafka.common.errors.WakeupException;
  import org.apache.kafka.common.serialization.StringDeserializer;
  
  import javax.annotation.PreDestroy;
  import java.time.Duration;
 -import java.util.Arrays;
 -import java.util.Properties;
 +import java.util.*;
  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Future;
- import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.locks.Condition;
+ import java.util.concurrent.locks.Lock;
+ import java.util.concurrent.locks.ReentrantLock;
  
  
  @Slf4j
@@@ -28,14 -28,13 +29,16 @@@ public class EndlessConsumer implement
    private final String topic;
    private final String autoOffsetReset;
  
-   private AtomicBoolean running = new AtomicBoolean();
+   private final Lock lock = new ReentrantLock();
+   private final Condition condition = lock.newCondition();
+   private boolean running = false;
    private long consumed = 0;
    private KafkaConsumer<String, String> consumer = null;
-   private Future<?> future = null;
  
 +  private final Map<Integer, Map<String, Integer>> seen = new HashMap<>();
 +
 +
    public EndlessConsumer(
        ExecutorService executor,
        String bootstrapServer,
        props.put("group.id", groupId);
        props.put("client.id", id);
        props.put("auto.offset.reset", autoOffsetReset);
 +      props.put("metadata.max.age.ms", "1000");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
  
        this.consumer = new KafkaConsumer<>(props);
  
        log.info("{} - Subscribing to topic {}", id, topic);
 -      consumer.subscribe(Arrays.asList(topic));
 +      consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener()
 +      {
 +        @Override
 +        public void onPartitionsRevoked(Collection<TopicPartition> partitions)
 +        {
 +          partitions.forEach(tp ->
 +          {
 +            log.info("{} - removing partition: {}", id, tp);
 +            Map<String, Integer> removed = seen.remove(tp.partition());
 +            for (String key : removed.keySet())
 +            {
 +              log.info(
 +                  "{} - Seen {} messages for partition={}|key={}",
 +                  id,
 +                  removed.get(key),
 +                  tp.partition(),
 +                  key);
 +            }
 +          });
 +        }
 +
 +        @Override
 +        public void onPartitionsAssigned(Collection<TopicPartition> partitions)
 +        {
 +          partitions.forEach(tp ->
 +          {
 +            log.info("{} - adding partition: {}", id, tp);
 +            seen.put(tp.partition(), new HashMap<>());
 +          });
 +        }
 +      });
  
        while (true)
        {
                record.key(),
                record.value()
            );
 +
 +          Integer partition = record.partition();
 +          String key = record.key() == null ? "NULL" : record.key();
 +          Map<String, Integer> byKey = seen.get(partition);
 +
 +          if (!byKey.containsKey(key))
 +            byKey.put(key, 0);
 +
 +          int seenByKey = byKey.get(key);
 +          seenByKey++;
 +          byKey.put(key, seenByKey);
          }
        }
      }
      catch(WakeupException e)
      {
        log.info("{} - RIIING!", id);
+       shutdown();
      }
      catch(Exception e)
      {
        log.error("{} - Unexpected error: {}", id, e.toString(), e);
-       running.set(false); // Mark the instance as not running
+       shutdown();
      }
      finally
      {
      }
    }
  
-   public synchronized void start()
+   private void shutdown()
+   {
+     lock.lock();
+     try
+     {
+       running = false;
+       condition.signal();
+     }
+     finally
+     {
+       lock.unlock();
+     }
+   }
 +  public Map<Integer, Map<String, Integer>> getSeen()
 +  {
 +    return seen;
 +  }
 +
+   public void start()
    {
-     boolean stateChanged = running.compareAndSet(false, true);
-     if (!stateChanged)
-       throw new RuntimeException("Consumer instance " + id + " is already running!");
+     lock.lock();
+     try
+     {
+       if (running)
+         throw new IllegalStateException("Consumer instance " + id + " is already running!");
  
-     log.info("{} - Starting - consumed {} messages before", id, consumed);
-     future = executor.submit(this);
+       log.info("{} - Starting - consumed {} messages before", id, consumed);
+       running = true;
+       executor.submit(this);
+     }
+     finally
+     {
+       lock.unlock();
+     }
    }
  
    public synchronized void stop() throws ExecutionException, InterruptedException
    {
-     boolean stateChanged = running.compareAndSet(true, false);
-     if (!stateChanged)
-       throw new RuntimeException("Consumer instance " + id + " is not running!");
-     log.info("{} - Stopping", id);
-     consumer.wakeup();
-     future.get();
-     log.info("{} - Stopped - consumed {} messages so far", id, consumed);
+     lock.lock();
+     try
+     {
+       if (!running)
+         throw new IllegalStateException("Consumer instance " + id + " is not running!");
+       log.info("{} - Stopping", id);
+       consumer.wakeup();
+       condition.await();
+       log.info("{} - Stopped - consumed {} messages so far", id, consumed);
+     }
+     finally
+     {
+       lock.unlock();
+     }
    }
  
    @PreDestroy
      {
        log.info("{} - Was already stopped", id);
      }
+     catch (Exception e)
+     {
+       log.error("{} - Unexpected exception while trying to stop the consumer", id, e);
+     }
      finally
      {
        log.info("{}: Consumed {} messages in total, exiting!", id, consumed);