From: Kai Moritz Date: Sat, 23 Jul 2022 13:41:57 +0000 (+0200) Subject: Merge der überarbeiteten Compose-Konfiguration ('rebalance-listener') X-Git-Tag: sumup-requests---lvm-2-tage~5^2^2^2~4 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=ccf11628d5a1524d4bffe2f1b21b51ad713f1a67;hp=97d44c756b032c0067484f2cf3f171ddba43cf05;p=demos%2Fkafka%2Ftraining Merge der überarbeiteten Compose-Konfiguration ('rebalance-listener') * Dabei auch die letzten Verbesserungen aus 'rebalance-listener' übernommen. --- diff --git a/docker-compose.yml b/docker-compose.yml index 5723fc7..e30a7bb 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,14 +1,14 @@ version: '3.2' services: zookeeper: - image: confluentinc/cp-zookeeper:7.0.2 + image: confluentinc/cp-zookeeper:7.1.3 environment: ZOOKEEPER_CLIENT_PORT: 2181 ports: - 2181:2181 kafka: - image: confluentinc/cp-kafka:7.0.2 + image: confluentinc/cp-kafka:7.1.3 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 @@ -56,8 +56,9 @@ services: producer: image: juplo/endless-producer:1.0-SNAPSHOT ports: - - 8000:8080 + - 8080:8080 environment: + server.port: 8080 producer.bootstrap-server: kafka:9092 producer.client-id: producer producer.topic: test @@ -67,8 +68,9 @@ services: consumer: image: juplo/endless-consumer:1.0-SNAPSHOT ports: - - 8081:8081 + - 8081:8080 environment: + server.port: 8080 consumer.bootstrap-server: kafka:9092 consumer.client-id: consumer consumer.topic: test diff --git a/pom.xml b/pom.xml index 78b2fde..0fbe7e6 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ org.springframework.boot spring-boot-starter-parent - 2.6.5 + 2.7.2 @@ -25,6 +25,10 @@ org.springframework.boot spring-boot-starter-data-mongodb + + org.springframework.boot + spring-boot-starter-validation + org.springframework.boot spring-boot-starter-actuator @@ -54,6 +58,17 @@ org.springframework.boot spring-boot-maven-plugin + + + + build-info + + + + + + pl.project13.maven + git-commit-id-plugin io.fabric8 diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index bcbf418..2f6e4f2 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -5,7 +5,6 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; -import org.springframework.util.Assert; import java.util.concurrent.Executors; @@ -21,11 +20,6 @@ public class Application @Bean public EndlessConsumer consumer(PartitionStatisticsRepository repository) { - Assert.hasText(properties.getBootstrapServer(), "consumer.bootstrap-server must be set"); - Assert.hasText(properties.getGroupId(), "consumer.group-id must be set"); - Assert.hasText(properties.getClientId(), "consumer.client-id must be set"); - Assert.hasText(properties.getTopic(), "consumer.topic must be set"); - EndlessConsumer consumer = new EndlessConsumer( Executors.newFixedThreadPool(1), diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java new file mode 100644 index 0000000..ab9782c --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java @@ -0,0 +1,32 @@ +package de.juplo.kafka; + +import lombok.RequiredArgsConstructor; +import org.springframework.boot.actuate.health.Health; +import org.springframework.boot.actuate.health.HealthIndicator; +import org.springframework.stereotype.Component; + + +@Component +@RequiredArgsConstructor +public class ApplicationHealthIndicator implements HealthIndicator +{ + private final EndlessConsumer consumer; + + + @Override + public Health health() + { + try + { + return consumer + .exitStatus() + .map(Health::down) + .orElse(Health.outOfService()) + .build(); + } + catch (IllegalStateException e) + { + return Health.up().build(); + } + } +} diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index dab3380..fa731c5 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -3,16 +3,31 @@ package de.juplo.kafka; import lombok.Getter; import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.validation.annotation.Validated; + +import javax.validation.constraints.NotEmpty; +import javax.validation.constraints.NotNull; @ConfigurationProperties(prefix = "consumer") +@Validated @Getter @Setter public class ApplicationProperties { + @NotNull + @NotEmpty private String bootstrapServer; + @NotNull + @NotEmpty private String groupId; + @NotNull + @NotEmpty private String clientId; + @NotNull + @NotEmpty private String topic; + @NotNull + @NotEmpty private String autoOffsetReset; } diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java index a504842..1fb2a1b 100644 --- a/src/main/java/de/juplo/kafka/DriverController.java +++ b/src/main/java/de/juplo/kafka/DriverController.java @@ -1,8 +1,11 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; +import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.ExceptionHandler; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.bind.annotation.RestController; import java.util.Map; @@ -34,4 +37,12 @@ public class DriverController { return consumer.getSeen(); } + + + @ExceptionHandler + @ResponseStatus(HttpStatus.BAD_REQUEST) + public ErrorResponse illegalStateException(IllegalStateException e) + { + return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value()); + } } diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 7cb77aa..e5ef7d0 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -14,8 +14,9 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; @Slf4j @@ -29,10 +30,13 @@ public class EndlessConsumer implements Runnable private final String topic; private final String autoOffsetReset; - private AtomicBoolean running = new AtomicBoolean(); + private final Lock lock = new ReentrantLock(); + private final Condition condition = lock.newCondition(); + private boolean running = false; + private Exception exception; private long consumed = 0; private KafkaConsumer consumer = null; - private Future future = null; + private final Map> seen = new HashMap<>(); @@ -146,11 +150,12 @@ public class EndlessConsumer implements Runnable catch(WakeupException e) { log.info("{} - RIIING!", id); + shutdown(); } catch(Exception e) { log.error("{} - Unexpected error: {}", id, e.toString(), e); - running.set(false); // Mark the instance as not running + shutdown(e); } finally { @@ -160,31 +165,67 @@ public class EndlessConsumer implements Runnable } } + private void shutdown() + { + shutdown(null); + } + + private void shutdown(Exception e) + { + lock.lock(); + try + { + running = false; + exception = e; + condition.signal(); + } + finally + { + lock.unlock(); + } + } + public Map> getSeen() { return seen; } - public synchronized void start() + public void start() { - boolean stateChanged = running.compareAndSet(false, true); - if (!stateChanged) - throw new RuntimeException("Consumer instance " + id + " is already running!"); + lock.lock(); + try + { + if (running) + throw new IllegalStateException("Consumer instance " + id + " is already running!"); - log.info("{} - Starting - consumed {} messages before", id, consumed); - future = executor.submit(this); + log.info("{} - Starting - consumed {} messages before", id, consumed); + running = true; + exception = null; + executor.submit(this); + } + finally + { + lock.unlock(); + } } public synchronized void stop() throws ExecutionException, InterruptedException { - boolean stateChanged = running.compareAndSet(true, false); - if (!stateChanged) - throw new RuntimeException("Consumer instance " + id + " is not running!"); - - log.info("{} - Stopping", id); - consumer.wakeup(); - future.get(); - log.info("{} - Stopped - consumed {} messages so far", id, consumed); + lock.lock(); + try + { + if (!running) + throw new IllegalStateException("Consumer instance " + id + " is not running!"); + + log.info("{} - Stopping", id); + consumer.wakeup(); + condition.await(); + log.info("{} - Stopped - consumed {} messages so far", id, consumed); + } + finally + { + lock.unlock(); + } } @PreDestroy @@ -199,9 +240,42 @@ public class EndlessConsumer implements Runnable { log.info("{} - Was already stopped", id); } + catch (Exception e) + { + log.error("{} - Unexpected exception while trying to stop the consumer", id, e); + } finally { log.info("{}: Consumed {} messages in total, exiting!", id, consumed); } } + + public boolean running() + { + lock.lock(); + try + { + return running; + } + finally + { + lock.unlock(); + } + } + + public Optional exitStatus() + { + lock.lock(); + try + { + if (running) + throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!"); + + return Optional.ofNullable(exception); + } + finally + { + lock.unlock(); + } + } } diff --git a/src/main/java/de/juplo/kafka/ErrorResponse.java b/src/main/java/de/juplo/kafka/ErrorResponse.java new file mode 100644 index 0000000..5ca206d --- /dev/null +++ b/src/main/java/de/juplo/kafka/ErrorResponse.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + +import lombok.Value; + + +@Value +public class ErrorResponse +{ + private final String error; + private final Integer status; +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 94490a3..93b27c2 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,14 +1,29 @@ consumer: bootstrap-server: :9092 group-id: my-group - client-id: IDE + client-id: DEV topic: test auto-offset-reset: earliest management: + endpoint: + shutdown: + enabled: true endpoints: web: exposure: include: "*" + info: + env: + enabled: true + java: + enabled: true +info: + kafka: + bootstrap-server: ${consumer.bootstrap-server} + client-id: ${consumer.client-id} + group-id: ${consumer.group-id} + topic: ${consumer.topic} + auto-offset-reset: ${consumer.auto-offset-reset} spring: data: mongodb: @@ -19,4 +34,4 @@ logging: root: INFO de.juplo: DEBUG server: - port: 8081 + port: 8881