From e87f4bb2bc188252955fb4932ddd99161ba621d3 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 31 Mar 2022 19:49:28 +0200 Subject: [PATCH] Endless Consumer: a simple consumer, implemented as Spring-Boot-App --- .dockerignore | 2 + .maven-dockerexclude | 1 + .maven-dockerinclude | 1 + Dockerfile | 5 + README.sh | 38 +++-- docker-compose.yml | 21 +++ pom.xml | 54 ++++--- src/main/java/de/juplo/kafka/Application.java | 46 ++++++ .../de/juplo/kafka/ApplicationProperties.java | 17 +++ .../java/de/juplo/kafka/DriverController.java | 28 ++++ .../java/de/juplo/kafka/EndlessConsumer.java | 138 ++++++++++-------- .../java/de/juplo/kafka/SimpleProducer.java | 117 --------------- src/main/resources/application.yml | 16 ++ 13 files changed, 268 insertions(+), 216 deletions(-) create mode 100644 .dockerignore create mode 100644 .maven-dockerexclude create mode 100644 .maven-dockerinclude create mode 100644 Dockerfile create mode 100644 src/main/java/de/juplo/kafka/Application.java create mode 100644 src/main/java/de/juplo/kafka/ApplicationProperties.java create mode 100644 src/main/java/de/juplo/kafka/DriverController.java delete mode 100644 src/main/java/de/juplo/kafka/SimpleProducer.java create mode 100644 src/main/resources/application.yml diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..1ad9963 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,2 @@ +* +!target/*.jar diff --git a/.maven-dockerexclude b/.maven-dockerexclude new file mode 100644 index 0000000..72e8ffc --- /dev/null +++ b/.maven-dockerexclude @@ -0,0 +1 @@ +* diff --git a/.maven-dockerinclude b/.maven-dockerinclude new file mode 100644 index 0000000..fd6cecd --- /dev/null +++ b/.maven-dockerinclude @@ -0,0 +1 @@ +target/*.jar diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..16ee25e --- /dev/null +++ b/Dockerfile @@ -0,0 +1,5 @@ +FROM openjdk:11-jre +VOLUME /tmp +COPY target/*.jar /opt/app.jar +ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ] +CMD [] diff --git a/README.sh b/README.sh index 0ee50a9..900270a 100755 --- a/README.sh +++ b/README.sh @@ -1,5 +1,7 @@ #!/bin/bash +IMAGE=juplo/endless-consumer:1.0-SNAPSHOT + if [ "$1" = "cleanup" ] then docker-compose down -v @@ -7,27 +9,23 @@ then exit fi -mvn package || exit 1 -if [ "$1" = "build" ]; then exit; fi - -trap 'kill $(jobs -p) 2>/dev/null' EXIT +docker-compose up -d zookeeper kafka cli -docker-compose up -d +if [[ + $(docker image ls -q $IMAGE) == "" || + "$1" = "build" +]] +then + mvn install || exit +else + echo "Using image existing images:" + docker image ls $IMAGE +fi echo "Waiting for the Kafka-Cluster to become ready..." docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1 - -echo "Producing messages" -mvn exec:java@producer - -echo "Reading messages" -mvn exec:java@consumer & -sleep 7 -kill $(jobs -p) -sleep 2 - -echo "Re-Reading messages" -mvn exec:java@consumer & -sleep 7 -kill $(jobs -p) -sleep 2 +docker-compose up setup +docker-compose up -d producer consumer +sleep 15 +docker-compose stop producer consumer +docker-compose logs consumer diff --git a/docker-compose.yml b/docker-compose.yml index 13e950d..bed1fc1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -35,3 +35,24 @@ services: cli: image: juplo/toolbox command: sleep infinity + + producer: + image: juplo/endless-producer:1.0-SNAPSHOT + ports: + - 8080:8080 + environment: + producer.bootstrap-server: kafka:9092 + producer.client-id: producer + producer.topic: test + producer.throttle-ms: 200 + + + consumer: + image: juplo/endless-consumer:1.0-SNAPSHOT + ports: + - 8081:8081 + environment: + consumer.bootstrap-server: kafka:9092 + consumer.client-id: my-group + consumer.client-id: consumer + consumer.topic: test diff --git a/pom.xml b/pom.xml index c104ca1..54bb695 100644 --- a/pom.xml +++ b/pom.xml @@ -12,11 +12,24 @@ de.juplo.kafka - first-contact - First Contact: a Simple Producer and a simple Consumer-Group + endless-consumer 1.0-SNAPSHOT + Endless Consumer: a Simple Consumer-Group that reads and print the topic + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-actuator + + + org.springframework.boot + spring-boot-configuration-processor + true + org.apache.kafka kafka-clients @@ -26,29 +39,36 @@ lombok - ch.qos.logback - logback-classic + org.springframework.boot + spring-boot-starter-test + test - org.codehaus.mojo - exec-maven-plugin - 3.0.0 + org.springframework.boot + spring-boot-maven-plugin + + + io.fabric8 + docker-maven-plugin + 0.33.0 + + + + juplo/%a:%v + + + - producer - - de.juplo.kafka.SimpleProducer - - - - consumer - - de.juplo.kafka.SimpleConsumer - + build + package + + build + diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java new file mode 100644 index 0000000..85d0e07 --- /dev/null +++ b/src/main/java/de/juplo/kafka/Application.java @@ -0,0 +1,46 @@ +package de.juplo.kafka; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.util.Assert; + +import java.util.concurrent.Executors; + + +@SpringBootApplication +@EnableConfigurationProperties(ApplicationProperties.class) +public class Application +{ + @Autowired + ApplicationProperties properties; + + + @Bean + public EndlessConsumer consumer() + { + Assert.hasText(properties.getBootstrapServer(), "consumer.bootstrap-server must be set"); + Assert.hasText(properties.getGroupId(), "consumer.group-id must be set"); + Assert.hasText(properties.getClientId(), "consumer.client-id must be set"); + Assert.hasText(properties.getTopic(), "consumer.topic must be set"); + + EndlessConsumer consumer = + new EndlessConsumer( + Executors.newFixedThreadPool(1), + properties.getBootstrapServer(), + properties.getGroupId(), + properties.getClientId(), + properties.getTopic()); + + consumer.start(); + + return consumer; + } + + public static void main(String[] args) + { + SpringApplication.run(Application.class, args); + } +} diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java new file mode 100644 index 0000000..fdbb2bd --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -0,0 +1,17 @@ +package de.juplo.kafka; + +import lombok.Getter; +import lombok.Setter; +import org.springframework.boot.context.properties.ConfigurationProperties; + + +@ConfigurationProperties(prefix = "consumer") +@Getter +@Setter +public class ApplicationProperties +{ + private String bootstrapServer; + private String groupId; + private String clientId; + private String topic; +} diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java new file mode 100644 index 0000000..d8068e5 --- /dev/null +++ b/src/main/java/de/juplo/kafka/DriverController.java @@ -0,0 +1,28 @@ +package de.juplo.kafka; + +import lombok.RequiredArgsConstructor; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.concurrent.ExecutionException; + + +@RestController +@RequiredArgsConstructor +public class DriverController +{ + private final EndlessConsumer consumer; + + + @PostMapping("start") + public void start() + { + consumer.start(); + } + + @PostMapping("stop") + public void stop() throws ExecutionException, InterruptedException + { + consumer.stop(); + } +} diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index e4d9697..22dce95 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -7,58 +7,68 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.StringDeserializer; +import javax.annotation.PreDestroy; import java.time.Duration; import java.util.Arrays; import java.util.Properties; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; @Slf4j -public class SimpleConsumer +public class EndlessConsumer implements Runnable { - private long consumed = 0; - private KafkaConsumer consumer; - private Lock lock = new ReentrantLock(); - private Condition stopped = lock.newCondition(); + private final ExecutorService executor; + private final String bootstrapServer; + private final String groupId; + private final String id; + private final String topic; + private AtomicBoolean running = new AtomicBoolean(); + private long consumed = 0; + private KafkaConsumer consumer = null; + private Future future = null; + + public EndlessConsumer( + ExecutorService executor, + String bootstrapServer, + String groupId, + String clientId, + String topic) + { + this.executor = executor; + this.bootstrapServer = bootstrapServer; + this.groupId = groupId; + this.id = clientId; + this.topic = topic; + } - public SimpleConsumer() + @Override + public void run() { - // tag::create[] Properties props = new Properties(); - props.put("bootstrap.servers", ":9092"); - props.put("group.id", "my-consumer"); // << Used for Offset-Commits - // end::create[] + props.put("bootstrap.servers", bootstrapServer); + props.put("group.id", groupId); + props.put("client.id", id); props.put("auto.offset.reset", "earliest"); - // tag::create[] props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); - KafkaConsumer consumer = new KafkaConsumer<>(props); - // end::create[] - this.consumer = consumer; - } - - - public void run() - { - String id = "C"; + this.consumer = new KafkaConsumer<>(props); try { - log.info("{} - Subscribing to topic test", id); - consumer.subscribe(Arrays.asList("test")); + log.info("{} - Subscribing to topic {}", id, topic); + consumer.subscribe(Arrays.asList(topic)); - // tag::loop[] while (true) { ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); // Do something with the data... - // end::loop[] log.info("{} - Received {} messages", id, records.count()); for (ConsumerRecord record : records) { @@ -73,9 +83,7 @@ public class SimpleConsumer record.value() ); } - // tag::loop[] } - // end::loop[] } catch(WakeupException e) { @@ -84,48 +92,54 @@ public class SimpleConsumer catch(Exception e) { log.error("{} - Unexpected error: {}", id, e.toString()); + running.set(false); // Mark the instance as not running } finally { - this.lock.lock(); - try - { - log.info("{} - Closing the KafkaConsumer", id); - consumer.close(); - log.info("C - DONE!"); - stopped.signal(); - } - finally - { - this.lock.unlock(); - log.info("{}: Consumed {} messages in total, exiting!", id, consumed); - } + log.info("{} - Closing the KafkaConsumer", id); + consumer.close(); + log.info("{} - Consumer-Thread exiting", id); } } - public static void main(String[] args) throws Exception + public synchronized void start() { - SimpleConsumer instance = new SimpleConsumer(); + boolean stateChanged = running.compareAndSet(false, true); + if (!stateChanged) + throw new RuntimeException("Consumer instance " + id + " is already running!"); - Runtime.getRuntime().addShutdownHook(new Thread(() -> - { - instance.lock.lock(); - try - { - instance.consumer.wakeup(); - instance.stopped.await(); - } - catch (InterruptedException e) - { - log.warn("Interrrupted while waiting for the consumer to stop!", e); - } - finally - { - instance.lock.unlock(); - } - })); + log.info("{} - Starting - consumed {} messages before", id, consumed); + future = executor.submit(this); + } - instance.run(); + public synchronized void stop() throws ExecutionException, InterruptedException + { + boolean stateChanged = running.compareAndSet(true, false); + if (!stateChanged) + throw new RuntimeException("Consumer instance " + id + " is not running!"); + + log.info("{} - Stopping", id); + consumer.wakeup(); + future.get(); + log.info("{} - Stopped - consumed {} messages so far", id, consumed); + } + + @PreDestroy + public void destroy() throws ExecutionException, InterruptedException + { + log.info("{} - Destroy!", id); + try + { + stop(); + } + catch (IllegalStateException e) + { + log.info("{} - Was already stopped", id); + } + finally + { + log.info("{}: Consumed {} messages in total, exiting!", id, consumed); + } } } diff --git a/src/main/java/de/juplo/kafka/SimpleProducer.java b/src/main/java/de/juplo/kafka/SimpleProducer.java deleted file mode 100644 index 43a7227..0000000 --- a/src/main/java/de/juplo/kafka/SimpleProducer.java +++ /dev/null @@ -1,117 +0,0 @@ -package de.juplo.kafka; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; - -import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; - - -@Slf4j -public class SimpleProducer -{ - private final String id; - private final String topic; - private final KafkaProducer producer; - - private long produced = 0; - - public SimpleProducer(String clientId, String topic) - { - // tag::create[] - Properties props = new Properties(); - props.put("bootstrap.servers", "localhost:9092"); - props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", StringSerializer.class.getName()); - - KafkaProducer producer = new KafkaProducer<>(props); - // end::create[] - - this.id = clientId; - this.topic = topic; - this.producer = producer; - } - - public void run() - { - long i = 0; - - try - { - for (; i < 100 ; i++) - { - send(Long.toString(i%10), Long.toString(i)); - } - - log.info("{} - Done", id); - } - finally - { - log.info("{}: Closing the KafkaProducer", id); - producer.close(); - log.info("{}: Produced {} messages in total, exiting!", id, produced); - } - } - - void send(String key, String value) - { - final long time = System.currentTimeMillis(); - - final ProducerRecord record = new ProducerRecord<>( - topic, // Topic - key, // Key - value // Value - ); - - producer.send(record, (metadata, e) -> - { - long now = System.currentTimeMillis(); - if (e == null) - { - // HANDLE SUCCESS - produced++; - log.debug( - "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms", - id, - record.key(), - record.value(), - metadata.partition(), - metadata.offset(), - metadata.timestamp(), - now - time - ); - } - else - { - // HANDLE ERROR - log.error( - "{} - ERROR key={} timestamp={} latency={}ms: {}", - id, - record.key(), - metadata == null ? -1 : metadata.timestamp(), - now - time, - e.toString() - ); - } - }); - - long now = System.currentTimeMillis(); - log.trace( - "{} - Queued #{} key={} latency={}ms", - id, - value, - record.key(), - now - time - ); - } - - - public static void main(String[] args) throws Exception - { - SimpleProducer producer = new SimpleProducer("P", "test"); - producer.run(); - } -} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..763880d --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,16 @@ +consumer: + bootstrap-server: :9092 + group-id: my-consumer + client-id: peter + topic: test +management: + endpoints: + web: + exposure: + include: "*" +logging: + level: + root: INFO + de.juplo: DEBUG +server: + port: 8081 -- 2.20.1