From 71ad4d6e3b223f1c0ca15002f5ce4f3399688193 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 1 Nov 2022 10:55:25 +0100 Subject: [PATCH] =?utf8?q?Anpassungen=20f=C3=BCr=20die=20=C3=9Cbungen=20Sk?= =?utf8?q?alieren,=20Paralelle=20Verarbeitung=20&=20Spikzettel?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Maven-Projekt entfernt -- _Es werden nur die fertigen Images benötigt_ * Das Compose-Setup definiert jetzt 3 Consumer und den Spikzettel-Service * README.s führt die Übung "Consumer-Group skalieren" vor --- .gitignore | 3 - Dockerfile | 6 - README.sh | 25 ++-- pom.xml | 94 ------------ .../java/de/juplo/kafka/SimpleConsumer.java | 136 ------------------ src/main/resources/logback.xml | 17 --- 6 files changed, 9 insertions(+), 272 deletions(-) delete mode 100644 .gitignore delete mode 100644 Dockerfile delete mode 100644 pom.xml delete mode 100644 src/main/java/de/juplo/kafka/SimpleConsumer.java delete mode 100644 src/main/resources/logback.xml diff --git a/.gitignore b/.gitignore deleted file mode 100644 index 6240411..0000000 --- a/.gitignore +++ /dev/null @@ -1,3 +0,0 @@ -*.iml -.idea -target diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index 73b568e..0000000 --- a/Dockerfile +++ /dev/null @@ -1,6 +0,0 @@ -FROM openjdk:11-jre -VOLUME /tmp -COPY target/*.jar /opt/app.jar -COPY target/libs /opt/libs -ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ] -CMD [ "DCKR" ] diff --git a/README.sh b/README.sh index 6030770..2ea1b38 100755 --- a/README.sh +++ b/README.sh @@ -5,36 +5,29 @@ IMAGE=juplo/simple-consumer:1.0-SNAPSHOT if [ "$1" = "cleanup" ] then docker-compose down -v - mvn clean exit fi docker-compose up -d kafka-0 kafka-1 kafka-2 kafka-3 cli -docker-compose rm -svf consumer-1 - -if [[ - $(docker image ls -q $IMAGE) == "" || - "$1" = "build" -]] -then - mvn clean install || exit -else - echo "Using image existing images:" - docker image ls $IMAGE -fi +docker-compose rm -svf consumer-1 consumer-2 consumer-3 echo "Waiting for the Kafka-Cluster to become ready..." docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1 docker-compose up setup -docker-compose up -d producer consumer-1 +docker-compose up -d producer consumer-1 consumer-2 consumer-3 +sleep 5 +docker-compose stop consumer-2 consumer-3 sleep 5 -docker-compose restart consumer-1 +docker-compose start consumer-3 +docker-compose stop consumer-1 +sleep 5 +docker-compose start consumer-1 sleep 5 -docker-compose stop consumer-1 producer +docker-compose stop producer consumer-1 consumer-2 consumer-3 docker-compose logs consumer-1 echo "Received messages for test/0" diff --git a/pom.xml b/pom.xml deleted file mode 100644 index b7c8a20..0000000 --- a/pom.xml +++ /dev/null @@ -1,94 +0,0 @@ - - - - 4.0.0 - - - org.springframework.boot - spring-boot-starter-parent - 2.7.2 - - - - de.juplo.kafka - simple-consumer - Super Simple Consumer-Group - 1.0-SNAPSHOT - - - - org.apache.kafka - kafka-clients - - - org.projectlombok - lombok - - - ch.qos.logback - logback-classic - - - - - - - pl.project13.maven - git-commit-id-plugin - - - org.apache.maven.plugins - maven-dependency-plugin - - - copy-dependencies - package - - copy-dependencies - - - ${project.build.directory}/libs - - - - - - org.apache.maven.plugins - maven-jar-plugin - - - - true - libs/ - de.juplo.kafka.SimpleConsumer - - - - - - io.fabric8 - docker-maven-plugin - 0.33.0 - - - - juplo/%a:%v - - - - - - build - package - - build - - - - - - - - - diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java deleted file mode 100644 index cab2fb8..0000000 --- a/src/main/java/de/juplo/kafka/SimpleConsumer.java +++ /dev/null @@ -1,136 +0,0 @@ -package de.juplo.kafka; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.serialization.StringDeserializer; - -import java.time.Duration; -import java.util.Arrays; -import java.util.Properties; - - -@Slf4j -public class SimpleConsumer -{ - private final String id; - private final String topic; - private final KafkaConsumer consumer; - - private volatile boolean running = false; - private long consumed = 0; - - public SimpleConsumer(String broker, String topic, String groupId, String clientId) - { - Properties props = new Properties(); - props.put("bootstrap.servers", broker); - props.put("group.id", groupId); // ID für die Offset-Commits - props.put("client.id", clientId); // Nur zur Wiedererkennung - props.put("auto.offset.reset", "earliest"); // Von Beginn an lesen - props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"); - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); - - consumer = new KafkaConsumer<>(props); - - this.topic = topic; - this.id = clientId; - } - - - public void run() - { - try - { - log.info("{} - Subscribing to topic test", id); - consumer.subscribe(Arrays.asList("test")); - running = true; - - while (true) - { - ConsumerRecords records = - consumer.poll(Duration.ofSeconds(1)); - - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) - { - consumed++; - log.info( - "{} - {}: {}/{} - {}={}", - id, - record.offset(), - record.topic(), - record.partition(), - record.key(), - record.value() - ); - } - } - } - catch(WakeupException e) - { - log.info("{} - Consumer was signaled to finish its work", id); - } - catch(Exception e) - { - log.error("{} - Unexpected error: {}", id, e.toString()); - } - finally - { - running = false; - log.info("{} - Closing the KafkaConsumer", id); - consumer.close(); - log.info("{}: Consumed {} messages in total, exiting!", id, consumed); - } - } - - - public static void main(String[] args) throws Exception - { - String broker = ":9092"; - String topic = "test"; - String groupId = "my-group"; - String clientId = "DEV"; - - switch (args.length) - { - case 4: - clientId = args[3]; - case 3: - groupId = args[2]; - case 2: - topic = args[1]; - case 1: - broker = args[0]; - } - - - SimpleConsumer instance = new SimpleConsumer(broker, topic, groupId, clientId); - - Runtime.getRuntime().addShutdownHook(new Thread(() -> - { - instance.consumer.wakeup(); - - while (instance.running) - { - log.info("Waiting for main-thread..."); - try - { - Thread.sleep(1000); - } - catch (InterruptedException e) {} - } - log.info("Shutdown completed."); - })); - - log.info( - "Running SimpleConsumer: broker={}, topic={}, group-id={}, client-id={}", - broker, - topic, - groupId, - clientId); - instance.run(); - } -} diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml deleted file mode 100644 index b8e6780..0000000 --- a/src/main/resources/logback.xml +++ /dev/null @@ -1,17 +0,0 @@ - - - - - - %highlight(%-5level) %m%n - - - - - - - - - - - -- 2.20.1