From: Kai Moritz
Date: Sat, 23 Jul 2022 09:27:34 +0000 (+0200)
Subject: Merge of the reworked Compose configuration ('endless-stream-producer')
X-Git-Tag: customized---lvm-2-tage~1^2
X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=634e128465f67a397bf3cc1583e3b9289351becc;hp=26cef68053cc7472055344b171a44f34e7592ebb;p=demos%2Fkafka%2Ftraining

Merge of the reworked Compose configuration ('endless-stream-producer')
---
diff --git a/README.sh b/README.sh
index 73ceebc..698d6dd 100755
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/endless-producer:1.0-SNAPSHOT
+IMAGE=juplo/rest-producer:1.0-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
@@ -25,7 +25,13 @@ fi
 echo "Waiting for the Kafka-Cluster to become ready..."
 docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
 docker-compose up setup
-docker-compose up -d producer
-sleep 5
-docker-compose stop producer
+docker-compose up -d
+
+sleep 15
+
+echo foo | http -v :8080/bar
+dd if=/dev/zero bs=1024 count=1024 | http -v :8080/fehler
+http -v :8081/seen
+
+docker-compose stop producer consumer
 docker-compose logs producer
diff --git a/docker-compose.yml b/docker-compose.yml
index a368379..a16bf67 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -37,7 +37,7 @@ services:
     command: sleep infinity
 
   producer:
-    image: juplo/endless-producer:1.0-SNAPSHOT
+    image: juplo/rest-producer:1.0-SNAPSHOT
     ports:
       - 8080:8080
     environment:
@@ -45,4 +45,14 @@ services:
       producer.bootstrap-server: kafka:9092
       producer.client-id: producer
       producer.topic: test
-      producer.throttle-ms: 200
+
+  consumer:
+    image: juplo/endless-consumer:1.0-SNAPSHOT
+    ports:
+      - 8081:8080
+    environment:
+      server.port: 8080
+      consumer.bootstrap-server: kafka:9092
+      consumer.group-id: my-group
+      consumer.client-id: consumer
+      consumer.topic: test
diff --git a/pom.xml b/pom.xml
index af4a89f..e4d24bb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,8 +12,9 @@
   <groupId>de.juplo.kafka</groupId>
-  <artifactId>endless-producer</artifactId>
-  <name>Endless Producer: a Simple Producer that endlessly writes numbers into a topic</name>
+  <artifactId>rest-producer</artifactId>
+  <name>REST Producer</name>
+  <description>A Simple Producer that takes messages via POST and confirms success</description>
   <version>1.0-SNAPSHOT</version>
@@ -43,6 +44,21 @@
       <artifactId>spring-boot-starter-test</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.springframework.kafka</groupId>
+      <artifactId>spring-kafka</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.kafka</groupId>
+      <artifactId>spring-kafka-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java
index bc617a8..9f3e3ed 100644
--- a/src/main/java/de/juplo/kafka/Application.java
+++ b/src/main/java/de/juplo/kafka/Application.java
@@ -14,31 +14,6 @@ import java.util.concurrent.Executors;
 @EnableConfigurationProperties(ApplicationProperties.class)
 public class Application
 {
-  @Autowired
-  ApplicationProperties properties;
-
-
-  @Bean
-  public EndlessProducer producer()
-  {
-    Assert.hasText(properties.getBootstrapServer(), "producer.bootstrap-server must be set");
-    Assert.hasText(properties.getClientId(), "producer.client-id must be set");
-    Assert.hasText(properties.getTopic(), "producer.topic must be set");
-
-    EndlessProducer producer =
-        new EndlessProducer(
-            Executors.newFixedThreadPool(1),
-            properties.getBootstrapServer(),
-            properties.getClientId(),
-            properties.getTopic(),
-            properties.getAcks(),
-            properties.getThrottleMs());
-
-    producer.start();
-
-    return producer;
-  }
-
   public static void main(String[] args)
   {
     SpringApplication.run(Application.class, args);
diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java
index ab26890..1f30262 100644
--- a/src/main/java/de/juplo/kafka/ApplicationProperties.java
+++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java
@@ -13,5 +13,7 @@ public class ApplicationProperties
   private String clientId;
   private String topic;
   private String acks;
-  private int throttleMs;
+  private Integer batchSize;
+  private Integer lingerMs;
+  private String compressionType;
 }
diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java
deleted file mode 100644
index ce4df68..0000000
--- a/src/main/java/de/juplo/kafka/DriverController.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import org.springframework.http.HttpStatus;
-import org.springframework.web.bind.annotation.ExceptionHandler;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.ResponseStatus;
-import org.springframework.web.bind.annotation.RestController;
-
-import java.util.concurrent.ExecutionException;
-
-
-@RestController
-@RequiredArgsConstructor
-public class DriverController
-{
-  private final EndlessProducer producer;
-
-
-  @PostMapping("start")
-  public void start()
-  {
-    producer.start();
-  }
-
-  @PostMapping("stop")
-  public void stop() throws ExecutionException, InterruptedException
-  {
-    producer.stop();
-  }
-
-  @ExceptionHandler
-  @ResponseStatus(HttpStatus.BAD_REQUEST)
-  public ErrorResponse illegalStateException(IllegalStateException e)
-  {
-    return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/EndlessProducer.java b/src/main/java/de/juplo/kafka/EndlessProducer.java
deleted file mode 100644
index 7a5b324..0000000
--- a/src/main/java/de/juplo/kafka/EndlessProducer.java
+++ /dev/null
@@ -1,178 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-
-import javax.annotation.PreDestroy;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-
-
-@Slf4j
-public class EndlessProducer implements Runnable
-{
-  private final ExecutorService executor;
-  private final String id;
-  private final String topic;
-  private final int throttleMs;
-  private final KafkaProducer<String, String> producer;
-
-  private boolean running = false;
-  private long i = 0;
-  private long produced = 0;
-
-  public EndlessProducer(
-      ExecutorService executor,
-      String bootstrapServer,
-      String clientId,
-      String topic,
-      String acks,
-      int throttleMs)
-  {
-    this.executor = executor;
-    this.id = clientId;
-    this.topic = topic;
-    this.throttleMs = throttleMs;
-
-    Properties props = new Properties();
-    props.put("bootstrap.servers", bootstrapServer);
-    props.put("client.id", clientId);
-    props.put("acks", acks);
-    props.put("key.serializer", StringSerializer.class.getName());
-    props.put("value.serializer", StringSerializer.class.getName());
-
-    this.producer = new KafkaProducer<>(props);
-  }
-
-  @Override
-  public void run()
-  {
-    try
-    {
-      for (; running; i++)
-      {
-        send(Long.toString(i%10), Long.toString(i));
-
-        if (throttleMs > 0)
-        {
-          try
-          {
-            Thread.sleep(throttleMs);
-          }
-          catch (InterruptedException e)
-          {
-            log.warn("{} - Interrupted while throttling!", e);
-          }
-        }
-      }
-
-      log.info("{} - Done", id);
-    }
-    catch (Exception e)
-    {
-      log.error("{} - Unexpected Exception:", id, e);
-    }
-    finally
-    {
-      synchronized (this)
-      {
-        running = false;
-        log.info("{} - Stopped - produced {} messages so far", id, produced);
-      }
-    }
-  }
-
-  void send(String key, String value)
-  {
-    final long time = System.currentTimeMillis();
-
-    final ProducerRecord<String, String> record = new ProducerRecord<>(
-        topic,  // Topic
-        key,    // Key
-        value   // Value
-    );
-
-    producer.send(record, (metadata, e) ->
-    {
-      long now = System.currentTimeMillis();
-      if (e == null)
-      {
-        // HANDLE SUCCESS
-        produced++;
-        log.debug(
-            "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
-            id,
-            record.key(),
-            record.value(),
-            metadata.partition(),
-            metadata.offset(),
-            metadata.timestamp(),
-            now - time
-        );
-      }
-      else
-      {
-        // HANDLE ERROR
-        log.error(
-            "{} - ERROR key={} timestamp={} latency={}ms: {}",
-            id,
-            record.key(),
-            metadata == null ? -1 : metadata.timestamp(),
-            now - time,
-            e.toString()
-        );
-      }
-    });
-
-    long now = System.currentTimeMillis();
-    log.trace(
-        "{} - Queued #{} key={} latency={}ms",
-        id,
-        value,
-        record.key(),
-        now - time
-    );
-  }
-
-  public synchronized void start()
-  {
-    if (running)
-      throw new IllegalStateException("Producer instance " + id + " is already running!");
-
-    log.info("{} - Starting - produced {} messages before", id, produced);
-    running = true;
-    executor.submit(this);
-  }
-
-  public synchronized void stop() throws ExecutionException, InterruptedException
-  {
-    if (!running)
-      throw new IllegalStateException("Producer instance " + id + " is not running!");
-
-    log.info("{} - Stopping...", id);
-    running = false;
-  }
-
-  @PreDestroy
-  public void destroy() throws ExecutionException, InterruptedException
-  {
-    log.info("{} - Destroy!", id);
-    try
-    {
-      stop();
-    }
-    catch (IllegalStateException e)
-    {
-      log.info("{} - Was already stopped", id);
-    }
-    finally
-    {
-      log.info("{} - Closing the KafkaProducer", id);
-      producer.close();
-      log.info("{}: Produced {} messages in total, exiting!", id, produced);
-    }
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/ProduceFailure.java b/src/main/java/de/juplo/kafka/ProduceFailure.java
new file mode 100644
index 0000000..873a67b
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/ProduceFailure.java
@@ -0,0 +1,21 @@
+package de.juplo.kafka;
+
+
+import lombok.Value;
+
+
+@Value
+public class ProduceFailure implements ProduceResult
+{
+  private final String error;
+  private final String exception;
+  private final Integer status;
+
+
+  public ProduceFailure(Exception e)
+  {
+    status = 500;
+    exception = e.getClass().getSimpleName();
+    error = e.getMessage();
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/ProduceResult.java b/src/main/java/de/juplo/kafka/ProduceResult.java
new file mode 100644
index 0000000..ceff329
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/ProduceResult.java
@@ -0,0 +1,11 @@
+package de.juplo.kafka;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+
+import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL;
+
+
+@JsonInclude(NON_NULL)
+public interface ProduceResult
+{
+}
diff --git a/src/main/java/de/juplo/kafka/ProduceSuccess.java b/src/main/java/de/juplo/kafka/ProduceSuccess.java
new file mode 100644
index 0000000..9c79e8b
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/ProduceSuccess.java
@@ -0,0 +1,12 @@
+package de.juplo.kafka;
+
+
+import lombok.Value;
+
+
+@Value
+public class ProduceSuccess implements ProduceResult
+{
+  Integer partition;
+  Long offset;
+}
diff --git a/src/main/java/de/juplo/kafka/RestProducer.java b/src/main/java/de/juplo/kafka/RestProducer.java
new file mode 100644
index 0000000..7d9bf12
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/RestProducer.java
@@ -0,0 +1,124 @@
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.web.bind.annotation.*;
+import org.springframework.web.context.request.async.DeferredResult;
+
+import javax.annotation.PreDestroy;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+
+@Slf4j
+@RestController
+public class RestProducer
+{
+  private final String id;
+  private final String topic;
+  private final KafkaProducer<String, String> producer;
+
+  private long produced = 0;
+
+  public RestProducer(ApplicationProperties properties)
+  {
+    this.id = properties.getClientId();
+    this.topic = properties.getTopic();
+
+    Properties props = new Properties();
+    props.put("bootstrap.servers", properties.getBootstrapServer());
+    props.put("client.id", properties.getClientId());
+    props.put("acks", properties.getAcks());
+    props.put("batch.size", properties.getBatchSize());
+    props.put("delivery.timeout.ms", 20000); // 20 seconds
+    props.put("request.timeout.ms", 10000);  // 10 seconds
+    props.put("linger.ms", properties.getLingerMs());
+    props.put("compression.type", properties.getCompressionType());
+    props.put("key.serializer", StringSerializer.class.getName());
+    props.put("value.serializer", StringSerializer.class.getName());
+
+    this.producer = new KafkaProducer<>(props);
+  }
+
+  @PostMapping(path = "{key}")
+  public DeferredResult<ProduceResult> send(
+      @PathVariable String key,
+      @RequestBody String value)
+  {
+    DeferredResult<ProduceResult> result = new DeferredResult<>();
+
+    final long time = System.currentTimeMillis();
+
+    final ProducerRecord<String, String> record = new ProducerRecord<>(
+        topic,  // Topic
+        key,    // Key
+        value   // Value
+    );
+
+    producer.send(record, (metadata, e) ->
+    {
+      long now = System.currentTimeMillis();
+      if (e == null)
+      {
+        // HANDLE SUCCESS
+        produced++;
+        result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
+        log.debug(
+            "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
+            id,
+            record.key(),
+            record.value(),
+            metadata.partition(),
+            metadata.offset(),
+            metadata.timestamp(),
+            now - time
+        );
+      }
+      else
+      {
+        // HANDLE ERROR
+        result.setErrorResult(new ProduceFailure(e));
+        log.error(
+            "{} - ERROR key={} timestamp={} latency={}ms: {}",
+            id,
+            record.key(),
+            metadata == null ? -1 : metadata.timestamp(),
+            now - time,
+            e.toString()
+        );
+      }
+    });
+
+    long now = System.currentTimeMillis();
+    log.trace(
+        "{} - Queued #{} key={} latency={}ms",
+        id,
+        value,
+        record.key(),
+        now - time
+    );
+
+    return result;
+  }
+
+  @ExceptionHandler
+  @ResponseStatus(HttpStatus.BAD_REQUEST)
+  public ErrorResponse illegalStateException(IllegalStateException e)
+  {
+    return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
+  }
+
+  @PreDestroy
+  public void destroy() throws ExecutionException, InterruptedException
+  {
+    log.info("{} - Destroy!", id);
+    log.info("{} - Closing the KafkaProducer", id);
+    producer.close();
+    log.info("{}: Produced {} messages in total, exiting!", id, produced);
+  }
+}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 7dd385b..726204e 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -2,8 +2,10 @@ producer:
   bootstrap-server: :9092
   client-id: DEV
   topic: test
-  acks: 1
-  throttle-ms: 1000
+  acks: -1
+  batch-size: 16384
+  linger-ms: 0
+  compression-type: gzip
 management:
   endpoint:
     shutdown:
@@ -23,7 +25,9 @@ info:
     client-id: ${producer.client-id}
     topic: ${producer.topic}
     acks: ${producer.acks}
-    throttle-ms: ${producer.throttle-ms}
+    batch-size: ${producer.batch-size}
+    linger-ms: ${producer.linger-ms}
+    compression-type: ${producer.compression-type}
 logging:
   level:
     root: INFO
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
new file mode 100644
index 0000000..cf70c81
--- /dev/null
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -0,0 +1,86 @@
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.*;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.test.web.servlet.MockMvc;
+
+import java.time.Duration;
+import java.util.LinkedList;
+import java.util.List;
+
+import static de.juplo.kafka.ApplicationTests.PARTITIONS;
+import static de.juplo.kafka.ApplicationTests.TOPIC;
+import static org.awaitility.Awaitility.*;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
+
+
+@SpringBootTest(
+    properties = {
+        "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}",
+        "producer.bootstrap-server=${spring.embedded.kafka.brokers}",
+        "producer.topic=" + TOPIC})
+@AutoConfigureMockMvc
+@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
+@Slf4j
+public class ApplicationTests
+{
+  static final String TOPIC = "FOO";
+  static final int PARTITIONS = 10;
+
+  @Autowired
+  MockMvc mockMvc;
+  @Autowired
+  Consumer consumer;
+
+
+  @BeforeEach
+  public void clear()
+  {
+    consumer.received.clear();
+  }
+
+
+  @Test
+  void testSendMessage() throws Exception
+  {
+    mockMvc
+        .perform(post("/peter").content("Hallo Welt!"))
+        .andExpect(status().isOk());
+    await("Message was sent")
+        .atMost(Duration.ofSeconds(5))
+        .until(() -> consumer.received.size() == 1);
+  }
+
+
+  static class Consumer
+  {
+    final List<ConsumerRecord<String, String>> received = new LinkedList<>();
+
+    @KafkaListener(groupId = "TEST", topics = TOPIC)
+    public void receive(ConsumerRecord<String, String> record)
+    {
+      log.debug("Received message: {}", record);
+      received.add(record);
+    }
+  }
+
+  @TestConfiguration
+  static class Configuration
+  {
+    @Bean
+    Consumer consumer()
+    {
+      return new Consumer();
+    }
+  }
+}