From: Kai Moritz Date: Sun, 6 Nov 2022 19:00:21 +0000 (+0100) Subject: Version des Rest-Producers, der direkt Requests für den Sumup-Adder sendet X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=225b80e3ac65627a82ad9fdccc305b63d7eee0fd;p=demos%2Fkafka%2Ftraining Version des Rest-Producers, der direkt Requests für den Sumup-Adder sendet --- diff --git a/README.sh b/README.sh index f227f47..af458b9 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/rest-producer:1.0-SNAPSHOT +IMAGE=juplo/rest-producer--json:1.0-SNAPSHOT if [ "$1" = "cleanup" ] then @@ -17,6 +17,7 @@ if [[ "$1" = "build" ]] then + docker-compose rm -svf producer mvn clean install || exit else echo "Using image existing images:" @@ -33,91 +34,11 @@ docker-compose -f docker/docker-compose.yml logs setup docker-compose -f docker/docker-compose.yml up -d producer while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done -# tag::hashed[] -echo -n Nachricht 1 an klaus über producer | http -v :8080/klaus -# end::hashed[] -echo -n Nachricht 2 an klaus über producer | http -v :8080/klaus -# tag::hashed[] -echo -n Nachricht 1 an peter über producer | http -v :8080/peter -# end::hashed[] -echo -n Nachricht 3 an klaus über producer | http -v :8080/klaus -echo -n Nachricht 2 an peter über producer | http -v :8080/peter -echo -n Nachricht 3 an peter über producer | http -v :8080/peter +docker-compose -f docker/docker-compose.yml up -d peter klaus -echo Nachrichten in Partition 0: -kafkacat -b :9092 -t test -o 0 -p0 -f'key: %k\toffset: %o\tvalue: %s\n' -qe -echo -echo Nachrichten in Partition 1: -kafkacat -b :9092 -t test -o 0 -p1 -f'key: %k\toffset: %o\tvalue: %s\n' -qe -echo +sleep 10 +docker-compose -f docker/docker-compose.yml stop peter klaus -docker-compose -f docker/docker-compose.yml exec -T cli bash << 'EOF' -echo "Altering number of partitions from 2 to 3..." -kafka-topics --bootstrap-server kafka:9092 --describe --topic test -# tag::repartitioning[] -kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 3 -# end::repartitioning[] -kafka-topics --bootstrap-server kafka:9092 --describe --topic test -EOF - -docker-compose -f docker/docker-compose.yml restart producer -while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done - -echo -n Nachricht 4 an klaus über producer | http -v :8080/klaus -echo -n Nachricht 5 an peter über producer | http -v :8080/peter -echo -n Nachricht 4 an peter über producer | http -v :8080/peter -echo -n Nachricht 5 an klaus über producer | http -v :8080/klaus -echo -n Nachricht 6 an klaus über producer | http -v :8080/klaus -echo -n Nachricht 6 an peter über producer | http -v :8080/peter - -echo Nachrichten in Partition 0: # tag::kafkacat[] -kafkacat -b :9092 -t test -o 0 -p0 -f'key: %k\toffset: %o\tvalue: %s\n' -qe +kafkacat -b :9092 -t test -o 0 -e -f 'p=%p|o=%o|k=%k|h=%h|v=%s\n' # end::kafkacat[] -echo -echo Nachrichten in Partition 1: -# tag::kafkacat[] -kafkacat -b :9092 -t test -o 0 -p1 -f'key: %k\toffset: %o\tvalue: %s\n' -qe -# end::kafkacat[] -echo -echo Nachrichten in Partition 2: -kafkacat -b :9092 -t test -o 0 -p2 -f'key: %k\toffset: %o\tvalue: %s\n' -qe - - -docker-compose -f docker/docker-compose.yml restart setup -sleep 1 -docker-compose -f docker/docker-compose.yml up -d producer-0 producer-1 -while ! [[ $(http 0:8000/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-0..."; sleep 1; done -while ! [[ $(http 0:8001/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-1..."; sleep 1; done - -# tag::fixed[] -echo -n Nachricht 1 über producer-0 | http -v :8000/klaus -echo -n Nachricht 1 über producer-1 | http -v :8001/klaus -echo -n Nachricht 2 über producer-0 | http -v :8000/peter -echo -n Nachricht 2 über producer-1 | http -v :8001/peter -# end::fixed[] - -docker-compose -f docker/docker-compose.yml exec -T cli bash << 'EOF' -echo "Altering number of partitions from 2 to 3..." -kafka-topics --bootstrap-server kafka:9092 --describe --topic test -kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 3 -kafka-topics --bootstrap-server kafka:9092 --describe --topic test -EOF - -docker-compose -f docker/docker-compose.yml restart producer-0 producer-1 -while ! [[ $(http 0:8000/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-0..."; sleep 1; done -while ! [[ $(http 0:8001/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-1..."; sleep 1; done - -echo -n Nachricht 3 über producer-0 | http -v :8000/klaus -echo -n Nachricht 3 über producer-1 | http -v :8001/klaus -echo -n Nachricht 4 über producer-0 | http -v :8000/peter -echo -n Nachricht 4 über producer-1 | http -v :8001/peter - -echo Nachrichten in Partition 0: -kafkacat -b :9092 -t test -o 0 -p0 -f'key: %k\toffset: %o\tvalue: %s\n' -qe -echo -echo Nachrichten in Partition 1: -kafkacat -b :9092 -t test -o 0 -p1 -f'key: %k\toffset: %o\tvalue: %s\n' -qe -echo -echo Nachrichten in Partition 2: -kafkacat -b :9092 -t test -o 0 -p2 -f'key: %k\toffset: %o\tvalue: %s\n' -qe diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 521eec1..e2b3c43 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -100,7 +100,7 @@ services: - setup producer: - image: juplo/rest-producer:1.0-SNAPSHOT + image: juplo/rest-producer--json:1.0-SNAPSHOT ports: - 8080:8080 environment: @@ -109,36 +109,28 @@ services: producer.client-id: producer producer.topic: test - producer-0: - image: juplo/rest-producer:1.0-SNAPSHOT - ports: - - 8000:8080 - environment: - server.port: 8080 - producer.bootstrap-server: kafka:9092 - producer.client-id: producer-0 - producer.topic: test - producer.partition: 0 - - producer-1: - image: juplo/rest-producer:1.0-SNAPSHOT - ports: - - 8001:8080 - environment: - server.port: 8080 - producer.bootstrap-server: kafka:9092 - producer.client-id: producer-1 - producer.topic: test - producer.partition: 1 - - consumer-1: - image: juplo/simple-consumer:1.0-SNAPSHOT - command: kafka:9092 test my-group consumer-1 + peter: + image: juplo/toolbox + command: > + bash -c " + while [[ true ]]; + do + echo 777 | http -v producer:8080/peter; + sleep 1; + done + " - consumer-2: - image: juplo/simple-consumer:1.0-SNAPSHOT - command: kafka:9092 test my-group consumer-2 + klaus: + image: juplo/toolbox + command: > + bash -c " + while [[ true ]]; + do + echo 666 | http -v producer:8080/klaus; + sleep 1; + done + " - consumer-3: - image: juplo/simple-consumer:1.0-SNAPSHOT - command: kafka:9092 test my-group consumer-3 + consumer: + image: juplo/toolbox + command: kafkacat -C -b kafka:9092 -t test -o 0 -f'p=%p|o=%o|k=%k|v=%s\n' diff --git a/pom.xml b/pom.xml index 7d977a6..a7734ec 100644 --- a/pom.xml +++ b/pom.xml @@ -12,9 +12,9 @@ de.juplo.kafka - rest-producer + rest-producer--json REST Producer - A Simple Producer that takes messages via POST and confirms successs + A Producer that takes messages via POST and sends JSON-requests to the Sumup-Adder 1.0-SNAPSHOT @@ -40,8 +40,8 @@ spring-boot-starter-validation - org.apache.kafka - kafka-clients + org.springframework.kafka + spring-kafka org.projectlombok @@ -52,11 +52,6 @@ spring-boot-starter-test test - - org.springframework.kafka - spring-kafka - test - org.springframework.kafka spring-kafka-test diff --git a/src/main/java/de/juplo/kafka/AddNumberMessage.java b/src/main/java/de/juplo/kafka/AddNumberMessage.java new file mode 100644 index 0000000..88b5d6f --- /dev/null +++ b/src/main/java/de/juplo/kafka/AddNumberMessage.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + +import lombok.Value; + + +@Value +public class AddNumberMessage +{ + private final int number; + private final int next; +} diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 0642aa4..9a11f6e 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -5,6 +5,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.serializer.JsonSerializer; import java.util.Properties; @@ -16,7 +17,7 @@ public class ApplicationConfiguration @Bean public RestProducer restProducer( ApplicationProperties properties, - KafkaProducer kafkaProducer) + KafkaProducer kafkaProducer) { return new RestProducer( @@ -27,7 +28,7 @@ public class ApplicationConfiguration } @Bean(destroyMethod = "close") - public KafkaProducer kafkaProducer(ApplicationProperties properties) + public KafkaProducer kafkaProducer(ApplicationProperties properties) { Properties props = new Properties(); props.put("bootstrap.servers", properties.getBootstrapServer()); @@ -39,7 +40,10 @@ public class ApplicationConfiguration props.put("linger.ms", properties.getLingerMs()); props.put("compression.type", properties.getCompressionType()); props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", StringSerializer.class.getName()); + props.put("value.serializer", JsonSerializer.class.getName()); + props.put(JsonSerializer.TYPE_MAPPINGS, + "ADD:" + AddNumberMessage.class.getName() + "," + + "CALC:" + CalculateSumMessage.class.getName()); return new KafkaProducer<>(props); } diff --git a/src/main/java/de/juplo/kafka/CalculateSumMessage.java b/src/main/java/de/juplo/kafka/CalculateSumMessage.java new file mode 100644 index 0000000..5d8c414 --- /dev/null +++ b/src/main/java/de/juplo/kafka/CalculateSumMessage.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + + +import lombok.Value; + + +@Value +public class CalculateSumMessage +{ + private final int number; +} diff --git a/src/main/java/de/juplo/kafka/ProduceFailure.java b/src/main/java/de/juplo/kafka/ProduceFailure.java index 873a67b..b7785f1 100644 --- a/src/main/java/de/juplo/kafka/ProduceFailure.java +++ b/src/main/java/de/juplo/kafka/ProduceFailure.java @@ -7,15 +7,20 @@ import lombok.Value; @Value public class ProduceFailure implements ProduceResult { - private final String error; - private final String exception; + private final String[] error; + private final String[] exception; private final Integer status; - public ProduceFailure(Exception e) + public ProduceFailure(Exception[] e) { status = 500; - exception = e.getClass().getSimpleName(); - error = e.getMessage(); + exception = new String[e.length]; + error = new String[e.length]; + for (int i = 0; i < e.length ; i++) + { + exception[i] = e[i] == null ? null : e[i].getClass().getSimpleName(); + error[i] = e[i] == null ? null : e[i].getMessage(); + } } } diff --git a/src/main/java/de/juplo/kafka/ProduceSuccess.java b/src/main/java/de/juplo/kafka/ProduceSuccess.java index 9c79e8b..8afe795 100644 --- a/src/main/java/de/juplo/kafka/ProduceSuccess.java +++ b/src/main/java/de/juplo/kafka/ProduceSuccess.java @@ -7,6 +7,6 @@ import lombok.Value; @Value public class ProduceSuccess implements ProduceResult { - Integer partition; - Long offset; + Integer[] partition; + Long[] offset; } diff --git a/src/main/java/de/juplo/kafka/RestProducer.java b/src/main/java/de/juplo/kafka/RestProducer.java index 73bec5b..0158774 100644 --- a/src/main/java/de/juplo/kafka/RestProducer.java +++ b/src/main/java/de/juplo/kafka/RestProducer.java @@ -21,7 +21,7 @@ public class RestProducer private final String id; private final String topic; private final Integer partition; - private final Producer producer; + private final Producer producer; private long produced = 0; @@ -29,13 +29,28 @@ public class RestProducer public DeferredResult send( @PathVariable String key, @RequestHeader(name = "X-id", required = false) Long correlationId, - @RequestBody String value) + @RequestBody Integer number) { - DeferredResult result = new DeferredResult<>(); + ResultRecorder result = new ResultRecorder(number+1); + for (int i = 1; i <= number; i++) + { + send(key, new AddNumberMessage(number, i), correlationId, result); + } + send(key, new CalculateSumMessage(number), correlationId, result); + + return result.getDeferredResult(); + } + + private void send( + String key, + Object value, + Long correlationId, + ResultRecorder result) + { final long time = System.currentTimeMillis(); - final ProducerRecord record = new ProducerRecord<>( + final ProducerRecord record = new ProducerRecord<>( topic, // Topic partition, // Partition key, // Key @@ -48,8 +63,8 @@ public class RestProducer if (e == null) { // HANDLE SUCCESS + result.addSuccess(metadata); produced++; - result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset())); log.debug( "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms", id, @@ -64,7 +79,7 @@ public class RestProducer else { // HANDLE ERROR - result.setErrorResult(new ProduceFailure(e)); + result.addFailure(e); log.error( "{} - ERROR key={} timestamp={} latency={}ms: {}", id, @@ -83,8 +98,6 @@ public class RestProducer record.key(), now - time ); - - return result; } @ExceptionHandler diff --git a/src/main/java/de/juplo/kafka/ResultRecorder.java b/src/main/java/de/juplo/kafka/ResultRecorder.java new file mode 100644 index 0000000..d20ee89 --- /dev/null +++ b/src/main/java/de/juplo/kafka/ResultRecorder.java @@ -0,0 +1,74 @@ +package de.juplo.kafka; + +import lombok.Getter; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.springframework.web.context.request.async.DeferredResult; + +import java.util.Arrays; + + +class ResultRecorder +{ + @Getter + private final DeferredResult deferredResult = new DeferredResult(); + private final int numMessages; + private final RecordMetadata[] metadata; + private final Exception[] errors; + + private int sent = 0; + + + ResultRecorder(int numMessages) + { + this.numMessages = numMessages; + this.metadata = new RecordMetadata[numMessages]; + this.errors = new Exception[numMessages]; + } + + + void addSuccess(RecordMetadata m) + { + checkStatus(); + metadata[sent++] = m; + processResult(); + } + + void addFailure(Exception e) + { + checkStatus(); + errors[sent++] = e; + processResult(); + } + + private void checkStatus() throws IllegalStateException + { + if (sent >= numMessages) + throw new IllegalStateException("Already sent " + sent + " messages!"); + } + + private void processResult() + { + if (sent == numMessages) + { + if (Arrays + .stream(errors) + .filter(e -> e != null) + .findAny() + .isPresent()) + { + deferredResult.setErrorResult(new ProduceFailure(errors)); + } + else + { + Integer[] partitions = new Integer[numMessages]; + Long[] offsets = new Long[numMessages]; + for (int i = 0; i < numMessages; i++) + { + partitions[i] = metadata[i].partition(); + offsets[i] = metadata[i].offset(); + } + deferredResult.setResult(new ProduceSuccess(partitions, offsets)); + } + } + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 0d5752c..8ce4821 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -4,7 +4,7 @@ producer: topic: test acks: -1 batch-size: 16384 - linger-ms: 0 + linger-ms: 5 compression-type: gzip management: endpoint: diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 646a335..b9c1e17 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -8,6 +8,7 @@ import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMock import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; +import org.springframework.http.MediaType; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.web.servlet.MockMvc; @@ -53,11 +54,15 @@ public class ApplicationTests void testSendMessage() throws Exception { mockMvc - .perform(post("/peter").content("Hallo Welt!")) + .perform( + post("/peter") + .header("X-id", 7) + .contentType(MediaType.APPLICATION_JSON) + .content("666")) .andExpect(status().isOk()); await("Message was send") .atMost(Duration.ofSeconds(5)) - .until(() -> consumer.received.size() == 1); + .until(() -> consumer.received.size() == 667); }