From 7eb2c864e132b2e1b14c7971d06b4b2c960b4170 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 27 Jul 2022 10:36:43 +0200 Subject: [PATCH] =?utf8?q?M=C3=B6glichst=20einfach=20gehaltener=20technisc?= =?utf8?q?h=20vollst=C3=A4ndiger=20Producer?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Unterschiede zu dem Producer aus `first-contact`: ** Der Producer erzeugt endlos alle ca. 500ms eine Nachricht. ** Der Producer beendet sich ordentlich, wenn STRG-C gedrückt wird. ** Der Producer wird auch als Docker-Image gebaut * Das Compose-Setup an das Setup aus den vorhergehenden Übungen angegelichen. --- Dockerfile | 6 + README.sh | 38 ++--- docker-compose.yml | 49 ++++++- pom.xml | 58 ++++++-- .../java/de/juplo/kafka/SimpleConsumer.java | 131 ------------------ .../java/de/juplo/kafka/SimpleProducer.java | 60 ++++++-- 6 files changed, 165 insertions(+), 177 deletions(-) create mode 100644 Dockerfile delete mode 100644 src/main/java/de/juplo/kafka/SimpleConsumer.java diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..ea4d335 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,6 @@ +FROM openjdk:11-jre +VOLUME /tmp +COPY target/*.jar /opt/app.jar +COPY target/dependency /opt/libs +ENTRYPOINT [ "java", "-jar", "/opt/app.jar", "kafka:9092", "test" ] +CMD [ "DCKR" ] diff --git a/README.sh b/README.sh index 0ee50a9..3f46c04 100755 --- a/README.sh +++ b/README.sh @@ -1,5 +1,7 @@ #!/bin/bash +IMAGE=juplo/simple-producer:1.0-SNAPSHOT + if [ "$1" = "cleanup" ] then docker-compose down -v @@ -7,27 +9,27 @@ then exit fi -mvn package || exit 1 -if [ "$1" = "build" ]; then exit; fi - -trap 'kill $(jobs -p) 2>/dev/null' EXIT +docker-compose up -d zookeeper kafka-1 kafka-2 kafka-3 cli -docker-compose up -d +if [[ + $(docker image ls -q $IMAGE) == "" || + "$1" = "build" +]] +then + mvn install || exit +else + echo "Using image existing images:" + docker image ls $IMAGE +fi echo "Waiting for the Kafka-Cluster to become ready..." docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1 +docker-compose up setup +docker-compose up -d +sleep 5 -echo "Producing messages" -mvn exec:java@producer - -echo "Reading messages" -mvn exec:java@consumer & -sleep 7 -kill $(jobs -p) -sleep 2 +docker-compose exec cli kafkacat -b kafka:9092 -t test -q -c 20 -f'topic=%t\tpartition=%p\toffset=%o\tkey=%k\tvalue=%s\n' -echo "Re-Reading messages" -mvn exec:java@consumer & -sleep 7 -kill $(jobs -p) -sleep 2 +docker-compose stop producer +docker-compose exec cli kafkacat -b kafka:9092 -t test -q -e -f'topic=%t\tpartition=%p\toffset=%o\tkey=%k\tvalue=%s\n' +docker-compose logs producer diff --git a/docker-compose.yml b/docker-compose.yml index ec307f5..d9f15c8 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -7,20 +7,56 @@ services: ports: - 2181:2181 - kafka: + kafka-1: image: confluentinc/cp-kafka:7.1.3 environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9081 + KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-1:9092, LOCALHOST://localhost:9081 + KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" + ports: + - 9081:9081 + depends_on: + - zookeeper + + kafka-2: + image: confluentinc/cp-kafka:7.1.3 + environment: + KAFKA_BROKER_ID: 2 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9082 - KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka:9092, LOCALHOST://localhost:9082 + KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-2:9092, LOCALHOST://localhost:9082 KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" ports: - 9092:9082 - 9082:9082 + networks: + default: + aliases: + - kafka + depends_on: + - zookeeper + + kafka-3: + image: confluentinc/cp-kafka:7.1.3 + environment: + KAFKA_BROKER_ID: 3 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9083 + KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-3:9092, LOCALHOST://localhost:9083 + KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3 + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" + ports: + - 9083:9083 depends_on: - zookeeper @@ -29,9 +65,14 @@ services: command: > bash -c " kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test - kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2 + kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2 --replication-factor 3 --config min.insync.replicas=2 + kafka-topics --bootstrap-server kafka:9092 --describe --topic test " cli: image: juplo/toolbox command: sleep infinity + + producer: + image: juplo/simple-producer:1.0-SNAPSHOT + command: producer diff --git a/pom.xml b/pom.xml index 70f37e8..4c52f37 100644 --- a/pom.xml +++ b/pom.xml @@ -12,8 +12,8 @@ de.juplo.kafka - first-contact - First Contact: a Simple Producer and a simple Consumer-Group + simple-producer + Super Simple Producer 1.0-SNAPSHOT @@ -34,25 +34,61 @@ - org.codehaus.mojo - exec-maven-plugin - 3.0.0 + pl.project13.maven + git-commit-id-plugin + + + org.apache.maven.plugins + maven-dependency-plugin - producer + copy-dependencies + package + + copy-dependencies + - de.juplo.kafka.SimpleProducer + ${project.build.directory}/libs + + + + org.apache.maven.plugins + maven-jar-plugin + + + + true + libs/ + de.juplo.kafka.SimpleProducer + + + + + + io.fabric8 + docker-maven-plugin + 0.33.0 + + + + juplo/%a:%v + + + + - consumer - - de.juplo.kafka.SimpleConsumer - + build + package + + build + + diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java deleted file mode 100644 index e4d9697..0000000 --- a/src/main/java/de/juplo/kafka/SimpleConsumer.java +++ /dev/null @@ -1,131 +0,0 @@ -package de.juplo.kafka; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.serialization.StringDeserializer; - -import java.time.Duration; -import java.util.Arrays; -import java.util.Properties; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - - -@Slf4j -public class SimpleConsumer -{ - private long consumed = 0; - private KafkaConsumer consumer; - private Lock lock = new ReentrantLock(); - private Condition stopped = lock.newCondition(); - - - public SimpleConsumer() - { - // tag::create[] - Properties props = new Properties(); - props.put("bootstrap.servers", ":9092"); - props.put("group.id", "my-consumer"); // << Used for Offset-Commits - // end::create[] - props.put("auto.offset.reset", "earliest"); - // tag::create[] - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); - - KafkaConsumer consumer = new KafkaConsumer<>(props); - // end::create[] - this.consumer = consumer; - } - - - public void run() - { - String id = "C"; - - try - { - log.info("{} - Subscribing to topic test", id); - consumer.subscribe(Arrays.asList("test")); - - // tag::loop[] - while (true) - { - ConsumerRecords records = - consumer.poll(Duration.ofSeconds(1)); - - // Do something with the data... - // end::loop[] - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) - { - consumed++; - log.info( - "{} - {}: {}/{} - {}={}", - id, - record.offset(), - record.topic(), - record.partition(), - record.key(), - record.value() - ); - } - // tag::loop[] - } - // end::loop[] - } - catch(WakeupException e) - { - log.info("{} - RIIING!", id); - } - catch(Exception e) - { - log.error("{} - Unexpected error: {}", id, e.toString()); - } - finally - { - this.lock.lock(); - try - { - log.info("{} - Closing the KafkaConsumer", id); - consumer.close(); - log.info("C - DONE!"); - stopped.signal(); - } - finally - { - this.lock.unlock(); - log.info("{}: Consumed {} messages in total, exiting!", id, consumed); - } - } - } - - - public static void main(String[] args) throws Exception - { - SimpleConsumer instance = new SimpleConsumer(); - - Runtime.getRuntime().addShutdownHook(new Thread(() -> - { - instance.lock.lock(); - try - { - instance.consumer.wakeup(); - instance.stopped.await(); - } - catch (InterruptedException e) - { - log.warn("Interrrupted while waiting for the consumer to stop!", e); - } - finally - { - instance.lock.unlock(); - } - })); - - instance.run(); - } -} diff --git a/src/main/java/de/juplo/kafka/SimpleProducer.java b/src/main/java/de/juplo/kafka/SimpleProducer.java index 43a7227..fd03d73 100644 --- a/src/main/java/de/juplo/kafka/SimpleProducer.java +++ b/src/main/java/de/juplo/kafka/SimpleProducer.java @@ -6,8 +6,6 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; @Slf4j @@ -18,21 +16,21 @@ public class SimpleProducer private final KafkaProducer producer; private long produced = 0; + private volatile boolean running = true; + private volatile boolean done = false; - public SimpleProducer(String clientId, String topic) + public SimpleProducer(String broker, String topic, String id) { - // tag::create[] Properties props = new Properties(); - props.put("bootstrap.servers", "localhost:9092"); + props.put("bootstrap.servers", broker); + props.put("client.id", id); // Nur zur Wiedererkennung props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", StringSerializer.class.getName()); - KafkaProducer producer = new KafkaProducer<>(props); - // end::create[] + producer = new KafkaProducer<>(props); - this.id = clientId; this.topic = topic; - this.producer = producer; + this.id = id; } public void run() @@ -41,18 +39,19 @@ public class SimpleProducer try { - for (; i < 100 ; i++) + for (; running ; i++) { send(Long.toString(i%10), Long.toString(i)); + Thread.sleep(500); } - - log.info("{} - Done", id); } + catch (InterruptedException e) {} finally { log.info("{}: Closing the KafkaProducer", id); producer.close(); log.info("{}: Produced {} messages in total, exiting!", id, produced); + done = true; } } @@ -111,7 +110,42 @@ public class SimpleProducer public static void main(String[] args) throws Exception { - SimpleProducer producer = new SimpleProducer("P", "test"); + String broker = ":9092"; + String topic = "test"; + String clientId = "DEV"; + + switch (args.length) + { + case 3: + clientId = args[2]; + case 2: + topic = args[1]; + case 1: + broker = args[0]; + } + + SimpleProducer producer = new SimpleProducer(broker, topic, clientId); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> + { + producer.running = false; + while (!producer.done) + { + log.info("Waiting for producer..."); + try + { + Thread.sleep(1000); + } + catch (InterruptedException e) {} + } + log.info("Shutdown completed."); + })); + + log.info( + "Running simple producer: broker={}, topic={}, client-id={}", + broker, + topic, + clientId); producer.run(); } } -- 2.20.1