From 91cb1d6dee80bbb386f946e889db8b3d53e1e2d6 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 28 Sep 2024 11:58:11 +0200 Subject: [PATCH] Vorlage --- Dockerfile | 6 - README.sh | 36 ----- docker/docker-compose.yml | 4 - .../java/de/juplo/kafka/ExampleProducer.java | 145 ------------------ 4 files changed, 191 deletions(-) delete mode 100644 Dockerfile delete mode 100755 README.sh diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index 74e66edf..00000000 --- a/Dockerfile +++ /dev/null @@ -1,6 +0,0 @@ -FROM eclipse-temurin:21-jre -VOLUME /tmp -COPY target/*.jar /opt/app.jar -COPY target/libs /opt/libs -ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ] -CMD [ "kafka:9092", "test", "DCKR" ] diff --git a/README.sh b/README.sh deleted file mode 100755 index 3d98ace7..00000000 --- a/README.sh +++ /dev/null @@ -1,36 +0,0 @@ -#!/bin/bash - -IMAGE=juplo/simple-producer:1.0-SNAPSHOT - -if [ "$1" = "cleanup" ] -then - docker compose -f docker/docker-compose.yml down -t0 -v --remove-orphans - mvn clean - exit -fi - -docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3 -docker compose -f docker/docker-compose.yml rm -svf producer - -if [[ - $(docker image ls -q $IMAGE) == "" || - "$1" = "build" -]] -then - mvn clean install || exit -else - echo "Using image existing images:" - docker image ls $IMAGE -fi - -docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1 - - -docker compose -f docker/docker-compose.yml up -d producer -sleep 5 - -docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -c 20 -f'topic=%t\tpartition=%p\toffset=%o\tkey=%k\tvalue=%s\n' - -docker compose -f docker/docker-compose.yml stop producer -docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -e -f'topic=%t\tpartition=%p\toffset=%o\tkey=%k\tvalue=%s\n' -docker compose -f docker/docker-compose.yml logs producer diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 5b19de74..39cc67a3 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -135,10 +135,6 @@ services: - kafka-2 - kafka-3 - producer: - image: juplo/simple-producer:1.0-SNAPSHOT - command: kafka:9092 test producer - volumes: zookeeper-data: zookeeper-log: diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index c3db7e7c..a0e53c7a 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -1,157 +1,12 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; - -import java.util.Properties; @Slf4j public class ExampleProducer { - private final String id; - private final String topic; - private final Producer producer; - - private volatile boolean running = true; - private volatile boolean done = false; - private long produced = 0; - - public ExampleProducer( - String broker, - String topic, - String clientId) - { - Properties props = new Properties(); - props.put("bootstrap.servers", broker); - props.put("client.id", clientId); // Nur zur Wiedererkennung - props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", StringSerializer.class.getName()); - - this.id = clientId; - this.topic = topic; - producer = new KafkaProducer<>(props); - } - - public void run() - { - long i = 0; - - try - { - for (; running; i++) - { - send(Long.toString(i%10), Long.toString(i)); - Thread.sleep(500); - } - } - catch (Exception e) - { - log.error("{} - Unexpected error!", id, e); - } - finally - { - log.info("{}: Closing the KafkaProducer", id); - producer.close(); - log.info("{}: Produced {} messages in total, exiting!", id, produced); - done = true; - } - } - - void send(String key, String value) - { - final long time = System.currentTimeMillis(); - - final ProducerRecord record = new ProducerRecord<>( - topic, // Topic - key, // Key - value // Value - ); - - producer.send(record, (metadata, e) -> - { - long now = System.currentTimeMillis(); - if (e == null) - { - // HANDLE SUCCESS - produced++; - log.debug( - "{} - Sent message {}={}, partition={}, offset={}, timestamp={}, latency={}ms", - id, - key, - value, - metadata.partition(), - metadata.offset(), - metadata.timestamp(), - now - time - ); - } - else - { - // HANDLE ERROR - log.error( - "{} - ERROR for message {}={}, latency={}ms: {}", - id, - key, - value, - now - time, - e.toString() - ); - } - }); - - long now = System.currentTimeMillis(); - log.trace( - "{} - Queued message {}={}, latency={}ms", - id, - key, - value, - now - time - ); - } - - public static void main(String[] args) throws Exception { - String broker = ":9092"; - String topic = "test"; - String clientId = "DEV"; - - switch (args.length) - { - case 3: - clientId = args[2]; - case 2: - topic = args[1]; - case 1: - broker = args[0]; - } - - ExampleProducer instance = new ExampleProducer(broker, topic, clientId); - - Runtime.getRuntime().addShutdownHook(new Thread(() -> - { - instance.running = false; - while (!instance.done) - { - log.info("Waiting for main-thread..."); - try - { - Thread.sleep(1000); - } - catch (InterruptedException e) {} - } - log.info("Shutdown completed."); - })); - - log.info( - "Running ExampleProducer: broker={}, topic={}, client-id={}", - broker, - topic, - clientId); - instance.run(); } } -- 2.20.1