Merge der überarbeiteten Compose-Konfiguration ('endless-stream-consumer')
authorKai Moritz <kai@juplo.de>
Sat, 23 Jul 2022 11:49:23 +0000 (13:49 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 23 Jul 2022 11:49:23 +0000 (13:49 +0200)
* Die letzten Änderungen an 'endless-stream-consumer' sind länger nicht
  mehr gemerged worden.

1  2 
docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/DriverController.java
src/main/java/de/juplo/kafka/EndlessConsumer.java

@@@ -39,8 -39,9 +39,9 @@@ services
    producer:
      image: juplo/endless-producer:1.0-SNAPSHOT
      ports:
 -      - 8080:8080
 +      - 8000:8080
      environment:
+       server.port: 8080
        producer.bootstrap-server: kafka:9092
        producer.client-id: producer
        producer.topic: test
diff --cc pom.xml
Simple merge
@@@ -1,11 -1,12 +1,14 @@@
  package de.juplo.kafka;
  
  import lombok.RequiredArgsConstructor;
 +import org.springframework.web.bind.annotation.GetMapping;
+ import org.springframework.http.HttpStatus;
+ import org.springframework.web.bind.annotation.ExceptionHandler;
  import org.springframework.web.bind.annotation.PostMapping;
+ import org.springframework.web.bind.annotation.ResponseStatus;
  import org.springframework.web.bind.annotation.RestController;
  
 +import java.util.Map;
  import java.util.concurrent.ExecutionException;
  
  
@@@ -28,10 -29,10 +31,18 @@@ public class DriverControlle
      consumer.stop();
    }
  
 +
 +  @GetMapping("seen")
 +  public Map<Integer, Map<String, Integer>> seen()
 +  {
 +    return consumer.getSeen();
 +  }
++
++
+   @ExceptionHandler
+   @ResponseStatus(HttpStatus.BAD_REQUEST)
+   public ErrorResponse illegalStateException(IllegalStateException e)
+   {
+     return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
+   }
  }
@@@ -10,8 -10,7 +10,9 @@@ import org.apache.kafka.common.serializ
  import javax.annotation.PreDestroy;
  import java.time.Duration;
  import java.util.Arrays;
 +import java.util.HashMap;
 +import java.util.Map;
+ import java.util.Optional;
  import java.util.Properties;
  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.ExecutorService;
@@@ -29,14 -29,14 +31,17 @@@ public class EndlessConsumer implement
    private final String topic;
    private final String autoOffsetReset;
  
-   private AtomicBoolean running = new AtomicBoolean();
+   private final Lock lock = new ReentrantLock();
+   private final Condition condition = lock.newCondition();
+   private boolean running = false;
+   private Exception exception;
    private long consumed = 0;
    private KafkaConsumer<String, String> consumer = null;
-   private Future<?> future = null;
  
 +  private Map<Integer, Map<String, Integer>> seen;
 +
 +
    public EndlessConsumer(
        ExecutorService executor,
        String bootstrapServer,
      }
    }
  
-   public synchronized void start()
 +  public Map<Integer, Map<String, Integer>> getSeen()
 +  {
 +    return seen;
 +  }
 +
+   private void shutdown()
+   {
+     shutdown(null);
+   }
+   private void shutdown(Exception e)
+   {
+     lock.lock();
+     try
+     {
+       running = false;
+       exception = e;
+       condition.signal();
+     }
+     finally
+     {
+       lock.unlock();
+     }
+   }
+   public void start()
    {
-     boolean stateChanged = running.compareAndSet(false, true);
-     if (!stateChanged)
-       throw new RuntimeException("Consumer instance " + id + " is already running!");
+     lock.lock();
+     try
+     {
+       if (running)
+         throw new IllegalStateException("Consumer instance " + id + " is already running!");
  
-     log.info("{} - Starting - consumed {} messages before", id, consumed);
-     future = executor.submit(this);
+       log.info("{} - Starting - consumed {} messages before", id, consumed);
+       running = true;
+       exception = null;
+       executor.submit(this);
+     }
+     finally
+     {
+       lock.unlock();
+     }
    }
  
    public synchronized void stop() throws ExecutionException, InterruptedException