From: Kai Moritz Date: Sun, 6 Nov 2022 19:00:21 +0000 (+0100) Subject: Version des Rest-Producers, der direkt Requests für den Sumup-Adder sendet X-Git-Tag: rest-producer--json---lvm-2-tage--easy-path~1 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=6369c42cebd818fda8c518813443eb907629ce41;p=demos%2Fkafka%2Ftraining Version des Rest-Producers, der direkt Requests für den Sumup-Adder sendet --- diff --git a/README.sh b/README.sh index 2649b38..52b5f20 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/rest-producer:1.0-SNAPSHOT +IMAGE=juplo/rest-producer-json:1.0-SNAPSHOT if [ "$1" = "cleanup" ] then @@ -16,6 +16,7 @@ if [[ "$1" = "build" ]] then + docker-compose rm -svf producer mvn clean install || exit else echo "Using image existing images:" @@ -29,9 +30,11 @@ docker-compose up -d producer while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done -# tag::http[] -echo -n bar | http -v :8080/foo -echo -n foo | http -v :8080/bar X-id:666 +docker-compose up -d peter klaus + +sleep 10 +docker-compose stop peter klaus + # end::http[] # tag::kafkacat[] docker-compose exec cli kafkacat -b kafka:9092 -t test -f "%p|%o|%k=%s|%h\n" -e diff --git a/docker-compose.yml b/docker-compose.yml index 47775e3..175eed5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -92,7 +92,7 @@ services: command: sleep infinity producer: - image: juplo/rest-producer:1.0-SNAPSHOT + image: juplo/rest-producer-json:1.0-SNAPSHOT ports: - 8080:8080 environment: @@ -102,44 +102,26 @@ services: producer.topic: test peter: - image: juplo/rest-client:1.0-SNAPSHOT - environment: - server.port: 8080 - rest-client.baseUrl: http://producer:8080 - rest-client.username: peter - rest-client.throttle-ms: 1000 + image: juplo/toolbox + command: > + bash -c " + while [[ true ]]; + do + echo 777 | http -v producer:8080/peter; + sleep 1; + done + " klaus: - image: juplo/rest-client:1.0-SNAPSHOT - environment: - server.port: 8080 - rest-client.baseUrl: http://producer:8080 - rest-client.username: klaus - rest-client.throttle-ms: 1100 - - beate: - image: juplo/rest-client:1.0-SNAPSHOT - environment: - server.port: 8080 - rest-client.baseUrl: http://producer:8080 - rest-client.username: beate - rest-client.throttle-ms: 900 - - franz: - image: juplo/rest-client:1.0-SNAPSHOT - environment: - server.port: 8080 - rest-client.baseUrl: http://producer:8080 - rest-client.username: franz - rest-client.throttle-ms: 800 - - uschi: - image: juplo/rest-client:1.0-SNAPSHOT - environment: - server.port: 8080 - rest-client.baseUrl: http://producer:8080 - rest-client.username: uschi - rest-client.throttle-ms: 1200 + image: juplo/toolbox + command: > + bash -c " + while [[ true ]]; + do + echo 666 | http -v producer:8080/klaus; + sleep 1; + done + " consumer: image: juplo/toolbox diff --git a/pom.xml b/pom.xml index e7ea677..7772b6c 100644 --- a/pom.xml +++ b/pom.xml @@ -12,9 +12,9 @@ de.juplo.kafka - rest-producer + rest-producer-json REST Producer - A Simple Producer that takes messages via POST and confirms successs + A Producer that takes messages via POST and sends JSON-requests to the Sumup-Adder 1.0-SNAPSHOT @@ -36,8 +36,8 @@ spring-boot-starter-validation - org.apache.kafka - kafka-clients + org.springframework.kafka + spring-kafka org.projectlombok @@ -48,11 +48,6 @@ spring-boot-starter-test test - - org.springframework.kafka - spring-kafka - test - org.springframework.kafka spring-kafka-test diff --git a/src/main/java/de/juplo/kafka/AddNumberMessage.java b/src/main/java/de/juplo/kafka/AddNumberMessage.java new file mode 100644 index 0000000..88b5d6f --- /dev/null +++ b/src/main/java/de/juplo/kafka/AddNumberMessage.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + +import lombok.Value; + + +@Value +public class AddNumberMessage +{ + private final int number; + private final int next; +} diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 0642aa4..9a11f6e 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -5,6 +5,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.serializer.JsonSerializer; import java.util.Properties; @@ -16,7 +17,7 @@ public class ApplicationConfiguration @Bean public RestProducer restProducer( ApplicationProperties properties, - KafkaProducer kafkaProducer) + KafkaProducer kafkaProducer) { return new RestProducer( @@ -27,7 +28,7 @@ public class ApplicationConfiguration } @Bean(destroyMethod = "close") - public KafkaProducer kafkaProducer(ApplicationProperties properties) + public KafkaProducer kafkaProducer(ApplicationProperties properties) { Properties props = new Properties(); props.put("bootstrap.servers", properties.getBootstrapServer()); @@ -39,7 +40,10 @@ public class ApplicationConfiguration props.put("linger.ms", properties.getLingerMs()); props.put("compression.type", properties.getCompressionType()); props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", StringSerializer.class.getName()); + props.put("value.serializer", JsonSerializer.class.getName()); + props.put(JsonSerializer.TYPE_MAPPINGS, + "ADD:" + AddNumberMessage.class.getName() + "," + + "CALC:" + CalculateSumMessage.class.getName()); return new KafkaProducer<>(props); } diff --git a/src/main/java/de/juplo/kafka/CalculateSumMessage.java b/src/main/java/de/juplo/kafka/CalculateSumMessage.java new file mode 100644 index 0000000..5d8c414 --- /dev/null +++ b/src/main/java/de/juplo/kafka/CalculateSumMessage.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + + +import lombok.Value; + + +@Value +public class CalculateSumMessage +{ + private final int number; +} diff --git a/src/main/java/de/juplo/kafka/ProduceFailure.java b/src/main/java/de/juplo/kafka/ProduceFailure.java index 873a67b..b7785f1 100644 --- a/src/main/java/de/juplo/kafka/ProduceFailure.java +++ b/src/main/java/de/juplo/kafka/ProduceFailure.java @@ -7,15 +7,20 @@ import lombok.Value; @Value public class ProduceFailure implements ProduceResult { - private final String error; - private final String exception; + private final String[] error; + private final String[] exception; private final Integer status; - public ProduceFailure(Exception e) + public ProduceFailure(Exception[] e) { status = 500; - exception = e.getClass().getSimpleName(); - error = e.getMessage(); + exception = new String[e.length]; + error = new String[e.length]; + for (int i = 0; i < e.length ; i++) + { + exception[i] = e[i] == null ? null : e[i].getClass().getSimpleName(); + error[i] = e[i] == null ? null : e[i].getMessage(); + } } } diff --git a/src/main/java/de/juplo/kafka/ProduceSuccess.java b/src/main/java/de/juplo/kafka/ProduceSuccess.java index 9c79e8b..8afe795 100644 --- a/src/main/java/de/juplo/kafka/ProduceSuccess.java +++ b/src/main/java/de/juplo/kafka/ProduceSuccess.java @@ -7,6 +7,6 @@ import lombok.Value; @Value public class ProduceSuccess implements ProduceResult { - Integer partition; - Long offset; + Integer[] partition; + Long[] offset; } diff --git a/src/main/java/de/juplo/kafka/RestProducer.java b/src/main/java/de/juplo/kafka/RestProducer.java index debe366..4be2dcd 100644 --- a/src/main/java/de/juplo/kafka/RestProducer.java +++ b/src/main/java/de/juplo/kafka/RestProducer.java @@ -20,7 +20,7 @@ public class RestProducer private final String id; private final String topic; private final Integer partition; - private final KafkaProducer producer; + private final KafkaProducer producer; private long produced = 0; @@ -28,13 +28,28 @@ public class RestProducer public DeferredResult send( @PathVariable String key, @RequestHeader(name = "X-id", required = false) Long correlationId, - @RequestBody String value) + @RequestBody Integer number) { - DeferredResult result = new DeferredResult<>(); + ResultRecorder result = new ResultRecorder(number+1); + for (int i = 1; i <= number; i++) + { + send(key, new AddNumberMessage(number, i), correlationId, result); + } + send(key, new CalculateSumMessage(number), correlationId, result); + + return result.getDeferredResult(); + } + + private void send( + String key, + Object value, + Long correlationId, + ResultRecorder result) + { final long time = System.currentTimeMillis(); - final ProducerRecord record = new ProducerRecord<>( + final ProducerRecord record = new ProducerRecord<>( topic, // Topic partition, // Partition key, // Key @@ -53,8 +68,8 @@ public class RestProducer if (e == null) { // HANDLE SUCCESS + result.addSuccess(metadata); produced++; - result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset())); log.debug( "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms", id, @@ -69,7 +84,7 @@ public class RestProducer else { // HANDLE ERROR - result.setErrorResult(new ProduceFailure(e)); + result.addFailure(e); log.error( "{} - ERROR key={} timestamp={} latency={}ms: {}", id, @@ -88,8 +103,6 @@ public class RestProducer record.key(), now - time ); - - return result; } @ExceptionHandler diff --git a/src/main/java/de/juplo/kafka/ResultRecorder.java b/src/main/java/de/juplo/kafka/ResultRecorder.java new file mode 100644 index 0000000..d20ee89 --- /dev/null +++ b/src/main/java/de/juplo/kafka/ResultRecorder.java @@ -0,0 +1,74 @@ +package de.juplo.kafka; + +import lombok.Getter; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.springframework.web.context.request.async.DeferredResult; + +import java.util.Arrays; + + +class ResultRecorder +{ + @Getter + private final DeferredResult deferredResult = new DeferredResult(); + private final int numMessages; + private final RecordMetadata[] metadata; + private final Exception[] errors; + + private int sent = 0; + + + ResultRecorder(int numMessages) + { + this.numMessages = numMessages; + this.metadata = new RecordMetadata[numMessages]; + this.errors = new Exception[numMessages]; + } + + + void addSuccess(RecordMetadata m) + { + checkStatus(); + metadata[sent++] = m; + processResult(); + } + + void addFailure(Exception e) + { + checkStatus(); + errors[sent++] = e; + processResult(); + } + + private void checkStatus() throws IllegalStateException + { + if (sent >= numMessages) + throw new IllegalStateException("Already sent " + sent + " messages!"); + } + + private void processResult() + { + if (sent == numMessages) + { + if (Arrays + .stream(errors) + .filter(e -> e != null) + .findAny() + .isPresent()) + { + deferredResult.setErrorResult(new ProduceFailure(errors)); + } + else + { + Integer[] partitions = new Integer[numMessages]; + Long[] offsets = new Long[numMessages]; + for (int i = 0; i < numMessages; i++) + { + partitions[i] = metadata[i].partition(); + offsets[i] = metadata[i].offset(); + } + deferredResult.setResult(new ProduceSuccess(partitions, offsets)); + } + } + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 0d5752c..8ce4821 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -4,7 +4,7 @@ producer: topic: test acks: -1 batch-size: 16384 - linger-ms: 0 + linger-ms: 5 compression-type: gzip management: endpoint: diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 646a335..b9c1e17 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -8,6 +8,7 @@ import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMock import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; +import org.springframework.http.MediaType; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.web.servlet.MockMvc; @@ -53,11 +54,15 @@ public class ApplicationTests void testSendMessage() throws Exception { mockMvc - .perform(post("/peter").content("Hallo Welt!")) + .perform( + post("/peter") + .header("X-id", 7) + .contentType(MediaType.APPLICATION_JSON) + .content("666")) .andExpect(status().isOk()); await("Message was send") .atMost(Duration.ofSeconds(5)) - .until(() -> consumer.received.size() == 1); + .until(() -> consumer.received.size() == 667); }