From bb4f45a199dd0359d68f65e25ed809b64261f665 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 1 Apr 2022 16:18:11 +0200 Subject: [PATCH] =?utf8?q?Setup=20f=C3=BCr=20die=20=C3=9Cbung=20"Scaling"?= =?utf8?q?=20aus=20der=20DevOps-Schulung=20bei=20Signal=20Iduna?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Siehe tag `signal-iduna-2021-11-devops` im Schulungs-Repo --- .dockerignore | 2 - .maven-dockerexclude | 1 - .maven-dockerinclude | 1 - Dockerfile | 5 - README.sh | 33 ++-- docker-compose.yml | 43 ++++- pom.xml | 78 --------- src/main/java/de/juplo/kafka/Application.java | 47 ------ .../de/juplo/kafka/ApplicationProperties.java | 18 --- .../java/de/juplo/kafka/DriverController.java | 28 ---- .../java/de/juplo/kafka/EndlessConsumer.java | 148 ------------------ src/main/resources/application.yml | 17 -- src/main/resources/logback.xml | 17 -- 13 files changed, 53 insertions(+), 385 deletions(-) delete mode 100644 .dockerignore delete mode 100644 .maven-dockerexclude delete mode 100644 .maven-dockerinclude delete mode 100644 Dockerfile delete mode 100644 pom.xml delete mode 100644 src/main/java/de/juplo/kafka/Application.java delete mode 100644 src/main/java/de/juplo/kafka/ApplicationProperties.java delete mode 100644 src/main/java/de/juplo/kafka/DriverController.java delete mode 100644 src/main/java/de/juplo/kafka/EndlessConsumer.java delete mode 100644 src/main/resources/application.yml delete mode 100644 src/main/resources/logback.xml diff --git a/.dockerignore b/.dockerignore deleted file mode 100644 index 1ad9963..0000000 --- a/.dockerignore +++ /dev/null @@ -1,2 +0,0 @@ -* -!target/*.jar diff --git a/.maven-dockerexclude b/.maven-dockerexclude deleted file mode 100644 index 72e8ffc..0000000 --- a/.maven-dockerexclude +++ /dev/null @@ -1 +0,0 @@ -* diff --git a/.maven-dockerinclude b/.maven-dockerinclude deleted file mode 100644 index fd6cecd..0000000 --- a/.maven-dockerinclude +++ /dev/null @@ -1 +0,0 @@ -target/*.jar diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index 16ee25e..0000000 --- a/Dockerfile +++ /dev/null @@ -1,5 +0,0 @@ -FROM openjdk:11-jre -VOLUME /tmp -COPY target/*.jar /opt/app.jar -ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ] -CMD [] diff --git a/README.sh b/README.sh index 900270a..fbeb0f1 100755 --- a/README.sh +++ b/README.sh @@ -1,31 +1,28 @@ #!/bin/bash -IMAGE=juplo/endless-consumer:1.0-SNAPSHOT - if [ "$1" = "cleanup" ] then docker-compose down -v - mvn clean exit fi docker-compose up -d zookeeper kafka cli -if [[ - $(docker image ls -q $IMAGE) == "" || - "$1" = "build" -]] -then - mvn install || exit -else - echo "Using image existing images:" - docker image ls $IMAGE -fi - echo "Waiting for the Kafka-Cluster to become ready..." docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1 docker-compose up setup -docker-compose up -d producer consumer -sleep 15 -docker-compose stop producer consumer -docker-compose logs consumer + +docker-compose up -d producer +docker-compose up -d peter +sleep 3 +docker-compose up -d franz +sleep 3 +docker-compose up -d beate +sleep 3 +docker-compose up -d klaus +sleep 2 +docker-compose stop franz +sleep 2 +docker-compose kill -s 9 peter +sleep 5 +docker-compose stop producer peter franz beate klaus diff --git a/docker-compose.yml b/docker-compose.yml index bed1fc1..4962312 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -44,15 +44,48 @@ services: producer.bootstrap-server: kafka:9092 producer.client-id: producer producer.topic: test - producer.throttle-ms: 200 + producer.throttle-ms: 100 - consumer: + peter: image: juplo/endless-consumer:1.0-SNAPSHOT ports: - - 8081:8081 + - 8081:8080 environment: + server.port: 8080 consumer.bootstrap-server: kafka:9092 - consumer.client-id: my-group - consumer.client-id: consumer + consumer.group-id: my-group + consumer.client-id: peter + consumer.topic: test + + beate: + image: juplo/endless-consumer:1.0-SNAPSHOT + ports: + - 8082:8080 + environment: + server.port: 8080 + consumer.bootstrap-server: kafka:9092 + consumer.group-id: my-group + consumer.client-id: beate + consumer.topic: test + + franz: + image: juplo/endless-consumer:1.0-SNAPSHOT + ports: + - 8083:8080 + environment: + server.port: 8080 + consumer.bootstrap-server: kafka:9092 + consumer.group-id: my-group + consumer.client-id: franz + consumer.topic: test + + klaus: + image: juplo/endless-consumer:1.0-SNAPSHOT + ports: + - 8084:8080 + environment: + consumer.bootstrap-server: kafka:9092 + consumer.group-id: my-group + consumer.client-id: klaus consumer.topic: test diff --git a/pom.xml b/pom.xml deleted file mode 100644 index 54bb695..0000000 --- a/pom.xml +++ /dev/null @@ -1,78 +0,0 @@ - - - - 4.0.0 - - - org.springframework.boot - spring-boot-starter-parent - 2.6.5 - - - - de.juplo.kafka - endless-consumer - 1.0-SNAPSHOT - Endless Consumer: a Simple Consumer-Group that reads and print the topic - - - - org.springframework.boot - spring-boot-starter-web - - - org.springframework.boot - spring-boot-starter-actuator - - - org.springframework.boot - spring-boot-configuration-processor - true - - - org.apache.kafka - kafka-clients - - - org.projectlombok - lombok - - - org.springframework.boot - spring-boot-starter-test - test - - - - - - - org.springframework.boot - spring-boot-maven-plugin - - - io.fabric8 - docker-maven-plugin - 0.33.0 - - - - juplo/%a:%v - - - - - - build - package - - build - - - - - - - - diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java deleted file mode 100644 index dd4b20a..0000000 --- a/src/main/java/de/juplo/kafka/Application.java +++ /dev/null @@ -1,47 +0,0 @@ -package de.juplo.kafka; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.util.Assert; - -import java.util.concurrent.Executors; - - -@SpringBootApplication -@EnableConfigurationProperties(ApplicationProperties.class) -public class Application -{ - @Autowired - ApplicationProperties properties; - - - @Bean - public EndlessConsumer consumer() - { - Assert.hasText(properties.getBootstrapServer(), "consumer.bootstrap-server must be set"); - Assert.hasText(properties.getGroupId(), "consumer.group-id must be set"); - Assert.hasText(properties.getClientId(), "consumer.client-id must be set"); - Assert.hasText(properties.getTopic(), "consumer.topic must be set"); - - EndlessConsumer consumer = - new EndlessConsumer( - Executors.newFixedThreadPool(1), - properties.getBootstrapServer(), - properties.getGroupId(), - properties.getClientId(), - properties.getTopic(), - properties.getAutoOffsetReset()); - - consumer.start(); - - return consumer; - } - - public static void main(String[] args) - { - SpringApplication.run(Application.class, args); - } -} diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java deleted file mode 100644 index dab3380..0000000 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ /dev/null @@ -1,18 +0,0 @@ -package de.juplo.kafka; - -import lombok.Getter; -import lombok.Setter; -import org.springframework.boot.context.properties.ConfigurationProperties; - - -@ConfigurationProperties(prefix = "consumer") -@Getter -@Setter -public class ApplicationProperties -{ - private String bootstrapServer; - private String groupId; - private String clientId; - private String topic; - private String autoOffsetReset; -} diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java deleted file mode 100644 index d8068e5..0000000 --- a/src/main/java/de/juplo/kafka/DriverController.java +++ /dev/null @@ -1,28 +0,0 @@ -package de.juplo.kafka; - -import lombok.RequiredArgsConstructor; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RestController; - -import java.util.concurrent.ExecutionException; - - -@RestController -@RequiredArgsConstructor -public class DriverController -{ - private final EndlessConsumer consumer; - - - @PostMapping("start") - public void start() - { - consumer.start(); - } - - @PostMapping("stop") - public void stop() throws ExecutionException, InterruptedException - { - consumer.stop(); - } -} diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java deleted file mode 100644 index b3dd446..0000000 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ /dev/null @@ -1,148 +0,0 @@ -package de.juplo.kafka; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.serialization.StringDeserializer; - -import javax.annotation.PreDestroy; -import java.time.Duration; -import java.util.Arrays; -import java.util.Properties; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; - - -@Slf4j -public class EndlessConsumer implements Runnable -{ - private final ExecutorService executor; - private final String bootstrapServer; - private final String groupId; - private final String id; - private final String topic; - private final String autoOffsetReset; - - private AtomicBoolean running = new AtomicBoolean(); - private long consumed = 0; - private KafkaConsumer consumer = null; - private Future future = null; - - public EndlessConsumer( - ExecutorService executor, - String bootstrapServer, - String groupId, - String clientId, - String topic, - String autoOffsetReset) - { - this.executor = executor; - this.bootstrapServer = bootstrapServer; - this.groupId = groupId; - this.id = clientId; - this.topic = topic; - this.autoOffsetReset = autoOffsetReset; - } - - @Override - public void run() - { - try - { - Properties props = new Properties(); - props.put("bootstrap.servers", bootstrapServer); - props.put("group.id", groupId); - props.put("client.id", id); - props.put("auto.offset.reset", autoOffsetReset); - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); - - this.consumer = new KafkaConsumer<>(props); - - log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic)); - - while (true) - { - ConsumerRecords records = - consumer.poll(Duration.ofSeconds(1)); - - // Do something with the data... - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) - { - consumed++; - log.info( - "{} - {}: {}/{} - {}={}", - id, - record.offset(), - record.topic(), - record.partition(), - record.key(), - record.value() - ); - } - } - } - catch(WakeupException e) - { - log.info("{} - RIIING!", id); - } - catch(Exception e) - { - log.error("{} - Unexpected error: {}", id, e.toString(), e); - running.set(false); // Mark the instance as not running - } - finally - { - log.info("{} - Closing the KafkaConsumer", id); - consumer.close(); - log.info("{} - Consumer-Thread exiting", id); - } - } - - - public synchronized void start() - { - boolean stateChanged = running.compareAndSet(false, true); - if (!stateChanged) - throw new RuntimeException("Consumer instance " + id + " is already running!"); - - log.info("{} - Starting - consumed {} messages before", id, consumed); - future = executor.submit(this); - } - - public synchronized void stop() throws ExecutionException, InterruptedException - { - boolean stateChanged = running.compareAndSet(true, false); - if (!stateChanged) - throw new RuntimeException("Consumer instance " + id + " is not running!"); - - log.info("{} - Stopping", id); - consumer.wakeup(); - future.get(); - log.info("{} - Stopped - consumed {} messages so far", id, consumed); - } - - @PreDestroy - public void destroy() throws ExecutionException, InterruptedException - { - log.info("{} - Destroy!", id); - try - { - stop(); - } - catch (IllegalStateException e) - { - log.info("{} - Was already stopped", id); - } - finally - { - log.info("{}: Consumed {} messages in total, exiting!", id, consumed); - } - } -} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml deleted file mode 100644 index db37822..0000000 --- a/src/main/resources/application.yml +++ /dev/null @@ -1,17 +0,0 @@ -consumer: - bootstrap-server: :9092 - group-id: my-consumer - client-id: peter - topic: test - auto-offset-reset: earliest -management: - endpoints: - web: - exposure: - include: "*" -logging: - level: - root: INFO - de.juplo: DEBUG -server: - port: 8081 diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml deleted file mode 100644 index b8e6780..0000000 --- a/src/main/resources/logback.xml +++ /dev/null @@ -1,17 +0,0 @@ - - - - - - %highlight(%-5level) %m%n - - - - - - - - - - - -- 2.20.1