From: Kai Moritz Date: Sat, 23 Jul 2022 11:49:23 +0000 (+0200) Subject: Merge der überarbeiteten Compose-Konfiguration ('endless-stream-consumer') X-Git-Tag: wip-DEPRECATED~12^2~1^2^2 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=f9c0ba7779552d8fcfc9cb29c8b689e20c314904;hp=ecd5cb8483c199e0d50507a967ddcd31cd8b2aa1;p=demos%2Fkafka%2Ftraining Merge der überarbeiteten Compose-Konfiguration ('endless-stream-consumer') * Die letzten Änderungen an 'endless-stream-consumer' sind länger nicht mehr gemerged worden. --- diff --git a/docker-compose.yml b/docker-compose.yml index 0b1f0ae..f9eeedd 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,14 +1,14 @@ version: '3.2' services: zookeeper: - image: confluentinc/cp-zookeeper:7.0.2 + image: confluentinc/cp-zookeeper:7.1.3 environment: ZOOKEEPER_CLIENT_PORT: 2181 ports: - 2181:2181 kafka: - image: confluentinc/cp-kafka:7.0.2 + image: confluentinc/cp-kafka:7.1.3 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 @@ -41,6 +41,7 @@ services: ports: - 8000:8080 environment: + server.port: 8080 producer.bootstrap-server: kafka:9092 producer.client-id: producer producer.topic: test @@ -50,8 +51,9 @@ services: consumer: image: juplo/endless-consumer:1.0-SNAPSHOT ports: - - 8081:8081 + - 8081:8080 environment: + server.port: 8080 consumer.bootstrap-server: kafka:9092 consumer.client-id: my-group consumer.client-id: consumer diff --git a/pom.xml b/pom.xml index b7b0b8d..9db9d9d 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ org.springframework.boot spring-boot-starter-parent - 2.6.5 + 2.7.2 @@ -21,6 +21,10 @@ org.springframework.boot spring-boot-starter-web + + org.springframework.boot + spring-boot-starter-validation + org.springframework.boot spring-boot-starter-actuator @@ -50,6 +54,17 @@ org.springframework.boot spring-boot-maven-plugin + + + + build-info + + + + + + pl.project13.maven + git-commit-id-plugin io.fabric8 diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index dd4b20a..de4b66d 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -5,7 +5,6 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; -import org.springframework.util.Assert; import java.util.concurrent.Executors; @@ -21,11 +20,6 @@ public class Application @Bean public EndlessConsumer consumer() { - Assert.hasText(properties.getBootstrapServer(), "consumer.bootstrap-server must be set"); - Assert.hasText(properties.getGroupId(), "consumer.group-id must be set"); - Assert.hasText(properties.getClientId(), "consumer.client-id must be set"); - Assert.hasText(properties.getTopic(), "consumer.topic must be set"); - EndlessConsumer consumer = new EndlessConsumer( Executors.newFixedThreadPool(1), diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java new file mode 100644 index 0000000..ab9782c --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java @@ -0,0 +1,32 @@ +package de.juplo.kafka; + +import lombok.RequiredArgsConstructor; +import org.springframework.boot.actuate.health.Health; +import org.springframework.boot.actuate.health.HealthIndicator; +import org.springframework.stereotype.Component; + + +@Component +@RequiredArgsConstructor +public class ApplicationHealthIndicator implements HealthIndicator +{ + private final EndlessConsumer consumer; + + + @Override + public Health health() + { + try + { + return consumer + .exitStatus() + .map(Health::down) + .orElse(Health.outOfService()) + .build(); + } + catch (IllegalStateException e) + { + return Health.up().build(); + } + } +} diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index dab3380..fa731c5 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -3,16 +3,31 @@ package de.juplo.kafka; import lombok.Getter; import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.validation.annotation.Validated; + +import javax.validation.constraints.NotEmpty; +import javax.validation.constraints.NotNull; @ConfigurationProperties(prefix = "consumer") +@Validated @Getter @Setter public class ApplicationProperties { + @NotNull + @NotEmpty private String bootstrapServer; + @NotNull + @NotEmpty private String groupId; + @NotNull + @NotEmpty private String clientId; + @NotNull + @NotEmpty private String topic; + @NotNull + @NotEmpty private String autoOffsetReset; } diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java index a504842..06e562c 100644 --- a/src/main/java/de/juplo/kafka/DriverController.java +++ b/src/main/java/de/juplo/kafka/DriverController.java @@ -2,7 +2,10 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.ExceptionHandler; import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestController; import java.util.Map; @@ -34,4 +37,12 @@ public class DriverController { return consumer.getSeen(); } + + + @ExceptionHandler + @ResponseStatus(HttpStatus.BAD_REQUEST) + public ErrorResponse illegalStateException(IllegalStateException e) + { + return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value()); + } } diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 357a0b4..2310ccd 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -12,11 +12,13 @@ import java.time.Duration; import java.util.Arrays; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; @Slf4j @@ -29,10 +31,13 @@ public class EndlessConsumer implements Runnable private final String topic; private final String autoOffsetReset; - private AtomicBoolean running = new AtomicBoolean(); + private final Lock lock = new ReentrantLock(); + private final Condition condition = lock.newCondition(); + private boolean running = false; + private Exception exception; private long consumed = 0; private KafkaConsumer consumer = null; - private Future future = null; + private Map> seen; @@ -114,11 +119,12 @@ public class EndlessConsumer implements Runnable catch(WakeupException e) { log.info("{} - RIIING!", id); + shutdown(); } catch(Exception e) { log.error("{} - Unexpected error: {}", id, e.toString(), e); - running.set(false); // Mark the instance as not running + shutdown(e); } finally { @@ -149,26 +155,62 @@ public class EndlessConsumer implements Runnable return seen; } - public synchronized void start() + private void shutdown() + { + shutdown(null); + } + + private void shutdown(Exception e) + { + lock.lock(); + try + { + running = false; + exception = e; + condition.signal(); + } + finally + { + lock.unlock(); + } + } + + public void start() { - boolean stateChanged = running.compareAndSet(false, true); - if (!stateChanged) - throw new RuntimeException("Consumer instance " + id + " is already running!"); + lock.lock(); + try + { + if (running) + throw new IllegalStateException("Consumer instance " + id + " is already running!"); - log.info("{} - Starting - consumed {} messages before", id, consumed); - future = executor.submit(this); + log.info("{} - Starting - consumed {} messages before", id, consumed); + running = true; + exception = null; + executor.submit(this); + } + finally + { + lock.unlock(); + } } public synchronized void stop() throws ExecutionException, InterruptedException { - boolean stateChanged = running.compareAndSet(true, false); - if (!stateChanged) - throw new RuntimeException("Consumer instance " + id + " is not running!"); - - log.info("{} - Stopping", id); - consumer.wakeup(); - future.get(); - log.info("{} - Stopped - consumed {} messages so far", id, consumed); + lock.lock(); + try + { + if (!running) + throw new IllegalStateException("Consumer instance " + id + " is not running!"); + + log.info("{} - Stopping", id); + consumer.wakeup(); + condition.await(); + log.info("{} - Stopped - consumed {} messages so far", id, consumed); + } + finally + { + lock.unlock(); + } } @PreDestroy @@ -183,9 +225,42 @@ public class EndlessConsumer implements Runnable { log.info("{} - Was already stopped", id); } + catch (Exception e) + { + log.error("{} - Unexpected exception while trying to stop the consumer", id, e); + } finally { log.info("{}: Consumed {} messages in total, exiting!", id, consumed); } } + + public boolean running() + { + lock.lock(); + try + { + return running; + } + finally + { + lock.unlock(); + } + } + + public Optional exitStatus() + { + lock.lock(); + try + { + if (running) + throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!"); + + return Optional.ofNullable(exception); + } + finally + { + lock.unlock(); + } + } } diff --git a/src/main/java/de/juplo/kafka/ErrorResponse.java b/src/main/java/de/juplo/kafka/ErrorResponse.java new file mode 100644 index 0000000..5ca206d --- /dev/null +++ b/src/main/java/de/juplo/kafka/ErrorResponse.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + +import lombok.Value; + + +@Value +public class ErrorResponse +{ + private final String error; + private final Integer status; +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index db37822..9f3cb81 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,17 +1,32 @@ consumer: bootstrap-server: :9092 - group-id: my-consumer - client-id: peter + group-id: my-group + client-id: DEV topic: test auto-offset-reset: earliest management: + endpoint: + shutdown: + enabled: true endpoints: web: exposure: include: "*" + info: + env: + enabled: true + java: + enabled: true +info: + kafka: + bootstrap-server: ${consumer.bootstrap-server} + client-id: ${consumer.client-id} + group-id: ${consumer.group-id} + topic: ${consumer.topic} + auto-offset-reset: ${consumer.auto-offset-reset} logging: level: root: INFO de.juplo: DEBUG server: - port: 8081 + port: 8881