From: Kai Moritz Date: Sat, 23 Jul 2022 08:40:57 +0000 (+0200) Subject: Merge der Upgrades für Confluent/Spring-Boot (Branch 'first-contact') X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=ed2942b74c61f6e0a99747c87813900ed666f860;hp=35c806a3746c673a2479c702de7a9903783c34a9;p=demos%2Fkafka%2Ftraining Merge der Upgrades für Confluent/Spring-Boot (Branch 'first-contact') --- diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..1ad9963 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,2 @@ +* +!target/*.jar diff --git a/.maven-dockerexclude b/.maven-dockerexclude new file mode 100644 index 0000000..72e8ffc --- /dev/null +++ b/.maven-dockerexclude @@ -0,0 +1 @@ +* diff --git a/.maven-dockerinclude b/.maven-dockerinclude new file mode 100644 index 0000000..fd6cecd --- /dev/null +++ b/.maven-dockerinclude @@ -0,0 +1 @@ +target/*.jar diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..16ee25e --- /dev/null +++ b/Dockerfile @@ -0,0 +1,5 @@ +FROM openjdk:11-jre +VOLUME /tmp +COPY target/*.jar /opt/app.jar +ENTRYPOINT [ "java", "-jar", "/opt/app.jar" ] +CMD [] diff --git a/README.sh b/README.sh index 0ee50a9..3ec2781 100755 --- a/README.sh +++ b/README.sh @@ -1,5 +1,7 @@ #!/bin/bash +IMAGE=juplo/endless-producer:1.0-SNAPSHOT + if [ "$1" = "cleanup" ] then docker-compose down -v @@ -7,27 +9,82 @@ then exit fi -mvn package || exit 1 -if [ "$1" = "build" ]; then exit; fi - -trap 'kill $(jobs -p) 2>/dev/null' EXIT +docker-compose up -d zookeeper kafka cli -docker-compose up -d +if [[ + $(docker image ls -q $IMAGE) == "" || + "$1" = "build" +]] +then + mvn install || exit +else + echo "Using image existing images:" + docker image ls $IMAGE +fi echo "Waiting for the Kafka-Cluster to become ready..." docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1 +docker-compose up -d kafka-ui + +docker-compose exec -T cli bash << 'EOF' +echo "Creating topic with 3 partitions..." +kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test +# tag::createtopic[] +kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 3 +# end::createtopic[] +kafka-topics --bootstrap-server kafka:9092 --describe --topic test +EOF + +docker-compose up -d producer consumer + +sleep 10 +http :8081/seen +sleep 1 +http :8081/seen +sleep 1 +http :8081/seen +sleep 1 +http :8081/seen + +docker-compose exec -T cli bash << 'EOF' +echo "Altering number of partitions from 3 to 7..." 
+# tag::altertopic[] +kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 7 +kafka-topics --bootstrap-server kafka:9092 --describe --topic test +# end::altertopic[] +EOF + +sleep 1 +http :8081/seen +sleep 1 +http :8081/seen +sleep 1 +http :8081/seen +sleep 1 +http :8081/seen +sleep 1 +http :8081/seen +sleep 1 +http :8081/seen -echo "Producing messages" -mvn exec:java@producer +http post :8081/stop +http post :8081/start -echo "Reading messages" -mvn exec:java@consumer & -sleep 7 -kill $(jobs -p) -sleep 2 +sleep 1 +http :8081/seen +sleep 1 +http :8081/seen +sleep 1 +http :8081/seen +sleep 1 +http :8081/seen +sleep 1 +http :8081/seen +sleep 1 +http :8081/seen +sleep 1 +http :8081/seen +sleep 1 +http :8081/seen -echo "Re-Reading messages" -mvn exec:java@consumer & -sleep 7 -kill $(jobs -p) -sleep 2 +docker-compose stop producer consumer diff --git a/docker-compose.yml b/docker-compose.yml index ec307f5..023bfdd 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -24,14 +24,35 @@ services: depends_on: - zookeeper - setup: - image: juplo/toolbox - command: > - bash -c " - kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test - kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2 - " + kafka-ui: + image: provectuslabs/kafka-ui:0.3.3 + ports: + - 8080:8080 + environment: + KAFKA_CLUSTERS_0_NAME: local + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 cli: image: juplo/toolbox command: sleep infinity + + producer: + image: juplo/endless-producer:1.0-SNAPSHOT + ports: + - 8000:8080 + environment: + producer.bootstrap-server: kafka:9092 + producer.client-id: producer + producer.topic: test + producer.throttle-ms: 10 + + + consumer: + image: juplo/counting-consumer:1.0-SNAPSHOT + ports: + - 8081:8081 + environment: + consumer.bootstrap-server: kafka:9092 + consumer.client-id: my-group + consumer.client-id: consumer + consumer.topic: test diff --git a/pom.xml b/pom.xml index 70f37e8..e7ad998 100644 --- a/pom.xml +++ b/pom.xml @@ -12,11 +12,24 @@ de.juplo.kafka - first-contact - First Contact: a Simple Producer and a simple Consumer-Group + endless-producer + Endless Producer: a Simple Producer that endlessly writes numbers into a topic 1.0-SNAPSHOT + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.boot + spring-boot-starter-actuator + + + org.springframework.boot + spring-boot-configuration-processor + true + org.apache.kafka kafka-clients @@ -26,29 +39,36 @@ lombok - ch.qos.logback - logback-classic + org.springframework.boot + spring-boot-starter-test + test - org.codehaus.mojo - exec-maven-plugin - 3.0.0 + org.springframework.boot + spring-boot-maven-plugin + + + io.fabric8 + docker-maven-plugin + 0.33.0 + + + + juplo/%a:%v + + + - producer - - de.juplo.kafka.SimpleProducer - - - - consumer - - de.juplo.kafka.SimpleConsumer - + build + package + + build + diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java new file mode 100644 index 0000000..bc617a8 --- /dev/null +++ b/src/main/java/de/juplo/kafka/Application.java @@ -0,0 +1,46 @@ +package de.juplo.kafka; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.util.Assert; + +import 
java.util.concurrent.Executors; + + +@SpringBootApplication +@EnableConfigurationProperties(ApplicationProperties.class) +public class Application +{ + @Autowired + ApplicationProperties properties; + + + @Bean + public EndlessProducer producer() + { + Assert.hasText(properties.getBootstrapServer(), "producer.bootstrap-server must be set"); + Assert.hasText(properties.getClientId(), "producer.client-id must be set"); + Assert.hasText(properties.getTopic(), "producer.topic must be set"); + + EndlessProducer producer = + new EndlessProducer( + Executors.newFixedThreadPool(1), + properties.getBootstrapServer(), + properties.getClientId(), + properties.getTopic(), + properties.getAcks(), + properties.getThrottleMs()); + + producer.start(); + + return producer; + } + + public static void main(String[] args) + { + SpringApplication.run(Application.class, args); + } +} diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java new file mode 100644 index 0000000..ab26890 --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -0,0 +1,17 @@ +package de.juplo.kafka; + +import lombok.Getter; +import lombok.Setter; +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties(prefix = "producer") +@Getter +@Setter +public class ApplicationProperties +{ + private String bootstrapServer; + private String clientId; + private String topic; + private String acks; + private int throttleMs; +} diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java new file mode 100644 index 0000000..b3af107 --- /dev/null +++ b/src/main/java/de/juplo/kafka/DriverController.java @@ -0,0 +1,35 @@ +package de.juplo.kafka; + +import lombok.RequiredArgsConstructor; +import org.springframework.web.bind.annotation.ExceptionHandler; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.concurrent.ExecutionException; + + +@RestController +@RequiredArgsConstructor +public class DriverController +{ + private final EndlessProducer producer; + + + @PostMapping("start") + public void start() + { + producer.start(); + } + + @PostMapping("stop") + public void stop() throws ExecutionException, InterruptedException + { + producer.stop(); + } + + @ExceptionHandler + public ErrorResponse illegalStateException(IllegalStateException e) + { + return new ErrorResponse(e.getMessage(), 400); + } +} diff --git a/src/main/java/de/juplo/kafka/EndlessProducer.java b/src/main/java/de/juplo/kafka/EndlessProducer.java new file mode 100644 index 0000000..8b3743d --- /dev/null +++ b/src/main/java/de/juplo/kafka/EndlessProducer.java @@ -0,0 +1,179 @@ +package de.juplo.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; + +import javax.annotation.PreDestroy; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; + + +@Slf4j +public class EndlessProducer implements Runnable +{ + private final ExecutorService executor; + private final String id; + private final String topic; + private final int throttleMs; + private final KafkaProducer producer; + + private boolean running = false; + private long i = 0; + private long produced = 0; + + public 
EndlessProducer( + ExecutorService executor, + String bootstrapServer, + String clientId, + String topic, + String acks, + int throttleMs) + { + this.executor = executor; + this.id = clientId; + this.topic = topic; + this.throttleMs = throttleMs; + + Properties props = new Properties(); + props.put("bootstrap.servers", bootstrapServer); + props.put("client.id", clientId); + props.put("acks", acks); + props.put("metadata.max.age.ms", "1000"); + props.put("key.serializer", StringSerializer.class.getName()); + props.put("value.serializer", StringSerializer.class.getName()); + + this.producer = new KafkaProducer<>(props); + } + + @Override + public void run() + { + try + { + for (; running; i++) + { + send(Long.toString(i%10), Long.toString(i)); + + if (throttleMs > 0) + { + try + { + Thread.sleep(throttleMs); + } + catch (InterruptedException e) + { + log.warn("{} - Interrupted while throttling!", e); + } + } + } + + log.info("{} - Done", id); + } + catch (Exception e) + { + log.error("{} - Unexpected Exception:", id, e); + } + finally + { + synchronized (this) + { + running = false; + log.info("{} - Stopped - produced {} messages so far", id, produced); + } + } + } + + void send(String key, String value) + { + final long time = System.currentTimeMillis(); + + final ProducerRecord record = new ProducerRecord<>( + topic, // Topic + key, // Key + value // Value + ); + + producer.send(record, (metadata, e) -> + { + long now = System.currentTimeMillis(); + if (e == null) + { + // HANDLE SUCCESS + produced++; + log.debug( + "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms", + id, + record.key(), + record.value(), + metadata.partition(), + metadata.offset(), + metadata.timestamp(), + now - time + ); + } + else + { + // HANDLE ERROR + log.error( + "{} - ERROR key={} timestamp={} latency={}ms: {}", + id, + record.key(), + metadata == null ? 
-1 : metadata.timestamp(), + now - time, + e.toString() + ); + } + }); + + long now = System.currentTimeMillis(); + log.trace( + "{} - Queued #{} key={} latency={}ms", + id, + value, + record.key(), + now - time + ); + } + + public synchronized void start() + { + if (running) + throw new IllegalStateException("Producer instance " + id + " is already running!"); + + log.info("{} - Starting - produced {} messages before", id, produced); + running = true; + executor.submit(this); + } + + public synchronized void stop() throws ExecutionException, InterruptedException + { + if (!running) + throw new IllegalStateException("Producer instance " + id + " is not running!"); + + log.info("{} - Stopping...", id); + running = false; + } + + @PreDestroy + public void destroy() throws ExecutionException, InterruptedException + { + log.info("{} - Destroy!", id); + try + { + stop(); + } + catch (IllegalStateException e) + { + log.info("{} - Was already stopped", id); + } + finally + { + log.info("{} - Closing the KafkaProducer", id); + producer.close(); + log.info("{}: Produced {} messages in total, exiting!", id, produced); + } + } +} diff --git a/src/main/java/de/juplo/kafka/ErrorResponse.java b/src/main/java/de/juplo/kafka/ErrorResponse.java new file mode 100644 index 0000000..5ca206d --- /dev/null +++ b/src/main/java/de/juplo/kafka/ErrorResponse.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + +import lombok.Value; + + +@Value +public class ErrorResponse +{ + private final String error; + private final Integer status; +} diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java deleted file mode 100644 index e4d9697..0000000 --- a/src/main/java/de/juplo/kafka/SimpleConsumer.java +++ /dev/null @@ -1,131 +0,0 @@ -package de.juplo.kafka; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.serialization.StringDeserializer; - -import java.time.Duration; -import java.util.Arrays; -import java.util.Properties; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - - -@Slf4j -public class SimpleConsumer -{ - private long consumed = 0; - private KafkaConsumer consumer; - private Lock lock = new ReentrantLock(); - private Condition stopped = lock.newCondition(); - - - public SimpleConsumer() - { - // tag::create[] - Properties props = new Properties(); - props.put("bootstrap.servers", ":9092"); - props.put("group.id", "my-consumer"); // << Used for Offset-Commits - // end::create[] - props.put("auto.offset.reset", "earliest"); - // tag::create[] - props.put("key.deserializer", StringDeserializer.class.getName()); - props.put("value.deserializer", StringDeserializer.class.getName()); - - KafkaConsumer consumer = new KafkaConsumer<>(props); - // end::create[] - this.consumer = consumer; - } - - - public void run() - { - String id = "C"; - - try - { - log.info("{} - Subscribing to topic test", id); - consumer.subscribe(Arrays.asList("test")); - - // tag::loop[] - while (true) - { - ConsumerRecords records = - consumer.poll(Duration.ofSeconds(1)); - - // Do something with the data... 
- // end::loop[] - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) - { - consumed++; - log.info( - "{} - {}: {}/{} - {}={}", - id, - record.offset(), - record.topic(), - record.partition(), - record.key(), - record.value() - ); - } - // tag::loop[] - } - // end::loop[] - } - catch(WakeupException e) - { - log.info("{} - RIIING!", id); - } - catch(Exception e) - { - log.error("{} - Unexpected error: {}", id, e.toString()); - } - finally - { - this.lock.lock(); - try - { - log.info("{} - Closing the KafkaConsumer", id); - consumer.close(); - log.info("C - DONE!"); - stopped.signal(); - } - finally - { - this.lock.unlock(); - log.info("{}: Consumed {} messages in total, exiting!", id, consumed); - } - } - } - - - public static void main(String[] args) throws Exception - { - SimpleConsumer instance = new SimpleConsumer(); - - Runtime.getRuntime().addShutdownHook(new Thread(() -> - { - instance.lock.lock(); - try - { - instance.consumer.wakeup(); - instance.stopped.await(); - } - catch (InterruptedException e) - { - log.warn("Interrrupted while waiting for the consumer to stop!", e); - } - finally - { - instance.lock.unlock(); - } - })); - - instance.run(); - } -} diff --git a/src/main/java/de/juplo/kafka/SimpleProducer.java b/src/main/java/de/juplo/kafka/SimpleProducer.java deleted file mode 100644 index 43a7227..0000000 --- a/src/main/java/de/juplo/kafka/SimpleProducer.java +++ /dev/null @@ -1,117 +0,0 @@ -package de.juplo.kafka; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; - -import java.util.Properties; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; - - -@Slf4j -public class SimpleProducer -{ - private final String id; - private final String topic; - private final KafkaProducer producer; - - private long produced = 0; - - public SimpleProducer(String clientId, String topic) - { - // tag::create[] - Properties props = new Properties(); - props.put("bootstrap.servers", "localhost:9092"); - props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", StringSerializer.class.getName()); - - KafkaProducer producer = new KafkaProducer<>(props); - // end::create[] - - this.id = clientId; - this.topic = topic; - this.producer = producer; - } - - public void run() - { - long i = 0; - - try - { - for (; i < 100 ; i++) - { - send(Long.toString(i%10), Long.toString(i)); - } - - log.info("{} - Done", id); - } - finally - { - log.info("{}: Closing the KafkaProducer", id); - producer.close(); - log.info("{}: Produced {} messages in total, exiting!", id, produced); - } - } - - void send(String key, String value) - { - final long time = System.currentTimeMillis(); - - final ProducerRecord record = new ProducerRecord<>( - topic, // Topic - key, // Key - value // Value - ); - - producer.send(record, (metadata, e) -> - { - long now = System.currentTimeMillis(); - if (e == null) - { - // HANDLE SUCCESS - produced++; - log.debug( - "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms", - id, - record.key(), - record.value(), - metadata.partition(), - metadata.offset(), - metadata.timestamp(), - now - time - ); - } - else - { - // HANDLE ERROR - log.error( - "{} - ERROR key={} timestamp={} latency={}ms: {}", - id, - record.key(), - metadata == null ? 
-1 : metadata.timestamp(), - now - time, - e.toString() - ); - } - }); - - long now = System.currentTimeMillis(); - log.trace( - "{} - Queued #{} key={} latency={}ms", - id, - value, - record.key(), - now - time - ); - } - - - public static void main(String[] args) throws Exception - { - SimpleProducer producer = new SimpleProducer("P", "test"); - producer.run(); - } -} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..e4ae52a --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,15 @@ +producer: + bootstrap-server: :9092 + client-id: peter + topic: test + acks: 1 + throttle-ms: 1000 +management: + endpoints: + web: + exposure: + include: "*" +logging: + level: + root: INFO + de.juplo: DEBUG
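
A minimal sketch of how the merged setup can be exercised by hand, assuming the images have already been built (mvn install), the stack from docker-compose.yml above is running, and httpie is installed. The start/stop endpoints come from the DriverController added in this patch and belong to the producer application, which docker-compose.yml maps to host port 8000; the /seen endpoint of the counting consumer (host port 8081) is not part of this diff and is only assumed to behave as in README.sh:

  # start the full stack, including the new endless producer and the counting consumer
  docker-compose up -d zookeeper kafka cli kafka-ui producer consumer

  # pause and resume the endless producer via its REST endpoints (host port 8000 -> container port 8080)
  http post :8000/stop
  http post :8000/start

  # inspect the per-key counts reported by the counting consumer (host port 8081)
  http :8081/seen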