Merge der überarbeiteten Compose-Konfiguration ('rebalance-listener')
authorKai Moritz <kai@juplo.de>
Sat, 23 Jul 2022 13:41:57 +0000 (15:41 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 23 Jul 2022 13:48:48 +0000 (15:48 +0200)
* Dabei auch die letzten Verbesserungen aus 'rebalance-listener' übernommen.

docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/DriverController.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/java/de/juplo/kafka/ErrorResponse.java [new file with mode: 0644]
src/main/resources/application.yml

index 5723fc7..e30a7bb 100644 (file)
@@ -1,14 +1,14 @@
 version: '3.2'
 services:
   zookeeper:
-    image: confluentinc/cp-zookeeper:7.0.2
+    image: confluentinc/cp-zookeeper:7.1.3
     environment:
       ZOOKEEPER_CLIENT_PORT: 2181
     ports:
       - 2181:2181
 
   kafka:
-    image: confluentinc/cp-kafka:7.0.2
+    image: confluentinc/cp-kafka:7.1.3
     environment:
       KAFKA_BROKER_ID: 1
       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
@@ -56,8 +56,9 @@ services:
   producer:
     image: juplo/endless-producer:1.0-SNAPSHOT
     ports:
-      - 8000:8080
+      - 8080:8080
     environment:
+      server.port: 8080
       producer.bootstrap-server: kafka:9092
       producer.client-id: producer
       producer.topic: test
@@ -67,8 +68,9 @@ services:
   consumer:
     image: juplo/endless-consumer:1.0-SNAPSHOT
     ports:
-      - 8081:8081
+      - 8081:8080
     environment:
+      server.port: 8080
       consumer.bootstrap-server: kafka:9092
       consumer.client-id: consumer
       consumer.topic: test
diff --git a/pom.xml b/pom.xml
index 78b2fde..0fbe7e6 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,7 @@
   <parent>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-parent</artifactId>
-    <version>2.6.5</version>
+    <version>2.7.2</version>
     <relativePath/> <!-- lookup parent from repository -->
   </parent>
 
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-data-mongodb</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-validation</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-actuator</artifactId>
       <plugin>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>build-info</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>pl.project13.maven</groupId>
+        <artifactId>git-commit-id-plugin</artifactId>
       </plugin>
       <plugin>
         <groupId>io.fabric8</groupId>
index bcbf418..2f6e4f2 100644 (file)
@@ -5,7 +5,6 @@ import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
-import org.springframework.util.Assert;
 
 import java.util.concurrent.Executors;
 
@@ -21,11 +20,6 @@ public class Application
   @Bean
   public EndlessConsumer consumer(PartitionStatisticsRepository repository)
   {
-    Assert.hasText(properties.getBootstrapServer(), "consumer.bootstrap-server must be set");
-    Assert.hasText(properties.getGroupId(), "consumer.group-id must be set");
-    Assert.hasText(properties.getClientId(), "consumer.client-id must be set");
-    Assert.hasText(properties.getTopic(), "consumer.topic must be set");
-
     EndlessConsumer consumer =
         new EndlessConsumer(
             Executors.newFixedThreadPool(1),
diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
new file mode 100644 (file)
index 0000000..ab9782c
--- /dev/null
@@ -0,0 +1,32 @@
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import org.springframework.boot.actuate.health.Health;
+import org.springframework.boot.actuate.health.HealthIndicator;
+import org.springframework.stereotype.Component;
+
+
+@Component
+@RequiredArgsConstructor
+public class ApplicationHealthIndicator implements HealthIndicator
+{
+  private final EndlessConsumer consumer;
+
+
+  @Override
+  public Health health()
+  {
+    try
+    {
+      return consumer
+          .exitStatus()
+          .map(Health::down)
+          .orElse(Health.outOfService())
+          .build();
+    }
+    catch (IllegalStateException e)
+    {
+      return Health.up().build();
+    }
+  }
+}
index dab3380..fa731c5 100644 (file)
@@ -3,16 +3,31 @@ package de.juplo.kafka;
 import lombok.Getter;
 import lombok.Setter;
 import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.validation.annotation.Validated;
+
+import javax.validation.constraints.NotEmpty;
+import javax.validation.constraints.NotNull;
 
 
 @ConfigurationProperties(prefix = "consumer")
+@Validated
 @Getter
 @Setter
 public class ApplicationProperties
 {
+  @NotNull
+  @NotEmpty
   private String bootstrapServer;
+  @NotNull
+  @NotEmpty
   private String groupId;
+  @NotNull
+  @NotEmpty
   private String clientId;
+  @NotNull
+  @NotEmpty
   private String topic;
+  @NotNull
+  @NotEmpty
   private String autoOffsetReset;
 }
index a504842..1fb2a1b 100644 (file)
@@ -1,8 +1,11 @@
 package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.ExceptionHandler;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.ResponseStatus;
 import org.springframework.web.bind.annotation.RestController;
 
 import java.util.Map;
@@ -34,4 +37,12 @@ public class DriverController
   {
     return consumer.getSeen();
   }
+
+
+  @ExceptionHandler
+  @ResponseStatus(HttpStatus.BAD_REQUEST)
+  public ErrorResponse illegalStateException(IllegalStateException e)
+  {
+    return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
+  }
 }
index 7cb77aa..e5ef7d0 100644 (file)
@@ -14,8 +14,9 @@ import java.time.Duration;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 
 @Slf4j
@@ -29,10 +30,13 @@ public class EndlessConsumer implements Runnable
   private final String topic;
   private final String autoOffsetReset;
 
-  private AtomicBoolean running = new AtomicBoolean();
+  private final Lock lock = new ReentrantLock();
+  private final Condition condition = lock.newCondition();
+  private boolean running = false;
+  private Exception exception;
   private long consumed = 0;
   private KafkaConsumer<String, String> consumer = null;
-  private Future<?> future = null;
+
 
   private final Map<Integer, Map<String, Integer>> seen = new HashMap<>();
 
@@ -146,11 +150,12 @@ public class EndlessConsumer implements Runnable
     catch(WakeupException e)
     {
       log.info("{} - RIIING!", id);
+      shutdown();
     }
     catch(Exception e)
     {
       log.error("{} - Unexpected error: {}", id, e.toString(), e);
-      running.set(false); // Mark the instance as not running
+      shutdown(e);
     }
     finally
     {
@@ -160,31 +165,67 @@ public class EndlessConsumer implements Runnable
     }
   }
 
+  private void shutdown()
+  {
+    shutdown(null);
+  }
+
+  private void shutdown(Exception e)
+  {
+    lock.lock();
+    try
+    {
+      running = false;
+      exception = e;
+      condition.signal();
+    }
+    finally
+    {
+      lock.unlock();
+    }
+  }
+
   public Map<Integer, Map<String, Integer>> getSeen()
   {
     return seen;
   }
 
-  public synchronized void start()
+  public void start()
   {
-    boolean stateChanged = running.compareAndSet(false, true);
-    if (!stateChanged)
-      throw new RuntimeException("Consumer instance " + id + " is already running!");
+    lock.lock();
+    try
+    {
+      if (running)
+        throw new IllegalStateException("Consumer instance " + id + " is already running!");
 
-    log.info("{} - Starting - consumed {} messages before", id, consumed);
-    future = executor.submit(this);
+      log.info("{} - Starting - consumed {} messages before", id, consumed);
+      running = true;
+      exception = null;
+      executor.submit(this);
+    }
+    finally
+    {
+      lock.unlock();
+    }
   }
 
   public synchronized void stop() throws ExecutionException, InterruptedException
   {
-    boolean stateChanged = running.compareAndSet(true, false);
-    if (!stateChanged)
-      throw new RuntimeException("Consumer instance " + id + " is not running!");
-
-    log.info("{} - Stopping", id);
-    consumer.wakeup();
-    future.get();
-    log.info("{} - Stopped - consumed {} messages so far", id, consumed);
+    lock.lock();
+    try
+    {
+      if (!running)
+        throw new IllegalStateException("Consumer instance " + id + " is not running!");
+
+      log.info("{} - Stopping", id);
+      consumer.wakeup();
+      condition.await();
+      log.info("{} - Stopped - consumed {} messages so far", id, consumed);
+    }
+    finally
+    {
+      lock.unlock();
+    }
   }
 
   @PreDestroy
@@ -199,9 +240,42 @@ public class EndlessConsumer implements Runnable
     {
       log.info("{} - Was already stopped", id);
     }
+    catch (Exception e)
+    {
+      log.error("{} - Unexpected exception while trying to stop the consumer", id, e);
+    }
     finally
     {
       log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
     }
   }
+
+  public boolean running()
+  {
+    lock.lock();
+    try
+    {
+      return running;
+    }
+    finally
+    {
+      lock.unlock();
+    }
+  }
+
+  public Optional<Exception> exitStatus()
+  {
+    lock.lock();
+    try
+    {
+      if (running)
+        throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
+
+      return Optional.ofNullable(exception);
+    }
+    finally
+    {
+      lock.unlock();
+    }
+  }
 }
diff --git a/src/main/java/de/juplo/kafka/ErrorResponse.java b/src/main/java/de/juplo/kafka/ErrorResponse.java
new file mode 100644 (file)
index 0000000..5ca206d
--- /dev/null
@@ -0,0 +1,11 @@
+package de.juplo.kafka;
+
+import lombok.Value;
+
+
+@Value
+public class ErrorResponse
+{
+  private final String error;
+  private final Integer status;
+}
index 94490a3..93b27c2 100644 (file)
@@ -1,14 +1,29 @@
 consumer:
   bootstrap-server: :9092
   group-id: my-group
-  client-id: IDE
+  client-id: DEV
   topic: test
   auto-offset-reset: earliest
 management:
+  endpoint:
+    shutdown:
+      enabled: true
   endpoints:
     web:
       exposure:
         include: "*"
+  info:
+    env:
+      enabled: true
+    java:
+      enabled: true
+info:
+  kafka:
+    bootstrap-server: ${consumer.bootstrap-server}
+    client-id: ${consumer.client-id}
+    group-id: ${consumer.group-id}
+    topic: ${consumer.topic}
+    auto-offset-reset: ${consumer.auto-offset-reset}
 spring:
   data:
     mongodb:
@@ -19,4 +34,4 @@ logging:
     root: INFO
     de.juplo: DEBUG
 server:
-  port: 8081
+  port: 8881