From 50da4f6f74a8f4f567b7af8556480f81256c61dc Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 25 Mar 2022 10:56:17 +0100 Subject: [PATCH] EndlessProducer in RestProducer umgearbeitet MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Der Producer nimmt die zu versendende Nachricht über ein POST entgegen * Als Schlüssel wird der Pfad des POST-Aufrufs verwendet * Die Anfragen werden mit einem DeferredResult asynchron verarbeitet * Der Producer antwortet erst mit 200-OK, wenn die Nachricht bestätigt wurde * Wenn der Broker mit einem Fehler antwortet, wird 500 zurückgegeben * Wenn der Broker nicht erreicht werden kann, wird 400 zurückgegeben --- README.sh | 76 +++++++++++- docker-compose.yml | 63 +++++++++- pom.xml | 4 +- src/main/java/de/juplo/kafka/Application.java | 25 ---- .../de/juplo/kafka/ApplicationProperties.java | 4 +- .../java/de/juplo/kafka/DriverController.java | 35 ------ .../java/de/juplo/kafka/ProduceFailure.java | 21 ++++ .../java/de/juplo/kafka/ProduceResult.java | 11 ++ .../java/de/juplo/kafka/ProduceSuccess.java | 12 ++ .../java/de/juplo/kafka/RestProducer.java | 114 ++++-------------- src/main/resources/application.yml | 6 +- 11 files changed, 211 insertions(+), 160 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/DriverController.java create mode 100644 src/main/java/de/juplo/kafka/ProduceFailure.java create mode 100644 src/main/java/de/juplo/kafka/ProduceResult.java create mode 100644 src/main/java/de/juplo/kafka/ProduceSuccess.java diff --git a/README.sh b/README.sh index 73ceebc..8b568c9 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/endless-producer:1.0-SNAPSHOT +IMAGE=juplo/rest-producer:1.0-SNAPSHOT if [ "$1" = "cleanup" ] then @@ -25,7 +25,75 @@ fi echo "Waiting for the Kafka-Cluster to become ready..." docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1 docker-compose up setup -docker-compose up -d producer -sleep 5 -docker-compose stop producer + +docker-compose up -d + +sleep 15 +http :8081/status +http :8082/status +http :8083/status +http :8084/status +http :8085/status +http :8000/seen + +http post :8081/stop +http :8081/status +http :8082/status +http :8083/status +http :8084/status +http :8085/status +http :8000/seen + +http post :8081/start + +sleep 1 +http :8081/status +http :8082/status +http :8083/status +http :8084/status +http :8085/status +http :8000/seen +sleep 1 +http :8081/status +http :8082/status +http :8083/status +http :8084/status +http :8085/status +http :8000/seen +sleep 1 +http :8081/status +http :8082/status +http :8083/status +http :8084/status +http :8085/status +http :8000/seen +sleep 1 +http :8081/status +http :8082/status +http :8083/status +http :8084/status +http :8085/status +http :8000/seen +sleep 1 +http :8081/status +http :8082/status +http :8083/status +http :8084/status +http :8085/status +http :8000/seen + +http post :8081/stop +http :8081/status +http post :8082/stop +http :8082/status +http post :8083/stop +http :8083/status +http post :8084/stop +http :8084/status +http post :8085/stop +http :8085/status + +http :8000/seen +http post :8000/stop + docker-compose logs producer diff --git a/docker-compose.yml b/docker-compose.yml index 10ad3a0..62b72f5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -37,11 +37,70 @@ services: command: sleep infinity producer: - image: juplo/endless-producer:1.0-SNAPSHOT + image: juplo/rest-producer:1.0-SNAPSHOT ports: - 8080:8080 environment: producer.bootstrap-server: kafka:9092 producer.client-id: producer producer.topic: test - producer.throttle-ms: 200 + + peter: + image: juplo/rest-client:1.0-SNAPSHOT + ports: + - 8081:8080 + environment: + server.port: 8080 + rest-client.baseUrl: http://producer:8080 + rest-client.username: peter + rest-client.throttle-ms: 1000 + + klaus: + image: juplo/rest-client:1.0-SNAPSHOT + ports: + - 8082:8080 + environment: + server.port: 8080 + rest-client.baseUrl: http://producer:8080 + rest-client.username: klaus + rest-client.throttle-ms: 1100 + + beate: + image: juplo/rest-client:1.0-SNAPSHOT + ports: + - 8083:8080 + environment: + server.port: 8080 + rest-client.baseUrl: http://producer:8080 + rest-client.username: beate + rest-client.throttle-ms: 900 + + franz: + image: juplo/rest-client:1.0-SNAPSHOT + ports: + - 8084:8080 + environment: + server.port: 8080 + rest-client.baseUrl: http://producer:8080 + rest-client.username: franz + rest-client.throttle-ms: 800 + + uschi: + image: juplo/rest-client:1.0-SNAPSHOT + ports: + - 8085:8080 + environment: + server.port: 8080 + rest-client.baseUrl: http://producer:8080 + rest-client.username: uschi + rest-client.throttle-ms: 1200 + + consumer: + image: juplo/counting-consumer:1.0-SNAPSHOT + ports: + - 8000:8081 + environment: + consumer.bootstrap-server: kafka:9092 + consumer.client-id: my-group + consumer.client-id: consumer + consumer.topic: test diff --git a/pom.xml b/pom.xml index 267e035..129ea94 100644 --- a/pom.xml +++ b/pom.xml @@ -12,8 +12,8 @@ de.juplo.kafka - endless-producer - Endless Producer: a Simple Producer that endlessly writes numbers into a topic + rest-producer + REST Producer: a Simple Producer that takes messages via POST and confirms successs 1.0-SNAPSHOT diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index bc617a8..9f3e3ed 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -14,31 +14,6 @@ import java.util.concurrent.Executors; @EnableConfigurationProperties(ApplicationProperties.class) public class Application { - @Autowired - ApplicationProperties properties; - - - @Bean - public EndlessProducer producer() - { - Assert.hasText(properties.getBootstrapServer(), "producer.bootstrap-server must be set"); - Assert.hasText(properties.getClientId(), "producer.client-id must be set"); - Assert.hasText(properties.getTopic(), "producer.topic must be set"); - - EndlessProducer producer = - new EndlessProducer( - Executors.newFixedThreadPool(1), - properties.getBootstrapServer(), - properties.getClientId(), - properties.getTopic(), - properties.getAcks(), - properties.getThrottleMs()); - - producer.start(); - - return producer; - } - public static void main(String[] args) { SpringApplication.run(Application.class, args); diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index ab26890..1f30262 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -13,5 +13,7 @@ public class ApplicationProperties private String clientId; private String topic; private String acks; - private int throttleMs; + private Integer batchSize; + private Integer lingerMs; + private String compressionType; } diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java deleted file mode 100644 index b3af107..0000000 --- a/src/main/java/de/juplo/kafka/DriverController.java +++ /dev/null @@ -1,35 +0,0 @@ -package de.juplo.kafka; - -import lombok.RequiredArgsConstructor; -import org.springframework.web.bind.annotation.ExceptionHandler; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RestController; - -import java.util.concurrent.ExecutionException; - - -@RestController -@RequiredArgsConstructor -public class DriverController -{ - private final EndlessProducer producer; - - - @PostMapping("start") - public void start() - { - producer.start(); - } - - @PostMapping("stop") - public void stop() throws ExecutionException, InterruptedException - { - producer.stop(); - } - - @ExceptionHandler - public ErrorResponse illegalStateException(IllegalStateException e) - { - return new ErrorResponse(e.getMessage(), 400); - } -} diff --git a/src/main/java/de/juplo/kafka/ProduceFailure.java b/src/main/java/de/juplo/kafka/ProduceFailure.java new file mode 100644 index 0000000..873a67b --- /dev/null +++ b/src/main/java/de/juplo/kafka/ProduceFailure.java @@ -0,0 +1,21 @@ +package de.juplo.kafka; + + +import lombok.Value; + + +@Value +public class ProduceFailure implements ProduceResult +{ + private final String error; + private final String exception; + private final Integer status; + + + public ProduceFailure(Exception e) + { + status = 500; + exception = e.getClass().getSimpleName(); + error = e.getMessage(); + } +} diff --git a/src/main/java/de/juplo/kafka/ProduceResult.java b/src/main/java/de/juplo/kafka/ProduceResult.java new file mode 100644 index 0000000..ceff329 --- /dev/null +++ b/src/main/java/de/juplo/kafka/ProduceResult.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + +import com.fasterxml.jackson.annotation.JsonInclude; + +import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL; + + +@JsonInclude(NON_NULL) +public interface ProduceResult +{ +} diff --git a/src/main/java/de/juplo/kafka/ProduceSuccess.java b/src/main/java/de/juplo/kafka/ProduceSuccess.java new file mode 100644 index 0000000..9c79e8b --- /dev/null +++ b/src/main/java/de/juplo/kafka/ProduceSuccess.java @@ -0,0 +1,12 @@ +package de.juplo.kafka; + + +import lombok.Value; + + +@Value +public class ProduceSuccess implements ProduceResult +{ + Integer partition; + Long offset; +} diff --git a/src/main/java/de/juplo/kafka/RestProducer.java b/src/main/java/de/juplo/kafka/RestProducer.java index 7a5b324..19f70ba 100644 --- a/src/main/java/de/juplo/kafka/RestProducer.java +++ b/src/main/java/de/juplo/kafka/RestProducer.java @@ -4,6 +4,9 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.*; +import org.springframework.web.context.request.async.DeferredResult; import javax.annotation.PreDestroy; import java.util.Properties; @@ -12,81 +15,40 @@ import java.util.concurrent.ExecutorService; @Slf4j -public class EndlessProducer implements Runnable +@RestController +public class RestProducer { - private final ExecutorService executor; private final String id; private final String topic; - private final int throttleMs; private final KafkaProducer producer; - private boolean running = false; - private long i = 0; private long produced = 0; - public EndlessProducer( - ExecutorService executor, - String bootstrapServer, - String clientId, - String topic, - String acks, - int throttleMs) + public RestProducer(ApplicationProperties properties) { - this.executor = executor; - this.id = clientId; - this.topic = topic; - this.throttleMs = throttleMs; + this.id = properties.getClientId(); + this.topic = properties.getTopic(); Properties props = new Properties(); - props.put("bootstrap.servers", bootstrapServer); - props.put("client.id", clientId); - props.put("acks", acks); + props.put("bootstrap.servers", properties.getBootstrapServer()); + props.put("client.id", properties.getClientId()); + props.put("acks", properties.getAcks()); + props.put("batch.size", properties.getBatchSize()); + props.put("linger.ms", properties.getLingerMs()); + props.put("compression.type", properties.getCompressionType()); props.put("key.serializer", StringSerializer.class.getName()); props.put("value.serializer", StringSerializer.class.getName()); this.producer = new KafkaProducer<>(props); } - @Override - public void run() + @PostMapping(path = "{key}") + public DeferredResult send( + @PathVariable String key, + @RequestBody String value) { - try - { - for (; running; i++) - { - send(Long.toString(i%10), Long.toString(i)); - - if (throttleMs > 0) - { - try - { - Thread.sleep(throttleMs); - } - catch (InterruptedException e) - { - log.warn("{} - Interrupted while throttling!", e); - } - } - } + DeferredResult result = new DeferredResult<>(); - log.info("{} - Done", id); - } - catch (Exception e) - { - log.error("{} - Unexpected Exception:", id, e); - } - finally - { - synchronized (this) - { - running = false; - log.info("{} - Stopped - produced {} messages so far", id, produced); - } - } - } - - void send(String key, String value) - { final long time = System.currentTimeMillis(); final ProducerRecord record = new ProducerRecord<>( @@ -102,6 +64,7 @@ public class EndlessProducer implements Runnable { // HANDLE SUCCESS produced++; + result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset())); log.debug( "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms", id, @@ -116,6 +79,7 @@ public class EndlessProducer implements Runnable else { // HANDLE ERROR + result.setErrorResult(new ProduceFailure(e)); log.error( "{} - ERROR key={} timestamp={} latency={}ms: {}", id, @@ -135,44 +99,16 @@ public class EndlessProducer implements Runnable record.key(), now - time ); - } - - public synchronized void start() - { - if (running) - throw new IllegalStateException("Producer instance " + id + " is already running!"); - - log.info("{} - Starting - produced {} messages before", id, produced); - running = true; - executor.submit(this); - } - public synchronized void stop() throws ExecutionException, InterruptedException - { - if (!running) - throw new IllegalStateException("Producer instance " + id + " is not running!"); - - log.info("{} - Stopping...", id); - running = false; + return result; } @PreDestroy public void destroy() throws ExecutionException, InterruptedException { log.info("{} - Destroy!", id); - try - { - stop(); - } - catch (IllegalStateException e) - { - log.info("{} - Was already stopped", id); - } - finally - { - log.info("{} - Closing the KafkaProducer", id); - producer.close(); - log.info("{}: Produced {} messages in total, exiting!", id, produced); - } + log.info("{} - Closing the KafkaProducer", id); + producer.close(); + log.info("{}: Produced {} messages in total, exiting!", id, produced); } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index e4ae52a..fcc0f3c 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -2,8 +2,10 @@ producer: bootstrap-server: :9092 client-id: peter topic: test - acks: 1 - throttle-ms: 1000 + acks: -1 + batch-size: 16384 + linger-ms: 0 + compression-type: gzip management: endpoints: web: -- 2.20.1