From: Kai Moritz Date: Sat, 23 Jul 2022 13:41:57 +0000 (+0200) Subject: Merge der überarbeiteten Compose-Konfiguration ('rebalance-listener') X-Git-Tag: sumup-requests---lvm-2-tage~5^2^2^2~4 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;ds=sidebyside;h=ccf11628d5a1524d4bffe2f1b21b51ad713f1a67;hp=-c;p=demos%2Fkafka%2Ftraining Merge der überarbeiteten Compose-Konfiguration ('rebalance-listener') * Dabei auch die letzten Verbesserungen aus 'rebalance-listener' übernommen. --- ccf11628d5a1524d4bffe2f1b21b51ad713f1a67 diff --combined docker-compose.yml index 5723fc7,1b067cd..e30a7bb --- a/docker-compose.yml +++ b/docker-compose.yml @@@ -1,14 -1,14 +1,14 @@@ version: '3.2' services: zookeeper: - image: confluentinc/cp-zookeeper:7.0.2 + image: confluentinc/cp-zookeeper:7.1.3 environment: ZOOKEEPER_CLIENT_PORT: 2181 ports: - 2181:2181 kafka: - image: confluentinc/cp-kafka:7.0.2 + image: confluentinc/cp-kafka:7.1.3 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 @@@ -24,23 -24,6 +24,23 @@@ depends_on: - zookeeper + mongo: + image: mongo:4.4 + ports: + - 27017:27017 + environment: + MONGO_INITDB_ROOT_USERNAME: juplo + MONGO_INITDB_ROOT_PASSWORD: training + + express: + image: mongo-express + ports: + - 8090:8081 + environment: + ME_CONFIG_MONGODB_ADMINUSERNAME: juplo + ME_CONFIG_MONGODB_ADMINPASSWORD: training + ME_CONFIG_MONGODB_URL: mongodb://juplo:training@mongo:27017/ + kafka-ui: image: provectuslabs/kafka-ui:0.3.3 ports: @@@ -56,8 -39,9 +56,9 @@@ producer: image: juplo/endless-producer:1.0-SNAPSHOT ports: - - 8000:8080 + - 8080:8080 environment: + server.port: 8080 producer.bootstrap-server: kafka:9092 producer.client-id: producer producer.topic: test @@@ -67,10 -51,10 +68,11 @@@ consumer: image: juplo/endless-consumer:1.0-SNAPSHOT ports: - - 8081:8081 + - 8081:8080 environment: + server.port: 8080 consumer.bootstrap-server: kafka:9092 - consumer.client-id: my-group consumer.client-id: consumer consumer.topic: test + spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017 + spring.data.mongodb.database: juplo diff --combined pom.xml index 78b2fde,9db9d9d..0fbe7e6 --- a/pom.xml +++ b/pom.xml @@@ -7,7 -7,7 +7,7 @@@ org.springframework.boot spring-boot-starter-parent - 2.6.5 + 2.7.2 @@@ -21,10 -21,10 +21,14 @@@ org.springframework.boot spring-boot-starter-web + + org.springframework.boot + spring-boot-starter-data-mongodb + + + org.springframework.boot + spring-boot-starter-validation + org.springframework.boot spring-boot-starter-actuator @@@ -54,6 -54,17 +58,17 @@@ org.springframework.boot spring-boot-maven-plugin + + + + build-info + + + + + + pl.project13.maven + git-commit-id-plugin io.fabric8 diff --combined src/main/java/de/juplo/kafka/Application.java index bcbf418,de4b66d..2f6e4f2 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@@ -5,7 -5,6 +5,6 @@@ import org.springframework.boot.SpringA import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; - import org.springframework.util.Assert; import java.util.concurrent.Executors; @@@ -19,17 -18,11 +18,12 @@@ public class Applicatio @Bean - public EndlessConsumer consumer() + public EndlessConsumer consumer(PartitionStatisticsRepository repository) { - Assert.hasText(properties.getBootstrapServer(), "consumer.bootstrap-server must be set"); - Assert.hasText(properties.getGroupId(), "consumer.group-id must be set"); - Assert.hasText(properties.getClientId(), "consumer.client-id must be set"); - Assert.hasText(properties.getTopic(), "consumer.topic must be set"); - EndlessConsumer consumer = new EndlessConsumer( Executors.newFixedThreadPool(1), + repository, properties.getBootstrapServer(), properties.getGroupId(), properties.getClientId(), diff --combined src/main/java/de/juplo/kafka/EndlessConsumer.java index 7cb77aa,c2d4447..e5ef7d0 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@@ -14,32 -14,34 +14,36 @@@ import java.time.Duration import java.util.*; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; - import java.util.concurrent.Future; - import java.util.concurrent.atomic.AtomicBoolean; + import java.util.concurrent.locks.Condition; + import java.util.concurrent.locks.Lock; + import java.util.concurrent.locks.ReentrantLock; @Slf4j public class EndlessConsumer implements Runnable { private final ExecutorService executor; + private final PartitionStatisticsRepository repository; private final String bootstrapServer; private final String groupId; private final String id; private final String topic; private final String autoOffsetReset; - private AtomicBoolean running = new AtomicBoolean(); + private final Lock lock = new ReentrantLock(); + private final Condition condition = lock.newCondition(); + private boolean running = false; + private Exception exception; private long consumed = 0; private KafkaConsumer consumer = null; - private Future future = null; + private final Map> seen = new HashMap<>(); public EndlessConsumer( ExecutorService executor, + PartitionStatisticsRepository repository, String bootstrapServer, String groupId, String clientId, @@@ -47,7 -49,6 +51,7 @@@ String autoOffsetReset) { this.executor = executor; + this.repository = repository; this.bootstrapServer = bootstrapServer; this.groupId = groupId; this.id = clientId; @@@ -90,7 -91,6 +94,7 @@@ tp.partition(), key); } + repository.save(new StatisticsDocument(tp.partition(), removed)); }); } @@@ -100,12 -100,7 +104,12 @@@ partitions.forEach(tp -> { log.info("{} - adding partition: {}", id, tp); - seen.put(tp.partition(), new HashMap<>()); + seen.put( + tp.partition(), + repository + .findById(Integer.toString(tp.partition())) + .map(document -> document.statistics) + .orElse(new HashMap<>())); }); } }); @@@ -146,11 -141,12 +150,12 @@@ catch(WakeupException e) { log.info("{} - RIIING!", id); + shutdown(); } catch(Exception e) { log.error("{} - Unexpected error: {}", id, e.toString(), e); - running.set(false); // Mark the instance as not running + shutdown(e); } finally { @@@ -160,31 -156,67 +165,67 @@@ } } + private void shutdown() + { + shutdown(null); + } + + private void shutdown(Exception e) + { + lock.lock(); + try + { + running = false; + exception = e; + condition.signal(); + } + finally + { + lock.unlock(); + } + } + public Map> getSeen() { return seen; } - public synchronized void start() + public void start() { - boolean stateChanged = running.compareAndSet(false, true); - if (!stateChanged) - throw new RuntimeException("Consumer instance " + id + " is already running!"); + lock.lock(); + try + { + if (running) + throw new IllegalStateException("Consumer instance " + id + " is already running!"); - log.info("{} - Starting - consumed {} messages before", id, consumed); - future = executor.submit(this); + log.info("{} - Starting - consumed {} messages before", id, consumed); + running = true; + exception = null; + executor.submit(this); + } + finally + { + lock.unlock(); + } } public synchronized void stop() throws ExecutionException, InterruptedException { - boolean stateChanged = running.compareAndSet(true, false); - if (!stateChanged) - throw new RuntimeException("Consumer instance " + id + " is not running!"); - - log.info("{} - Stopping", id); - consumer.wakeup(); - future.get(); - log.info("{} - Stopped - consumed {} messages so far", id, consumed); + lock.lock(); + try + { + if (!running) + throw new IllegalStateException("Consumer instance " + id + " is not running!"); + + log.info("{} - Stopping", id); + consumer.wakeup(); + condition.await(); + log.info("{} - Stopped - consumed {} messages so far", id, consumed); + } + finally + { + lock.unlock(); + } } @PreDestroy @@@ -199,9 -231,42 +240,42 @@@ { log.info("{} - Was already stopped", id); } + catch (Exception e) + { + log.error("{} - Unexpected exception while trying to stop the consumer", id, e); + } finally { log.info("{}: Consumed {} messages in total, exiting!", id, consumed); } } + + public boolean running() + { + lock.lock(); + try + { + return running; + } + finally + { + lock.unlock(); + } + } + + public Optional exitStatus() + { + lock.lock(); + try + { + if (running) + throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!"); + + return Optional.ofNullable(exception); + } + finally + { + lock.unlock(); + } + } } diff --combined src/main/resources/application.yml index 94490a3,9f3cb81..93b27c2 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@@ -1,22 -1,32 +1,37 @@@ consumer: bootstrap-server: :9092 group-id: my-group - client-id: IDE + client-id: DEV topic: test auto-offset-reset: earliest management: + endpoint: + shutdown: + enabled: true endpoints: web: exposure: include: "*" + info: + env: + enabled: true + java: + enabled: true + info: + kafka: + bootstrap-server: ${consumer.bootstrap-server} + client-id: ${consumer.client-id} + group-id: ${consumer.group-id} + topic: ${consumer.topic} + auto-offset-reset: ${consumer.auto-offset-reset} +spring: + data: + mongodb: + uri: mongodb://juplo:training@localhost:27017 + database: juplo logging: level: root: INFO de.juplo: DEBUG server: - port: 8081 + port: 8881