From 4eda51409d1de3ab3bd2621d94c5b336a44b1f81 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 28 Sep 2024 07:49:04 +0200 Subject: [PATCH] `docker` in `simple-producer` verwandelt` --- .dockerignore | 3 + .maven-dockerexclude | 1 + .maven-dockerinclude | 2 + Dockerfile | 6 + README.sh | 44 ++--- docker/docker-compose.yml | 4 + pom.xml | 98 +++++++++++ .../java/de/juplo/kafka/ExampleProducer.java | 153 ++++++++++++++++++ src/main/resources/logback.xml | 17 ++ 9 files changed, 310 insertions(+), 18 deletions(-) create mode 100644 .dockerignore create mode 100644 .maven-dockerexclude create mode 100644 .maven-dockerinclude create mode 100644 Dockerfile create mode 100644 pom.xml create mode 100644 src/main/java/de/juplo/kafka/ExampleProducer.java create mode 100644 src/main/resources/logback.xml diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..49f82a9 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,3 @@ +* +!target/*.jar +!target/libs/*.jar diff --git a/.maven-dockerexclude b/.maven-dockerexclude new file mode 100644 index 0000000..72e8ffc --- /dev/null +++ b/.maven-dockerexclude @@ -0,0 +1 @@ +* diff --git a/.maven-dockerinclude b/.maven-dockerinclude new file mode 100644 index 0000000..a00c65f --- /dev/null +++ b/.maven-dockerinclude @@ -0,0 +1,2 @@ +target/*.jar +target/libs/*.jar diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..74e66ed --- /dev/null +++ b/Dockerfile @@ -0,0 +1,6 @@ +FROM eclipse-temurin:21-jre +VOLUME /tmp +COPY target/*.jar /opt/app.jar +COPY target/libs /opt/libs +ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ] +CMD [ "kafka:9092", "test", "DCKR" ] diff --git a/README.sh b/README.sh index da741d4..97ca7cc 100755 --- a/README.sh +++ b/README.sh @@ -1,32 +1,40 @@ #!/bin/bash +IMAGE=juplo/simple-producer:1.0-SNAPSHOT + if [ "$1" = "cleanup" ] then docker compose -f docker/docker-compose.yml down -t0 -v --remove-orphans + mvn clean exit fi +docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3 +docker compose -f docker/docker-compose.yml rm -svf producer + +if [[ + $(docker image ls -q $IMAGE) == "" || + "$1" = "build" +]] +then + mvn clean install || exit +else + echo "Using image existing images:" + docker image ls $IMAGE +fi + docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1 -docker compose -f docker/docker-compose.yml ps docker compose -f docker/docker-compose.yml up -t0 -d cli sleep 1 docker compose -f docker/docker-compose.yml logs setup -echo -echo "Hilfe-Ausgabe von kafkacat" -echo -docker compose -f docker/docker-compose.yml exec -T cli kafkacat -h -echo -echo "Nachrichten schreiben mit kafkacat" -echo -docker compose -f docker/docker-compose.yml exec -T cli kafkacat -P -b kafka:9092 -t test << EOF -Hallo Welt! -Nachricht #1 -Nachricht #2 -Nachricht #3 -EOF -echo -echo "Nachrichten lesen mit kafkacat" -echo -docker compose -f docker/docker-compose.yml exec cli kafkacat -C -b kafka:9092 -t test -o beginning -e +docker compose -f docker/docker-compose.yml ps +docker compose -f docker/docker-compose.yml up -d producer +sleep 5 + +docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -c 20 -f'topic=%t\tpartition=%p\toffset=%o\tkey=%k\tvalue=%s\n' + +docker compose -f docker/docker-compose.yml stop producer +docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -e -f'topic=%t\tpartition=%p\toffset=%o\tkey=%k\tvalue=%s\n' +docker compose -f docker/docker-compose.yml logs producer diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 3aa7920..531a116 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -187,6 +187,10 @@ services: - kafka-2 - kafka-3 + producer: + image: juplo/simple-producer:1.0-SNAPSHOT + command: kafka:9092 test producer + volumes: zookeeper-data: zookeeper-log: diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..ad7f17a --- /dev/null +++ b/pom.xml @@ -0,0 +1,98 @@ + + + + 4.0.0 + + + org.springframework.boot + spring-boot-starter-parent + 3.3.4 + + + + de.juplo.kafka + simple-producer + Super Simple Producer + A Simple Producer, programmed with pure Java, that sends messages via Kafka + 1.0-SNAPSHOT + + + 21 + + + + + org.apache.kafka + kafka-clients + + + org.projectlombok + lombok + + + ch.qos.logback + logback-classic + + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-dependencies + package + + copy-dependencies + + + ${project.build.directory}/libs + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + true + libs/ + de.juplo.kafka.ExampleProducer + + + + + + pl.project13.maven + git-commit-id-plugin + + + io.fabric8 + docker-maven-plugin + 0.45.0 + + + + juplo/%a:%v + + + + + + build + package + + build + + + + + + + + diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java new file mode 100644 index 0000000..06e14ef --- /dev/null +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -0,0 +1,153 @@ +package de.juplo.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; + +import java.util.Properties; + + +@Slf4j +public class ExampleProducer +{ + private final String id; + private final String topic; + private final Producer producer; + + private volatile boolean running = true; + private volatile boolean done = false; + private long produced = 0; + + public ExampleProducer(String broker, String topic, String clientId) + { + Properties props = new Properties(); + props.put("bootstrap.servers", broker); + props.put("client.id", clientId); // Nur zur Wiedererkennung + props.put("key.serializer", StringSerializer.class.getName()); + props.put("value.serializer", StringSerializer.class.getName()); + + this.id = clientId; + this.topic = topic; + producer = new KafkaProducer<>(props); + } + + public void run() + { + long i = 0; + + try + { + for (; running; i++) + { + send(Long.toString(i%10), Long.toString(i)); + Thread.sleep(500); + } + } + catch (Exception e) + { + log.error("{} - Unexpected error: {}!", id, e.toString()); + } + finally + { + log.info("{}: Closing the KafkaProducer", id); + producer.close(); + log.info("{}: Produced {} messages in total, exiting!", id, produced); + done = true; + } + } + + void send(String key, String value) + { + final long time = System.currentTimeMillis(); + + final ProducerRecord record = new ProducerRecord<>( + topic, // Topic + key, // Key + value // Value + ); + + producer.send(record, (metadata, e) -> + { + long now = System.currentTimeMillis(); + if (e == null) + { + // HANDLE SUCCESS + produced++; + log.debug( + "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms", + id, + record.key(), + record.value(), + metadata.partition(), + metadata.offset(), + metadata.timestamp(), + now - time + ); + } + else + { + // HANDLE ERROR + log.error( + "{} - ERROR key={} timestamp={} latency={}ms: {}", + id, + record.key(), + metadata == null ? -1 : metadata.timestamp(), + now - time, + e.toString() + ); + } + }); + + long now = System.currentTimeMillis(); + log.trace( + "{} - Queued message with key={} latency={}ms", + id, + record.key(), + now - time + ); + } + + + public static void main(String[] args) throws Exception + { + String broker = ":9092"; + String topic = "test"; + String clientId = "DEV"; + + switch (args.length) + { + case 3: + clientId = args[2]; + case 2: + topic = args[1]; + case 1: + broker = args[0]; + } + + ExampleProducer instance = new ExampleProducer(broker, topic, clientId); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> + { + instance.running = false; + while (!instance.done) + { + log.info("Waiting for main-thread..."); + try + { + Thread.sleep(1000); + } + catch (InterruptedException e) {} + } + log.info("Shutdown completed."); + })); + + log.info( + "Running ExampleProducer: broker={}, topic={}, client-id={}", + broker, + topic, + clientId); + instance.run(); + } +} diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml new file mode 100644 index 0000000..b8e6780 --- /dev/null +++ b/src/main/resources/logback.xml @@ -0,0 +1,17 @@ + + + + + + %highlight(%-5level) %m%n + + + + + + + + + + + -- 2.20.1