From 4bb2f181202664ede1f3b1769904b46b73f26679 Mon Sep 17 00:00:00 2001 From: Kai Moritz <kai@juplo.de> Date: Tue, 29 Oct 2024 14:05:40 +0100 Subject: [PATCH] Setup, um `grundlagen/simple-consumer` als Consumer Group zu skalieren --- .dockerignore | 3 - .maven-dockerexclude | 1 - .maven-dockerinclude | 2 - Dockerfile | 6 - docker/docker-compose.yml | 12 +- pom.xml | 98 ------------- .../java/de/juplo/kafka/ExampleConsumer.java | 132 ------------------ src/main/resources/logback.xml | 16 --- 8 files changed, 10 insertions(+), 260 deletions(-) delete mode 100644 .dockerignore delete mode 100644 .maven-dockerexclude delete mode 100644 .maven-dockerinclude delete mode 100644 Dockerfile delete mode 100644 pom.xml delete mode 100644 src/main/java/de/juplo/kafka/ExampleConsumer.java delete mode 100644 src/main/resources/logback.xml diff --git a/.dockerignore b/.dockerignore deleted file mode 100644 index 49f82a91..00000000 --- a/.dockerignore +++ /dev/null @@ -1,3 +0,0 @@ -* -!target/*.jar -!target/libs/*.jar diff --git a/.maven-dockerexclude b/.maven-dockerexclude deleted file mode 100644 index 72e8ffc0..00000000 --- a/.maven-dockerexclude +++ /dev/null @@ -1 +0,0 @@ -* diff --git a/.maven-dockerinclude b/.maven-dockerinclude deleted file mode 100644 index a00c65ff..00000000 --- a/.maven-dockerinclude +++ /dev/null @@ -1,2 +0,0 @@ -target/*.jar -target/libs/*.jar diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index 22819afe..00000000 --- a/Dockerfile +++ /dev/null @@ -1,6 +0,0 @@ -FROM eclipse-temurin:21-jre -VOLUME /tmp -COPY target/*.jar /opt/app.jar -COPY target/libs /opt/libs -ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ] -CMD [ "kafka:9092", "test", "my-group", "DCKR" ] diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index cba608be..3b5752c1 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -193,9 +193,17 @@ services: image: juplo/simple-producer:1.0-SNAPSHOT command: kafka:9092 test producer - consumer: + consumer-1: image: juplo/simple-consumer:1.0-SNAPSHOT - command: kafka:9092 test my-group consumer + command: kafka:9092 test my-group consumer-1 + + consumer-2: + image: juplo/simple-consumer:1.0-SNAPSHOT + command: kafka:9092 test my-group consumer-2 + + consumer-3: + image: juplo/simple-consumer:1.0-SNAPSHOT + command: kafka:9092 test my-group consumer-3 volumes: zookeeper-data: diff --git a/pom.xml b/pom.xml deleted file mode 100644 index 2d81d24e..00000000 --- a/pom.xml +++ /dev/null @@ -1,98 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.springframework.boot</groupId> - <artifactId>spring-boot-starter-parent</artifactId> - <version>3.3.4</version> - <relativePath/> <!-- lookup parent from repository --> - </parent> - - <groupId>de.juplo.kafka</groupId> - <artifactId>simple-consumer</artifactId> - <name>Simple Consumer-Group</name> - <description>Super Simple Consumer-Group, that is implemented as a plain Java-program</description> - <version>1.0-SNAPSHOT</version> - - <properties> - <java.version>21</java.version> - </properties> - - <dependencies> - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka-clients</artifactId> - </dependency> - <dependency> - <groupId>org.projectlombok</groupId> - <artifactId>lombok</artifactId> - </dependency> - <dependency> - <groupId>ch.qos.logback</groupId> - <artifactId>logback-classic</artifactId> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-dependency-plugin</artifactId> - <executions> - <execution> - <id>copy-dependencies</id> - <phase>package</phase> - <goals> - <goal>copy-dependencies</goal> - </goals> - <configuration> - <outputDirectory>${project.build.directory}/libs</outputDirectory> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <configuration> - <archive> - <manifest> - <addClasspath>true</addClasspath> - <classpathPrefix>libs/</classpathPrefix> - <mainClass>de.juplo.kafka.ExampleConsumer</mainClass> - </manifest> - </archive> - </configuration> - </plugin> - <plugin> - <groupId>pl.project13.maven</groupId> - <artifactId>git-commit-id-plugin</artifactId> - </plugin> - <plugin> - <groupId>io.fabric8</groupId> - <artifactId>docker-maven-plugin</artifactId> - <version>0.45.0</version> - <configuration> - <images> - <image> - <name>juplo/%a:%v</name> - </image> - </images> - </configuration> - <executions> - <execution> - <id>build</id> - <phase>package</phase> - <goals> - <goal>build</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> - -</project> diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java deleted file mode 100644 index 3634875b..00000000 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ /dev/null @@ -1,132 +0,0 @@ -package de.juplo.kafka; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.serialization.StringDeserializer; - -import java.time.Duration; -import java.util.Arrays; -import java.util.Properties; - - -@Slf4j -public class ExampleConsumer -{ - private final String id; - private final String topic; - private final Consumer<String, String> consumer; - - private volatile boolean running = false; - private long consumed = 0; - - public ExampleConsumer( - String broker, - String topic, - String groupId, - String clientId) - { - Properties props = new Properties(); - // Konfiguration für den Consumer zusammenstellen - - this.id = clientId; - this.topic = topic; - consumer = new KafkaConsumer<>(props); - } - - - public void run() - { - try - { - log.info("{} - Subscribing to topic {}", id, topic); - // TODO: subscribe! - running = true; - - while (true) - { - // TODO: - // Ãber consumer.poll() Nachrichten abrufen und für die - // empfangenen Nachrichten die Methode handleRecord - // aufrufen. - } - } - catch(WakeupException e) - { - log.info("{} - Consumer was signaled to finish its work", id); - } - catch(Exception e) - { - log.error("{} - Unexpected error, unsubscribing!", id, e); - } - finally - { - running = false; - log.info("{} - Closing the KafkaConsumer", id); - consumer.close(); - log.info("{}: Consumed {} messages in total, exiting!", id, consumed); - } - } - - private void handleRecord( - String topic, - Integer partition, - Long offset, - String key, - String value) - { - consumed++; - log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value); - } - - - public static void main(String[] args) throws Exception - { - String broker = ":9092"; - String topic = "test"; - String groupId = "my-group"; - String clientId = "DEV"; - - switch (args.length) - { - case 4: - clientId = args[3]; - case 3: - groupId = args[2]; - case 2: - topic = args[1]; - case 1: - broker = args[0]; - } - - - ExampleConsumer instance = new ExampleConsumer(broker, topic, groupId, clientId); - - Runtime.getRuntime().addShutdownHook(new Thread(() -> - { - instance.consumer.wakeup(); - - while (instance.running) - { - log.info("Waiting for main-thread..."); - try - { - Thread.sleep(1000); - } - catch (InterruptedException e) {} - } - log.info("Shutdown completed."); - })); - - log.info( - "Running ExampleConsumer: broker={}, topic={}, group-id={}, client-id={}", - broker, - topic, - groupId, - clientId); - instance.run(); - } -} diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml deleted file mode 100644 index 7a25e76f..00000000 --- a/src/main/resources/logback.xml +++ /dev/null @@ -1,16 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<configuration> - - <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> - <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> - <Pattern>%d{HH:mm:ss.SSS} | %highlight(%-5level) %msg%n</Pattern> - </encoder> - </appender> - - <logger name="de.juplo" level="TRACE"/> - - <root level="INFO"> - <appender-ref ref="STDOUT" /> - </root> - -</configuration> -- 2.20.1