From d1c1f40d2ad5f69e43ced0e278cf80dfc6ee21ef Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 31 Mar 2022 10:58:46 +0200 Subject: [PATCH] =?utf8?q?REST-Client=20implementiert,=20der=20Nachrichten?= =?utf8?q?=20=C3=BCber=20den=20REST-Producer=20verschickt?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Der Client versendet so wie der Endless-Producer eine monoton steigende Zahlenfolge * Allerdings werden alle Nachrichten mit dem selben Schlüssel versendet * Der Schlüssel wird über die Konfiguration festgelegt * Die Nachrichten werden asynchron versendet * Zwischen zwei Nachrichten wird eine konfigurierbare Pause eingehalten * Dabei kommen die Defaults von Spring Boot zum Einsatz (4 Http-Threads) * Die Antworten werden abgewartet und entsprechend ausgegeben * Über REST-Endpoints können die bisher aufgetretenen Fehler und die noch ausstehenden Nachrichten abgefragt werden --- README.sh | 42 +++- docker-compose.yml | 20 +- pom.xml | 8 +- src/main/java/de/juplo/kafka/Application.java | 27 +- .../de/juplo/kafka/ApplicationProperties.java | 8 +- .../java/de/juplo/kafka/DriverController.java | 13 +- .../java/de/juplo/kafka/EndlessProducer.java | 178 -------------- src/main/java/de/juplo/kafka/RestClient.java | 232 ++++++++++++++++++ src/main/java/de/juplo/kafka/RestFailure.java | 20 ++ src/main/java/de/juplo/kafka/RestResult.java | 8 + src/main/java/de/juplo/kafka/RestService.java | 39 +++ src/main/java/de/juplo/kafka/RestSuccess.java | 17 ++ src/main/resources/application.yml | 10 +- 13 files changed, 397 insertions(+), 225 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/EndlessProducer.java create mode 100644 src/main/java/de/juplo/kafka/RestClient.java create mode 100644 src/main/java/de/juplo/kafka/RestFailure.java create mode 100644 src/main/java/de/juplo/kafka/RestResult.java create mode 100644 src/main/java/de/juplo/kafka/RestService.java create mode 100644 src/main/java/de/juplo/kafka/RestSuccess.java diff --git a/README.sh b/README.sh index 73ceebc..9d08db6 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/endless-producer:1.0-SNAPSHOT +IMAGE=juplo/rest-client:1.0-SNAPSHOT if [ "$1" = "cleanup" ] then @@ -25,7 +25,39 @@ fi echo "Waiting for the Kafka-Cluster to become ready..." docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1 docker-compose up setup -docker-compose up -d producer -sleep 5 -docker-compose stop producer -docker-compose logs producer + +docker-compose up -d + +sleep 15 +http :8081/status +http :8000/seen + +http post :8081/stop +http :8081/status +http :8000/seen + +http post :8081/start + +sleep 1 +http :8081/status +http :8000/seen +sleep 1 +http :8081/status +http :8000/seen +sleep 1 +http :8081/status +http :8000/seen +sleep 1 +http :8081/status +http :8000/seen +sleep 1 +http :8081/status +http :8000/seen + +http post :8081/stop +http :8081/status + +http :8000/seen +http post :8000/stop + +docker-compose logs client diff --git a/docker-compose.yml b/docker-compose.yml index 10ad3a0..75a8562 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -37,11 +37,27 @@ services: command: sleep infinity producer: - image: juplo/endless-producer:1.0-SNAPSHOT + image: juplo/rest-producer:1.0-SNAPSHOT ports: - 8080:8080 environment: producer.bootstrap-server: kafka:9092 producer.client-id: producer producer.topic: test - producer.throttle-ms: 200 + + client: + image: juplo/rest-client:1.0-SNAPSHOT + ports: + - 8081:8081 + environment: + rest-client.base-url: http://producer:8080 + + consumer: + image: juplo/counting-consumer:1.0-SNAPSHOT + ports: + - 8000:8081 + environment: + consumer.bootstrap-server: kafka:9092 + consumer.client-id: my-group + consumer.client-id: consumer + consumer.topic: test diff --git a/pom.xml b/pom.xml index 267e035..8bbb68c 100644 --- a/pom.xml +++ b/pom.xml @@ -12,8 +12,8 @@ de.juplo.kafka - endless-producer - Endless Producer: a Simple Producer that endlessly writes numbers into a topic + rest-client + REST Client: a simple client, that endlessly writes numbers via the REST-gateway 1.0-SNAPSHOT @@ -31,8 +31,8 @@ true - org.apache.kafka - kafka-clients + org.springframework.boot + spring-boot-starter-webflux org.projectlombok diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index bc617a8..e81851a 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -1,12 +1,11 @@ package de.juplo.kafka; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; -import org.springframework.util.Assert; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -14,31 +13,13 @@ import java.util.concurrent.Executors; @EnableConfigurationProperties(ApplicationProperties.class) public class Application { - @Autowired - ApplicationProperties properties; - - @Bean - public EndlessProducer producer() + public ExecutorService executor() { - Assert.hasText(properties.getBootstrapServer(), "producer.bootstrap-server must be set"); - Assert.hasText(properties.getClientId(), "producer.client-id must be set"); - Assert.hasText(properties.getTopic(), "producer.topic must be set"); - - EndlessProducer producer = - new EndlessProducer( - Executors.newFixedThreadPool(1), - properties.getBootstrapServer(), - properties.getClientId(), - properties.getTopic(), - properties.getAcks(), - properties.getThrottleMs()); - - producer.start(); - - return producer; + return Executors.newFixedThreadPool(1); } + public static void main(String[] args) { SpringApplication.run(Application.class, args); diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index ab26890..7aa84f7 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -4,14 +4,12 @@ import lombok.Getter; import lombok.Setter; import org.springframework.boot.context.properties.ConfigurationProperties; -@ConfigurationProperties(prefix = "producer") +@ConfigurationProperties(prefix = "rest-client") @Getter @Setter public class ApplicationProperties { - private String bootstrapServer; - private String clientId; - private String topic; - private String acks; + private String baseUrl; + private String username; private int throttleMs; } diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java index b3af107..b2c4c72 100644 --- a/src/main/java/de/juplo/kafka/DriverController.java +++ b/src/main/java/de/juplo/kafka/DriverController.java @@ -2,6 +2,7 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import org.springframework.web.bind.annotation.ExceptionHandler; +import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RestController; @@ -12,19 +13,25 @@ import java.util.concurrent.ExecutionException; @RequiredArgsConstructor public class DriverController { - private final EndlessProducer producer; + private final RestClient client; @PostMapping("start") public void start() { - producer.start(); + client.start(); } @PostMapping("stop") public void stop() throws ExecutionException, InterruptedException { - producer.stop(); + client.stop(); + } + + @GetMapping("status") + public RestClient.Status getStatus() + { + return client.getStatus(); } @ExceptionHandler diff --git a/src/main/java/de/juplo/kafka/EndlessProducer.java b/src/main/java/de/juplo/kafka/EndlessProducer.java deleted file mode 100644 index 7a5b324..0000000 --- a/src/main/java/de/juplo/kafka/EndlessProducer.java +++ /dev/null @@ -1,178 +0,0 @@ -package de.juplo.kafka; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; - -import javax.annotation.PreDestroy; -import java.util.Properties; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; - - -@Slf4j -public class EndlessProducer implements Runnable -{ - private final ExecutorService executor; - private final String id; - private final String topic; - private final int throttleMs; - private final KafkaProducer producer; - - private boolean running = false; - private long i = 0; - private long produced = 0; - - public EndlessProducer( - ExecutorService executor, - String bootstrapServer, - String clientId, - String topic, - String acks, - int throttleMs) - { - this.executor = executor; - this.id = clientId; - this.topic = topic; - this.throttleMs = throttleMs; - - Properties props = new Properties(); - props.put("bootstrap.servers", bootstrapServer); - props.put("client.id", clientId); - props.put("acks", acks); - props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", StringSerializer.class.getName()); - - this.producer = new KafkaProducer<>(props); - } - - @Override - public void run() - { - try - { - for (; running; i++) - { - send(Long.toString(i%10), Long.toString(i)); - - if (throttleMs > 0) - { - try - { - Thread.sleep(throttleMs); - } - catch (InterruptedException e) - { - log.warn("{} - Interrupted while throttling!", e); - } - } - } - - log.info("{} - Done", id); - } - catch (Exception e) - { - log.error("{} - Unexpected Exception:", id, e); - } - finally - { - synchronized (this) - { - running = false; - log.info("{} - Stopped - produced {} messages so far", id, produced); - } - } - } - - void send(String key, String value) - { - final long time = System.currentTimeMillis(); - - final ProducerRecord record = new ProducerRecord<>( - topic, // Topic - key, // Key - value // Value - ); - - producer.send(record, (metadata, e) -> - { - long now = System.currentTimeMillis(); - if (e == null) - { - // HANDLE SUCCESS - produced++; - log.debug( - "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms", - id, - record.key(), - record.value(), - metadata.partition(), - metadata.offset(), - metadata.timestamp(), - now - time - ); - } - else - { - // HANDLE ERROR - log.error( - "{} - ERROR key={} timestamp={} latency={}ms: {}", - id, - record.key(), - metadata == null ? -1 : metadata.timestamp(), - now - time, - e.toString() - ); - } - }); - - long now = System.currentTimeMillis(); - log.trace( - "{} - Queued #{} key={} latency={}ms", - id, - value, - record.key(), - now - time - ); - } - - public synchronized void start() - { - if (running) - throw new IllegalStateException("Producer instance " + id + " is already running!"); - - log.info("{} - Starting - produced {} messages before", id, produced); - running = true; - executor.submit(this); - } - - public synchronized void stop() throws ExecutionException, InterruptedException - { - if (!running) - throw new IllegalStateException("Producer instance " + id + " is not running!"); - - log.info("{} - Stopping...", id); - running = false; - } - - @PreDestroy - public void destroy() throws ExecutionException, InterruptedException - { - log.info("{} - Destroy!", id); - try - { - stop(); - } - catch (IllegalStateException e) - { - log.info("{} - Was already stopped", id); - } - finally - { - log.info("{} - Closing the KafkaProducer", id); - producer.close(); - log.info("{}: Produced {} messages in total, exiting!", id, produced); - } - } -} diff --git a/src/main/java/de/juplo/kafka/RestClient.java b/src/main/java/de/juplo/kafka/RestClient.java new file mode 100644 index 0000000..689d672 --- /dev/null +++ b/src/main/java/de/juplo/kafka/RestClient.java @@ -0,0 +1,232 @@ +package de.juplo.kafka; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Mono; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; + + +@Component +@Slf4j +public class RestClient implements Callable +{ + private final ExecutorService executor; + private final RestService service; + + private final String username; + private final int throttleMs; + private long i = 0; + + private boolean running = false; + private Future job; + + private final Lock lock = new ReentrantLock(); + private final Condition condition = lock.newCondition(); + private final Set pending = new TreeSet<>(); + private final Map offsets = new TreeMap<>(); + private final List failures = new LinkedList<>(); + + public RestClient( + ExecutorService executor, + RestService service, + ApplicationProperties properties) + { + this.executor = executor; + this.service = service; + this.username = properties.getUsername(); + this.throttleMs = properties.getThrottleMs(); + } + + + @Override + public Long call() + { + for(; running; i++) + { + Long message = i; + log.debug("{} - Sending message #{}", username, message); + guarded(() -> pending.add(message)); + service + .send(message) + .doOnSubscribe(subscription -> guarded(() -> pending.add(message))) + .doOnTerminate(() -> guarded(() -> + { + pending.remove(message); + condition.signal(); + })) + .onErrorResume(e -> + Mono.just( + RestFailure + .builder() + .error("client-error") + .exception(e.getMessage()) + .build())) + .subscribe(result -> + { + switch (result.getType()) + { + case SUCCESS: + RestSuccess success = (RestSuccess)result; + log.info( + "{} - Successfully sent message #{}: partition={}, offset={} ", + username, + message, + success.partition, + success.offset); + guarded(() -> + { + Long offset = offsets.get(success.partition); + if (offset == null || offset < success.offset) + offsets.put(success.partition, success.offset); + }); + break; + + case FAILURE: + RestFailure failure = (RestFailure)result; + log.warn( + "{} - Failure while sending message #{}: error={}, exception={}", + username, + message, + failure.error, + failure.exception); + guarded(() -> failures.add(failure)); + break; + } + }); + + if (throttleMs > 0) + { + try + { + Thread.sleep(throttleMs); + } + catch (InterruptedException e) + { + log.warn("{} - Interrupted while throttling!", username, e); + } + } + } + + return i; + } + + + public synchronized Status getStatus() + { + return new Status(running, pending, offsets, failures); + } + + + @Getter + public class Status + { + boolean running; + Set pending; + Map offsets; + List failures; + + private Status( + boolean running, + Set pending, + Map offsets, + List failures) + { + this.running = running; + guarded(() -> + { + this.pending = new LinkedHashSet<>(pending); + this.offsets = new LinkedHashMap<>(offsets); + this.failures = new ArrayList<>(failures); + }); + } + } + + + @PostConstruct + public synchronized void start() + { + if (running) + throw new IllegalStateException("REST-client " + username + " is already running!"); + + log.info("{} - Starting - {} messages sent before", username, i); + running = true; + job = executor.submit(this); + } + + public synchronized void stop() throws ExecutionException, InterruptedException + { + if (!running) + throw new IllegalStateException("REST-client " + username + " is not running!"); + + log.info("{} - Stopping...", username); + running = false; + Long sent = job.get(); + log.info("{} - Stopped - sent {} messages so far", username, sent); + } + + @PreDestroy + public synchronized void shutdown() + { + log.info("{} - Shutting down...", username); + try + { + stop(); + } + catch (Exception e) + { + log.warn("{} - Exception while stopping", username, e); + } + + guarded(() -> + { + while (!pending.isEmpty()) + { + log.debug("{} - Waiting for {} outstanding responses...", username, pending.size()); + try + { + condition.await(); + } + catch (InterruptedException e) + { + log.warn("{} - Interrupted wail awaiting condtion!", username, e); + } + } + }); + log.info("{} - Bye Bye", username); + } + + private void guarded(Runnable function) + { + lock.lock(); + try + { + function.run(); + } + finally + { + lock.unlock(); + } + } + + private T guarded(Supplier function) + { + lock.lock(); + try + { + return function.get(); + } + finally + { + lock.unlock(); + } + } +} diff --git a/src/main/java/de/juplo/kafka/RestFailure.java b/src/main/java/de/juplo/kafka/RestFailure.java new file mode 100644 index 0000000..35bba9b --- /dev/null +++ b/src/main/java/de/juplo/kafka/RestFailure.java @@ -0,0 +1,20 @@ +package de.juplo.kafka; + +import lombok.Builder; +import lombok.Data; + + +@Data +@Builder +public class RestFailure implements RestResult +{ + String error; + String exception; + Integer status; + + + public Type getType() + { + return Type.FAILURE; + } +} diff --git a/src/main/java/de/juplo/kafka/RestResult.java b/src/main/java/de/juplo/kafka/RestResult.java new file mode 100644 index 0000000..89716d8 --- /dev/null +++ b/src/main/java/de/juplo/kafka/RestResult.java @@ -0,0 +1,8 @@ +package de.juplo.kafka; + +public interface RestResult +{ + public static enum Type { SUCCESS, FAILURE }; + + public Type getType(); +} diff --git a/src/main/java/de/juplo/kafka/RestService.java b/src/main/java/de/juplo/kafka/RestService.java new file mode 100644 index 0000000..fedb5f7 --- /dev/null +++ b/src/main/java/de/juplo/kafka/RestService.java @@ -0,0 +1,39 @@ +package de.juplo.kafka; + +import org.springframework.http.MediaType; +import org.springframework.stereotype.Service; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; + + +@Service +public class RestService +{ + private final WebClient client; + private final String username; + + + public RestService( + WebClient.Builder builder, + ApplicationProperties properties) + { + this.client = builder + .baseUrl(properties.getBaseUrl()) + .build(); + this.username = properties.getUsername(); + } + + + public Mono send(long number) + { + return client + .post() + .uri("/{username}", username) + .bodyValue(Long.toString(number)) + .accept(MediaType.APPLICATION_JSON) + .exchangeToMono(response -> + response.statusCode().isError() + ? response.bodyToMono(RestFailure.class) + : response.bodyToMono(RestSuccess.class)); + } +} diff --git a/src/main/java/de/juplo/kafka/RestSuccess.java b/src/main/java/de/juplo/kafka/RestSuccess.java new file mode 100644 index 0000000..0c8690b --- /dev/null +++ b/src/main/java/de/juplo/kafka/RestSuccess.java @@ -0,0 +1,17 @@ +package de.juplo.kafka; + +import lombok.Data; + + +@Data +public class RestSuccess implements RestResult +{ + Integer partition; + Long offset; + + + public Type getType() + { + return Type.SUCCESS; + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index e4ae52a..b2ef771 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,8 +1,8 @@ -producer: - bootstrap-server: :9092 - client-id: peter - topic: test - acks: 1 +server: + port: 8081 +rest-client: + base-url: http://localhost:8080 + username: rest-client throttle-ms: 1000 management: endpoints: -- 2.20.1