From: Kai Moritz Date: Fri, 22 Jul 2022 18:57:49 +0000 (+0200) Subject: Merge der Upgrades für Confluent/Spring-Boot (Branch 'rest-producer') X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=dcd9037cade37c58bf38536b01beb8f69a7a6a12;hp=96998bebe231ac0e61ef6521a2c23694e81cfd86;p=demos%2Fkafka%2Ftraining Merge der Upgrades für Confluent/Spring-Boot (Branch 'rest-producer') --- diff --git a/README.sh b/README.sh index 698d6dd..832a42f 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/rest-producer:1.0-SNAPSHOT +IMAGE=juplo/springified-producer:1.0-SNAPSHOT if [ "$1" = "cleanup" ] then @@ -29,7 +29,9 @@ docker-compose up -d sleep 15 -echo foo | http -v :8080/bar +echo 'Hallo Welt!' | http -v :8080/peter +echo peter | http -v :8080/ +http -v PUT :8080/peter dd if=/dev/zero bs=1024 count=1024 | http -v :8080/fehler http -v :8081/seen diff --git a/docker-compose.yml b/docker-compose.yml index 1e8a8df..7321516 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -37,13 +37,13 @@ services: command: sleep infinity producer: - image: juplo/rest-producer:1.0-SNAPSHOT + image: juplo/springified-producer:1.0-SNAPSHOT ports: - 8080:8880 environment: - producer.bootstrap-server: kafka:9092 + spring.kafka.bootstrap-servers: kafka:9092 producer.client-id: producer - producer.topic: test + spring.kafka.template.default-topic: test consumer: image: juplo/endless-consumer:1.0-SNAPSHOT diff --git a/pom.xml b/pom.xml index e4d24bb..4c79bc8 100644 --- a/pom.xml +++ b/pom.xml @@ -12,9 +12,9 @@ de.juplo.kafka - rest-producer - REST Producer - A Simple Producer that takes messages via POST and confirms successs + springified-producer + Springified REST Producer + A Simple Producer that is implemented with the help of Spring Kafka and takes messages via POST and confirms successs 1.0-SNAPSHOT @@ -32,8 +32,8 @@ true - org.apache.kafka - kafka-clients + org.springframework.kafka + spring-kafka org.projectlombok @@ -44,11 +44,6 @@ spring-boot-starter-test test - - org.springframework.kafka - spring-kafka - test - org.springframework.kafka spring-kafka-test diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 9f3e3ed..273cee5 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -1,13 +1,8 @@ package de.juplo.kafka; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.util.Assert; - -import java.util.concurrent.Executors; @SpringBootApplication diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 1f30262..7b01334 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -9,11 +9,6 @@ import org.springframework.boot.context.properties.ConfigurationProperties; @Setter public class ApplicationProperties { - private String bootstrapServer; private String clientId; private String topic; - private String acks; - private Integer batchSize; - private Integer lingerMs; - private String compressionType; } diff --git a/src/main/java/de/juplo/kafka/ClientMessage.java b/src/main/java/de/juplo/kafka/ClientMessage.java new file mode 100644 index 0000000..042fdc4 --- /dev/null +++ b/src/main/java/de/juplo/kafka/ClientMessage.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + +import lombok.Value; + + +@Value +public class ClientMessage +{ + private final String client; + private final String message; +} diff --git a/src/main/java/de/juplo/kafka/FooMessage.java b/src/main/java/de/juplo/kafka/FooMessage.java new file mode 100644 index 0000000..2e9e8ba --- /dev/null +++ b/src/main/java/de/juplo/kafka/FooMessage.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + +import lombok.Value; + + +@Value +public class FooMessage +{ + private final String client; + private final Long timestamp; +} diff --git a/src/main/java/de/juplo/kafka/Greeting.java b/src/main/java/de/juplo/kafka/Greeting.java new file mode 100644 index 0000000..2df15a3 --- /dev/null +++ b/src/main/java/de/juplo/kafka/Greeting.java @@ -0,0 +1,13 @@ +package de.juplo.kafka; + +import lombok.Value; + +import java.time.LocalDateTime; + + +@Value +public class Greeting +{ + private final String name; + private final LocalDateTime when = LocalDateTime.now(); +} diff --git a/src/main/java/de/juplo/kafka/ProduceFailure.java b/src/main/java/de/juplo/kafka/ProduceFailure.java index 873a67b..7c78482 100644 --- a/src/main/java/de/juplo/kafka/ProduceFailure.java +++ b/src/main/java/de/juplo/kafka/ProduceFailure.java @@ -12,7 +12,7 @@ public class ProduceFailure implements ProduceResult private final Integer status; - public ProduceFailure(Exception e) + public ProduceFailure(Throwable e) { status = 500; exception = e.getClass().getSimpleName(); diff --git a/src/main/java/de/juplo/kafka/RestProducer.java b/src/main/java/de/juplo/kafka/RestProducer.java index 7d9bf12..423a8a3 100644 --- a/src/main/java/de/juplo/kafka/RestProducer.java +++ b/src/main/java/de/juplo/kafka/RestProducer.java @@ -1,18 +1,14 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.clients.producer.RecordMetadata; import org.springframework.http.HttpStatus; -import org.springframework.http.MediaType; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.*; import org.springframework.web.context.request.async.DeferredResult; import javax.annotation.PreDestroy; -import java.util.Properties; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; @Slf4j @@ -20,86 +16,88 @@ import java.util.concurrent.ExecutorService; public class RestProducer { private final String id; - private final String topic; - private final KafkaProducer producer; + private final KafkaTemplate kafkaTemplate; private long produced = 0; - public RestProducer(ApplicationProperties properties) + public RestProducer( + ApplicationProperties properties, + KafkaTemplate kafkaTemplate) { this.id = properties.getClientId(); - this.topic = properties.getTopic(); - - Properties props = new Properties(); - props.put("bootstrap.servers", properties.getBootstrapServer()); - props.put("client.id", properties.getClientId()); - props.put("acks", properties.getAcks()); - props.put("batch.size", properties.getBatchSize()); - props.put("delivery.timeout.ms", 20000); // 20 Sekunden - props.put("request.timeout.ms", 10000); // 10 Sekunden - props.put("linger.ms", properties.getLingerMs()); - props.put("compression.type", properties.getCompressionType()); - props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", StringSerializer.class.getName()); - - this.producer = new KafkaProducer<>(props); + this.kafkaTemplate = kafkaTemplate; } @PostMapping(path = "{key}") - public DeferredResult send( + public DeferredResult message( @PathVariable String key, @RequestBody String value) + { + key = key.trim(); + return send(key, new ClientMessage(key, value)); + } + + @PutMapping(path = "{key}") + public DeferredResult message(@PathVariable String key) + { + key = key.trim(); + return send(key, new FooMessage(key, System.currentTimeMillis())); + } + + @PostMapping(path = "/") + public DeferredResult greeting( + @RequestBody String name) + { + name = name.trim(); + return send(name, new Greeting(name)); + } + + private DeferredResult send(String key, Object value) { DeferredResult result = new DeferredResult<>(); final long time = System.currentTimeMillis(); - final ProducerRecord record = new ProducerRecord<>( - topic, // Topic - key, // Key - value // Value - ); - - producer.send(record, (metadata, e) -> - { - long now = System.currentTimeMillis(); - if (e == null) + kafkaTemplate.sendDefault(key, value).addCallback( + (sendResult) -> { + long now = System.currentTimeMillis(); + // HANDLE SUCCESS + RecordMetadata metadata = sendResult.getRecordMetadata(); produced++; result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset())); log.debug( "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms", id, - record.key(), - record.value(), + key, + value, metadata.partition(), metadata.offset(), metadata.timestamp(), now - time ); - } - else + }, + (e) -> { + long now = System.currentTimeMillis(); + // HANDLE ERROR result.setErrorResult(new ProduceFailure(e)); log.error( - "{} - ERROR key={} timestamp={} latency={}ms: {}", + "{} - ERROR key={} timestamp=-1 latency={}ms: {}", id, - record.key(), - metadata == null ? -1 : metadata.timestamp(), + key, now - time, e.toString() ); - } - }); + }); long now = System.currentTimeMillis(); log.trace( - "{} - Queued #{} key={} latency={}ms", + "{} - Queued key={} latency={}ms", id, - value, - record.key(), + key, now - time ); @@ -117,8 +115,6 @@ public class RestProducer public void destroy() throws ExecutionException, InterruptedException { log.info("{} - Destroy!", id); - log.info("{} - Closing the KafkaProducer", id); - producer.close(); log.info("{}: Produced {} messages in total, exiting!", id, produced); } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 726204e..ce26258 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,11 +1,6 @@ producer: - bootstrap-server: :9092 client-id: DEV topic: test - acks: -1 - batch-size: 16384 - linger-ms: 0 - compression-type: gzip management: endpoint: shutdown: @@ -21,13 +16,32 @@ management: enabled: true info: kafka: - bootstrap-server: ${producer.bootstrap-server} + bootstrap-servers: ${spring.kafka.bootstrap-servers} client-id: ${producer.client-id} - topic: ${producer.topic} - acks: ${producer.acks} - batch-size: ${producer.batch-size} - linger-ms: ${producer.linger-ms} - compression-type: ${producer.compression-type} + topic: ${spring.kafka.template.default-topic} + acks: ${spring.kafka.producer.acks} + batch-size: ${spring.kafka.producer.batch-size} + linger-ms: ${spring.kafka.producer.properties.linger.ms} + compression-type: ${spring.kafka.producer.compression-type} +spring: + kafka: + bootstrap-servers: :9092 + producer: + acks: -1 + batch-size: 16384 + compression-type: gzip + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.springframework.kafka.support.serializer.JsonSerializer + properties: + linger.ms: 0 + delivery.timeout.ms: 20000 # 20 Sekunden + request.timeout.ms: 10000 # 10 Sekunden + spring.json.type.mapping: > + message:de.juplo.kafka.ClientMessage, + foo:de.juplo.kafka.FooMessage, + greeting:de.juplo.kafka.Greeting + template: + default-topic: test logging: level: root: INFO diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index cf70c81..0a11e44 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -26,9 +26,8 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. @SpringBootTest( properties = { - "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}", - "producer.bootstrap-server=${spring.embedded.kafka.brokers}", - "producer.topic=" + TOPIC}) + "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", + "spring.kafka.template.default-topic=" + TOPIC}) @AutoConfigureMockMvc @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) @Slf4j @@ -51,7 +50,7 @@ public class ApplicationTests @Test - void testSendMessage() throws Exception + void testSendClientMessage() throws Exception { mockMvc .perform(post("/peter").content("Hallo Welt!")) @@ -61,6 +60,28 @@ public class ApplicationTests .until(() -> consumer.received.size() == 1); } + @Test + void testSendFooMessage() throws Exception + { + mockMvc + .perform(put("/peter")) + .andExpect(status().isOk()); + await("Message was send") + .atMost(Duration.ofSeconds(5)) + .until(() -> consumer.received.size() == 1); + } + + @Test + void testSendGreeting() throws Exception + { + mockMvc + .perform(post("/").content("peter")) + .andExpect(status().isOk()); + await("Message was send") + .atMost(Duration.ofSeconds(5)) + .until(() -> consumer.received.size() == 1); + } + static class Consumer {