From 6db60eede6c6a14a20fb88cdcd0e0be0e959d31b Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Tue, 14 Dec 2021 19:09:57 +0100
Subject: [PATCH] Endless Producer: a simple producer, implemented as a Spring Boot app

---
 .dockerignore                                 |   2 +
 .maven-dockerexclude                          |   1 +
 .maven-dockerinclude                          |   1 +
 Dockerfile                                    |   5 +
 README.sh                                     |  38 ++--
 docker-compose.yml                            |  10 +
 pom.xml                                       |  54 ++++--
 src/main/java/de/juplo/kafka/Application.java |  46 +++++
 .../de/juplo/kafka/ApplicationProperties.java |  17 ++
 .../java/de/juplo/kafka/DriverController.java |  28 +++
 .../java/de/juplo/kafka/EndlessProducer.java  | 171 ++++++++++++++++++
 .../java/de/juplo/kafka/SimpleConsumer.java   | 131 --------------
 .../java/de/juplo/kafka/SimpleProducer.java   |  88 ---------
 src/main/resources/application.yml            |  15 ++
 14 files changed, 351 insertions(+), 256 deletions(-)
 create mode 100644 .dockerignore
 create mode 100644 .maven-dockerexclude
 create mode 100644 .maven-dockerinclude
 create mode 100644 Dockerfile
 create mode 100644 src/main/java/de/juplo/kafka/Application.java
 create mode 100644 src/main/java/de/juplo/kafka/ApplicationProperties.java
 create mode 100644 src/main/java/de/juplo/kafka/DriverController.java
 create mode 100644 src/main/java/de/juplo/kafka/EndlessProducer.java
 delete mode 100644 src/main/java/de/juplo/kafka/SimpleConsumer.java
 delete mode 100644 src/main/java/de/juplo/kafka/SimpleProducer.java
 create mode 100644 src/main/resources/application.yml

diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 0000000..1ad9963
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,2 @@
+*
+!target/*.jar
diff --git a/.maven-dockerexclude b/.maven-dockerexclude
new file mode 100644
index 0000000..72e8ffc
--- /dev/null
+++ b/.maven-dockerexclude
@@ -0,0 +1 @@
+*
diff --git a/.maven-dockerinclude b/.maven-dockerinclude
new file mode 100644
index 0000000..fd6cecd
--- /dev/null
+++ b/.maven-dockerinclude
@@ -0,0 +1 @@
+target/*.jar
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..16ee25e
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,5 @@
+FROM openjdk:11-jre
+VOLUME /tmp
+COPY target/*.jar /opt/app.jar
+ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ]
+CMD []
diff --git a/README.sh b/README.sh
index 95aae0b..0544297 100755
--- a/README.sh
+++ b/README.sh
@@ -1,5 +1,7 @@
 #!/bin/bash
 
+IMAGE=juplo/endless-producer:1.0-SNAPSHOT
+
 if [ "$1" = "cleanup" ]
 then
   docker-compose down -v
@@ -7,27 +9,23 @@ then
   exit
 fi
 
-mvn package || exit 1
-if [ "$1" = "build" ]; then exit; fi
-
-trap 'kill $(jobs -p) 2>/dev/null' EXIT
+docker-compose up -d zookeeper kafka cli
 
-docker-compose up -d
+if [[
+  $(docker image ls -q $IMAGE) == "" ||
+  "$1" = "build"
+]]
+then
+  mvn install || exit
+else
+  echo "Using existing image:"
+  docker image ls $IMAGE
+fi
 
 echo "Waiting for the Kafka-Cluster to become ready..."
 docker-compose exec kafka cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
-
-echo "Producing messages"
-mvn exec:java@producer
-
-echo "Reading messages"
-mvn exec:java@consumer &
-sleep 7
-kill $(jobs -p)
-sleep 2
-
-echo "Re-Reading messages"
-mvn exec:java@consumer &
-sleep 7
-kill $(jobs -p)
-sleep 2
+docker-compose up setup
+docker-compose up -d producer
+sleep 5
+docker-compose stop producer
+docker-compose logs producer
diff --git a/docker-compose.yml b/docker-compose.yml
index 13e950d..10ad3a0 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -35,3 +35,13 @@ services:
   cli:
     image: juplo/toolbox
     command: sleep infinity
+
+  producer:
+    image: juplo/endless-producer:1.0-SNAPSHOT
+    ports:
+      - 8080:8080
+    environment:
+      producer.bootstrap-server: kafka:9092
+      producer.client-id: producer
+      producer.topic: test
+      producer.throttle-ms: 200
diff --git a/pom.xml b/pom.xml
index 39a9e4e..7028cfd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,11 +12,24 @@
   <groupId>de.juplo.kafka</groupId>
-  <artifactId>first-contact</artifactId>
-  <name>First Contact: a Simple Producer and a simple Consumer-Group</name>
+  <artifactId>endless-producer</artifactId>
+  <name>Endless Producer: a Simple Producer that endlessly writes numbers into a topic</name>
   <version>1.0-SNAPSHOT</version>
 
   <dependencies>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-web</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-actuator</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-configuration-processor</artifactId>
+      <optional>true</optional>
+    </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
     </dependency>
     <dependency>
@@ -26,29 +39,36 @@
       <artifactId>lombok</artifactId>
     </dependency>
     <dependency>
-      <groupId>ch.qos.logback</groupId>
-      <artifactId>logback-classic</artifactId>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-test</artifactId>
+      <scope>test</scope>
     </dependency>
   </dependencies>
 
   <build>
     <plugins>
       <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>exec-maven-plugin</artifactId>
-        <version>3.0.0</version>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-maven-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>io.fabric8</groupId>
+        <artifactId>docker-maven-plugin</artifactId>
+        <version>0.33.0</version>
+        <configuration>
+          <images>
+            <image>
+              <name>juplo/%a:%v</name>
+            </image>
+          </images>
+        </configuration>
         <executions>
           <execution>
-            <id>producer</id>
-            <configuration>
-              <mainClass>de.juplo.kafka.SimpleProducer</mainClass>
-            </configuration>
-          </execution>
-          <execution>
-            <id>consumer</id>
-            <configuration>
-              <mainClass>de.juplo.kafka.SimpleConsumer</mainClass>
-            </configuration>
+            <id>build</id>
+            <phase>package</phase>
+            <goals>
+              <goal>build</goal>
+            </goals>
           </execution>
         </executions>
       </plugin>
diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java
new file mode 100644
index 0000000..bc617a8
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/Application.java
@@ -0,0 +1,46 @@
+package de.juplo.kafka;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.util.Assert;
+
+import java.util.concurrent.Executors;
+
+
+@SpringBootApplication
+@EnableConfigurationProperties(ApplicationProperties.class)
+public class Application
+{
+  @Autowired
+  ApplicationProperties properties;
+
+
+  @Bean
+  public EndlessProducer producer()
+  {
+    Assert.hasText(properties.getBootstrapServer(), "producer.bootstrap-server must be set");
+    Assert.hasText(properties.getClientId(), "producer.client-id must be set");
+    Assert.hasText(properties.getTopic(), "producer.topic must be set");
+
+    EndlessProducer producer =
+        new EndlessProducer(
+            Executors.newFixedThreadPool(1),
+            properties.getBootstrapServer(),
+            properties.getClientId(),
+            properties.getTopic(),
+            properties.getAcks(),
+            properties.getThrottleMs());
+
+    producer.start();
+
+    return producer;
+  }
+
+  public static void main(String[] args)
+  {
+    SpringApplication.run(Application.class, args);
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java
new file mode 100644
index 0000000..ab26890
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java
@@ -0,0 +1,17 @@
+package de.juplo.kafka;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@ConfigurationProperties(prefix = "producer")
+@Getter
+@Setter
+public class ApplicationProperties
+{
+  private String bootstrapServer;
+  private String clientId;
+  private String topic;
+  private String acks;
+  private int throttleMs;
+}
diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java
new file mode 100644
index 0000000..f8a287d
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/DriverController.java
@@ -0,0 +1,28 @@
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.concurrent.ExecutionException;
+
+
+@RestController
+@RequiredArgsConstructor
+public class DriverController
+{
+  private final EndlessProducer producer;
+
+
+  @PostMapping("start")
+  public void start()
+  {
+    producer.start();
+  }
+
+  @PostMapping("stop")
+  public void stop() throws ExecutionException, InterruptedException
+  {
+    producer.stop();
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/EndlessProducer.java b/src/main/java/de/juplo/kafka/EndlessProducer.java
new file mode 100644
index 0000000..43b0e41
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/EndlessProducer.java
@@ -0,0 +1,171 @@
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import javax.annotation.PreDestroy;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+
+@Slf4j
+public class EndlessProducer implements Runnable
+{
+  private final ExecutorService executor;
+  private final String id;
+  private final String topic;
+  private final String acks;
+  private final int throttleMs;
+  private final KafkaProducer<String, String> producer;
+
+  private boolean running = false;
+  private long i = 0;
+  private long produced = 0;
+  private Future<?> future = null;
+
+  public EndlessProducer(
+      ExecutorService executor,
+      String bootstrapServer,
+      String clientId,
+      String topic,
+      String acks,
+      int throttleMs)
+  {
+    this.executor = executor;
+    this.id = clientId;
+    this.topic = topic;
+    this.acks = acks;
+    this.throttleMs = throttleMs;
+
+    Properties props = new Properties();
+    props.put("bootstrap.servers", bootstrapServer);
+    props.put("client.id", clientId);
+    props.put("acks", acks);
+    props.put("key.serializer", StringSerializer.class.getName());
+    props.put("value.serializer", StringSerializer.class.getName());
+
+    this.producer = new KafkaProducer<>(props);
+  }
+
+  @Override
+  public void run()
+  {
+    try
+    {
+      for (; running; i++)
+      {
+        final long time = System.currentTimeMillis();
+
+        final ProducerRecord<String, String> record = new ProducerRecord<>(
+            topic,                 // Topic
+            Long.toString(i % 10), // Key
+            Long.toString(i)       // Value
+        );
+
+        producer.send(record, (metadata, e) ->
+        {
+          long now = System.currentTimeMillis();
+          if (e == null)
+          {
+            // HANDLE SUCCESS
+            produced++;
+            log.debug(
+                "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
+                id,
+                record.key(),
+                record.value(),
+                metadata.partition(),
+                metadata.offset(),
+                metadata.timestamp(),
+                now - time
+            );
+          }
+          else
+          {
+            // HANDLE ERROR
+            log.error(
+                "{} - ERROR key={} timestamp={} latency={}ms: {}",
+                id,
+                record.key(),
+                metadata == null ? -1 : metadata.timestamp(),
+                now - time,
+                e.toString()
+            );
+          }
+        });
+
+        long now = System.currentTimeMillis();
+        log.trace(
+            "{} - Queued #{} key={} latency={}ms",
+            id,
+            i,
+            record.key(),
+            now - time
+        );
+
+        if (throttleMs > 0)
+        {
+          try
+          {
+            Thread.sleep(throttleMs);
+          }
+          catch (InterruptedException e)
+          {
+            log.warn("{} - Interrupted while throttling!", id, e);
+          }
+        }
+      }
+
+      log.info("{} - Done", id);
+    }
+    catch (Exception e)
+    {
+      log.error("{} - Unexpected error: {}", id, e.toString());
+    }
+  }
+
+  public synchronized void start()
+  {
+    if (running)
+      throw new IllegalStateException("Producer instance " + id + " is already running!");
+
+    log.info("{} - Starting - produced {} messages before", id, produced);
+    running = true;
+    future = executor.submit(this);
+  }
+
+  public synchronized void stop() throws ExecutionException, InterruptedException
+  {
+    if (!running)
+      throw new IllegalStateException("Producer instance " + id + " is not running!");
+
+    log.info("{} - Stopping...", id);
+    running = false;
+    future.get();
+    log.info("{} - Stopped - produced {} messages so far", id, produced);
+  }
+
+  @PreDestroy
+  public void destroy() throws ExecutionException, InterruptedException
+  {
+    log.info("{} - Destroy!", id);
+    try
+    {
+      stop();
+    }
+    catch (IllegalStateException e)
+    {
+      log.info("{} - Was already stopped", id);
+    }
+    finally
+    {
+      log.info("{} - Closing the KafkaProducer", id);
+      producer.close();
+      log.info("{}: Produced {} messages in total, exiting!", id, produced);
+    }
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java
deleted file mode 100644
index e4d9697..0000000
--- a/src/main/java/de/juplo/kafka/SimpleConsumer.java
+++ /dev/null
@@ -1,131 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.StringDeserializer;
-
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.Properties;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-
-@Slf4j
-public class SimpleConsumer
-{
-  private long consumed = 0;
-  private KafkaConsumer<String, String> consumer;
-  private Lock lock = new ReentrantLock();
-  private Condition stopped = lock.newCondition();
-
-
-  public SimpleConsumer()
-  {
-    // tag::create[]
-    Properties props = new Properties();
-    props.put("bootstrap.servers", ":9092");
-    props.put("group.id", "my-consumer"); // << Used for Offset-Commits
-    // end::create[]
-    props.put("auto.offset.reset", "earliest");
-    // tag::create[]
-    props.put("key.deserializer", StringDeserializer.class.getName());
-    props.put("value.deserializer", StringDeserializer.class.getName());
-
-    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
-    // end::create[]
-    this.consumer = consumer;
-  }
-
-
-  public void run()
-  {
-    String id = "C";
-
-    try
-    {
-      log.info("{} - Subscribing to topic test", id);
-      consumer.subscribe(Arrays.asList("test"));
-
-      // tag::loop[]
-      while (true)
-      {
-        ConsumerRecords<String, String> records =
-            consumer.poll(Duration.ofSeconds(1));
-
-        // Do something with the data...
-        // end::loop[]
-        log.info("{} - Received {} messages", id, records.count());
-        for (ConsumerRecord<String, String> record : records)
-        {
-          consumed++;
-          log.info(
-              "{} - {}: {}/{} - {}={}",
-              id,
-              record.offset(),
-              record.topic(),
-              record.partition(),
-              record.key(),
-              record.value()
-          );
-        }
-        // tag::loop[]
-      }
-      // end::loop[]
-    }
-    catch(WakeupException e)
-    {
-      log.info("{} - RIIING!", id);
-    }
-    catch(Exception e)
-    {
-      log.error("{} - Unexpected error: {}", id, e.toString());
-    }
-    finally
-    {
-      this.lock.lock();
-      try
-      {
-        log.info("{} - Closing the KafkaConsumer", id);
-        consumer.close();
-        log.info("C - DONE!");
-        stopped.signal();
-      }
-      finally
-      {
-        this.lock.unlock();
-        log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
-      }
-    }
-  }
-
-
-  public static void main(String[] args) throws Exception
-  {
-    SimpleConsumer instance = new SimpleConsumer();
-
-    Runtime.getRuntime().addShutdownHook(new Thread(() ->
-    {
-      instance.lock.lock();
-      try
-      {
-        instance.consumer.wakeup();
-        instance.stopped.await();
-      }
-      catch (InterruptedException e)
-      {
-        log.warn("Interrrupted while waiting for the consumer to stop!", e);
-      }
-      finally
-      {
-        instance.lock.unlock();
-      }
-    }));
-
-    instance.run();
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/SimpleProducer.java b/src/main/java/de/juplo/kafka/SimpleProducer.java
deleted file mode 100644
index 5f57925..0000000
--- a/src/main/java/de/juplo/kafka/SimpleProducer.java
+++ /dev/null
@@ -1,88 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-
-import java.util.Properties;
-
-
-@Slf4j
-public class SimpleProducer
-{
-  public static void main(String[] args) throws Exception
-  {
-    // tag::create[]
-    Properties props = new Properties();
-    props.put("bootstrap.servers", "localhost:9092");
-    props.put("key.serializer", StringSerializer.class.getName());
-    props.put("value.serializer", StringSerializer.class.getName());
-
-    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
-    // end::create[]
-
-    String id = "P";
-    long i = 0;
-
-    try
-    {
-      for (; i < 100 ; i++)
-      {
-        final long time = System.currentTimeMillis();
-
-        final ProducerRecord<String, String> record = new ProducerRecord<>(
-            "test",              // Topic
-            Long.toString(i%10), // Key
-            Long.toString(i)     // Value
-        );
-
-        producer.send(record, (metadata, e) ->
-        {
-          long now = System.currentTimeMillis();
-          if (e == null)
-          {
-            // HANDLE SUCCESS
-            log.debug(
-                "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
-                id,
-                record.key(),
-                record.value(),
-                metadata.partition(),
-                metadata.offset(),
-                metadata.timestamp(),
-                now - time
-            );
-          }
-          else
-          {
-            // HANDLE ERROR
-            log.error(
-                "{} - ERROR key={} timestamp={} latency={}ms: {}",
-                id,
-                record.key(),
-                metadata == null ? -1 : metadata.timestamp(),
-                now - time,
-                e.toString()
-            );
-          }
-        });
-
-        long now = System.currentTimeMillis();
-        log.trace(
-            "{} - Queued #{} key={} latency={}ms",
-            id,
-            i,
-            record.key(),
-            now - time
-        );
-      }
-    }
-    finally
-    {
-      log.info("{}: Closing the KafkaProducer", id);
-      producer.close();
-      log.info("{}: Exiting!", id);
-    }
-  }
-}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
new file mode 100644
index 0000000..e4ae52a
--- /dev/null
+++ b/src/main/resources/application.yml
@@ -0,0 +1,15 @@
+producer:
+  bootstrap-server: :9092
+  client-id: peter
+  topic: test
+  acks: 1
+  throttle-ms: 1000
+management:
+  endpoints:
+    web:
+      exposure:
+        include: "*"
+logging:
+  level:
+    root: INFO
+    de.juplo: DEBUG
-- 
2.20.1