From 445970a9555c8fa1290806784c50da7a5bed64d5 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 28 Sep 2024 08:30:24 +0200 Subject: [PATCH] Producer und Consumer werden als Image bezogen --- Dockerfile | 6 - README.sh | 20 +-- pom.xml | 95 ------------ .../java/de/juplo/kafka/SimpleConsumer.java | 137 ------------------ src/main/resources/logback.xml | 17 --- 5 files changed, 1 insertion(+), 274 deletions(-) delete mode 100644 Dockerfile delete mode 100644 pom.xml delete mode 100644 src/main/java/de/juplo/kafka/SimpleConsumer.java delete mode 100644 src/main/resources/logback.xml diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index 4d8a0d8..0000000 --- a/Dockerfile +++ /dev/null @@ -1,6 +0,0 @@ -FROM openjdk:17-jdk-slim -VOLUME /tmp -COPY target/*.jar /opt/app.jar -COPY target/libs /opt/libs -ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ] -CMD [ ":9092", "test", "my-group", "DCKR" ] diff --git a/README.sh b/README.sh index ff0b58b..e38e8f0 100755 --- a/README.sh +++ b/README.sh @@ -1,36 +1,18 @@ #!/bin/bash -IMAGE=juplo/simple-consumer:1.0-SNAPSHOT - if [ "$1" = "cleanup" ] then docker compose -f docker/docker-compose.yml down -t0 -v --remove-orphans - mvn clean exit fi -docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3 -docker compose -f docker/docker-compose.yml rm -svf consumer - -if [[ - $(docker image ls -q $IMAGE) == "" || - "$1" = "build" -]] -then - mvn clean install || exit -else - echo "Using image existing images:" - docker image ls $IMAGE -fi - docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1 +docker compose -f docker/docker-compose.yml ps docker compose -f docker/docker-compose.yml up -t0 -d cli sleep 1 docker compose -f docker/docker-compose.yml logs setup -docker compose -f docker/docker-compose.yml ps - docker compose -f docker/docker-compose.yml up -d producer docker compose -f docker/docker-compose.yml up -d consumer diff --git a/pom.xml b/pom.xml deleted file mode 100644 index 327625b..0000000 --- a/pom.xml +++ /dev/null @@ -1,95 +0,0 @@ - - - - 4.0.0 - - - org.springframework.boot - spring-boot-starter-parent - 3.2.5 - - - - de.juplo.kafka - simple-consumer - Simple Consumer-Group - Super Simple Consumer-Group, that is implemented as a plain Java-program - 1.0-SNAPSHOT - - - 17 - - - - - org.apache.kafka - kafka-clients - - - org.projectlombok - lombok - - - ch.qos.logback - logback-classic - - - - - - - org.apache.maven.plugins - maven-dependency-plugin - - - copy-dependencies - package - - copy-dependencies - - - ${project.build.directory}/libs - - - - - - org.apache.maven.plugins - maven-jar-plugin - - - - true - libs/ - de.juplo.kafka.SimpleConsumer - - - - - - io.fabric8 - docker-maven-plugin - 0.44.0 - - - - juplo/%a:%v - - - - - - build - package - - build - - - - - - - - - diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java deleted file mode 100644 index cee2165..0000000 --- a/src/main/java/de/juplo/kafka/SimpleConsumer.java +++ /dev/null @@ -1,137 +0,0 @@ -package de.juplo.kafka; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.serialization.StringDeserializer; - -import java.time.Duration; -import java.util.Arrays; -import java.util.Properties; - - -@Slf4j -public class SimpleConsumer -{ - private final String id; - private final String topic; - private final Consumer consumer; - - private volatile boolean running = false; - private long consumed = 0; - - public SimpleConsumer(String broker, String topic, String groupId, String clientId) - { - Properties props = new Properties(); - props.put("bootstrap.servers", broker); - props.put("group.id", groupId); // ID für die Offset-Commits - props.put("client.id", clientId); // Nur zur Wiedererkennung - props.put("auto.offset.reset", "earliest"); // Von Beginn an lesen - props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"); - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); - - this.id = clientId; - this.topic = topic; - consumer = new KafkaConsumer<>(props); - } - - - public void run() - { - try - { - log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic)); - running = true; - - while (true) - { - ConsumerRecords records = - consumer.poll(Duration.ofSeconds(1)); - - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) - { - consumed++; - log.info( - "{} - {}: {}/{} - {}={}", - id, - record.offset(), - record.topic(), - record.partition(), - record.key(), - record.value() - ); - } - } - } - catch(WakeupException e) - { - log.info("{} - Consumer was signaled to finish its work", id); - } - catch(Exception e) - { - log.error("{} - Unexpected error: {}, unsubscribing!", id, e.toString()); - consumer.unsubscribe(); - } - finally - { - running = false; - log.info("{} - Closing the KafkaConsumer", id); - consumer.close(); - log.info("{}: Consumed {} messages in total, exiting!", id, consumed); - } - } - - - public static void main(String[] args) throws Exception - { - String broker = ":9092"; - String topic = "test"; - String groupId = "my-group"; - String clientId = "DEV"; - - switch (args.length) - { - case 4: - clientId = args[3]; - case 3: - groupId = args[2]; - case 2: - topic = args[1]; - case 1: - broker = args[0]; - } - - - SimpleConsumer instance = new SimpleConsumer(broker, topic, groupId, clientId); - - Runtime.getRuntime().addShutdownHook(new Thread(() -> - { - instance.consumer.wakeup(); - - while (instance.running) - { - log.info("Waiting for main-thread..."); - try - { - Thread.sleep(1000); - } - catch (InterruptedException e) {} - } - log.info("Shutdown completed."); - })); - - log.info( - "Running SimpleConsumer: broker={}, topic={}, group-id={}, client-id={}", - broker, - topic, - groupId, - clientId); - instance.run(); - } -} diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml deleted file mode 100644 index b8e6780..0000000 --- a/src/main/resources/logback.xml +++ /dev/null @@ -1,17 +0,0 @@ - - - - - - %highlight(%-5level) %m%n - - - - - - - - - - - -- 2.20.1