From: Kai Moritz Date: Sat, 13 Aug 2022 17:40:53 +0000 (+0200) Subject: Initiale Version eines Gateways für die SumUp-Services X-Git-Tag: sumup-gateway---lvm-2-tage~1 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=656d24e02b7e42404a508946885009b59b39e1ee;hp=5bf794a0d2c7dd49c68161e941e1bd972a6af824;p=demos%2Fkafka%2Ftraining Initiale Version eines Gateways für die SumUp-Services * Diese Version nimmt lediglich Anfragen per POST enggegen und schreibt sie in das Requests-Topic, das von dem Requests-Services ausgewertet wird. --- diff --git a/README.sh b/README.sh index d2dccf8..25b8f62 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/rest-producer:1.0-SNAPSHOT +IMAGE=juplo/sumup-gateway:1.0-SNAPSHOT if [ "$1" = "cleanup" ] then @@ -16,47 +16,25 @@ if [[ "$1" = "build" ]] then - mvn install || exit + docker-compose rm -svf gateway + mvn clean install || exit else echo "Using image existing images:" docker image ls $IMAGE fi echo "Waiting for the Kafka-Cluster to become ready..." -docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1 +docker-compose exec cli cub kafka-ready -b kafka:9092 3 60 > /dev/null 2>&1 || exit 1 docker-compose up setup -docker-compose up -d +docker-compose up -d gateway -while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done +while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for gateway..."; sleep 1; done + +docker-compose up -d consumer echo foo | http -v :8080/bar -echo bar | http -v :8080/foo -echo foobar | http -v :8080/bar -dd if=/dev/zero bs=1024 count=1024 | http -v :8080/bar -echo foofoo | http -v :8080/bar -echo barbar | http -v :8080/foo -echo foofoo | http -v :8080/bar -echo barbar | http -v :8080/foo -echo foofoo | http -v :8080/bar -echo barbar | http -v :8080/foo -echo foofoo | http -v :8080/bar -echo barbar | http -v :8080/foo -echo foofoo | http -v :8080/bar -echo barbar | http -v :8080/foo -echo foofoo | http -v :8080/bar -echo barbar | http -v :8080/foo -echo foofoo | http -v :8080/bar -echo barbar | http -v :8080/foo -echo foofoo | http -v :8080/bar -echo barbar | http -v :8080/foo -echo foofoo | http -v :8080/bar -echo barbar | http -v :8080/foo -echo foofoo | http -v :8080/bar -echo barbar | http -v :8080/foo -echo foofoo | http -v :8080/bar -echo barbar | http -v :8080/foo -echo foofoo | http -v :8080/bar -echo barbar | http -v :8080/foo - -docker-compose logs producer +echo 66 | http -v :8080/foo + +docker-compose logs gateway +docker-compose stop consumer docker-compose logs consumer diff --git a/docker-compose.yml b/docker-compose.yml index 7ae8d9b..2c22c7b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -73,15 +73,14 @@ services: image: juplo/toolbox command: sleep infinity - producer: - image: juplo/rest-producer:1.0-SNAPSHOT + gateway: + image: juplo/sumup-gateway:1.0-SNAPSHOT ports: - 8080:8080 environment: server.port: 8080 - producer.bootstrap-server: kafka:9092 - producer.client-id: producer - producer.topic: test + sumup.gateway.bootstrap-server: kafka:9092 + sumup.gateway.client-id: gateway consumer: image: juplo/toolbox diff --git a/pom.xml b/pom.xml index e7ea677..de665ad 100644 --- a/pom.xml +++ b/pom.xml @@ -12,9 +12,9 @@ de.juplo.kafka - rest-producer - REST Producer - A Simple Producer that takes messages via POST and confirms successs + sumup-gateway + REST Gateway for the SumUp-Services + A simple REST-Gateway to talk to the Sumup-Services 1.0-SNAPSHOT diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 0642aa4..33dabc9 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,6 +1,7 @@ package de.juplo.kafka; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; @@ -14,20 +15,19 @@ import java.util.Properties; public class ApplicationConfiguration { @Bean - public RestProducer restProducer( + public RestGateway restGateway( ApplicationProperties properties, - KafkaProducer kafkaProducer) + KafkaProducer kafkaProducer) { return - new RestProducer( + new RestGateway( properties.getClientId(), properties.getTopic(), - properties.getPartition(), kafkaProducer); } @Bean(destroyMethod = "close") - public KafkaProducer kafkaProducer(ApplicationProperties properties) + public KafkaProducer kafkaProducer(ApplicationProperties properties) { Properties props = new Properties(); props.put("bootstrap.servers", properties.getBootstrapServer()); @@ -39,7 +39,7 @@ public class ApplicationConfiguration props.put("linger.ms", properties.getLingerMs()); props.put("compression.type", properties.getCompressionType()); props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", StringSerializer.class.getName()); + props.put("value.serializer", IntegerSerializer.class.getName()); return new KafkaProducer<>(props); } diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 673613a..a18b20f 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -8,7 +8,7 @@ import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; -@ConfigurationProperties(prefix = "producer") +@ConfigurationProperties(prefix = "sumup.gateway") @Getter @Setter public class ApplicationProperties @@ -22,7 +22,6 @@ public class ApplicationProperties @NotNull @NotEmpty private String topic; - private Integer partition; @NotNull @NotEmpty private String acks; diff --git a/src/main/java/de/juplo/kafka/RestGateway.java b/src/main/java/de/juplo/kafka/RestGateway.java new file mode 100644 index 0000000..4549b8f --- /dev/null +++ b/src/main/java/de/juplo/kafka/RestGateway.java @@ -0,0 +1,91 @@ +package de.juplo.kafka; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.*; +import org.springframework.web.context.request.async.DeferredResult; + + +@Slf4j +@RequestMapping +@ResponseBody +@RequiredArgsConstructor +public class RestGateway +{ + private final String id; + private final String topic; + private final KafkaProducer producer; + + private long produced = 0; + + @PostMapping(path = "{key}") + public DeferredResult send( + @PathVariable String key, + @RequestHeader(name = "X-id", required = false) Long correlationId, + @RequestBody Integer value) + { + DeferredResult result = new DeferredResult<>(); + + final long time = System.currentTimeMillis(); + + final ProducerRecord record = new ProducerRecord<>( + topic, // Topic + key, // Key + value // Value + ); + + producer.send(record, (metadata, e) -> + { + long now = System.currentTimeMillis(); + if (e == null) + { + // HANDLE SUCCESS + produced++; + result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset())); + log.debug( + "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms", + id, + record.key(), + record.value(), + metadata.partition(), + metadata.offset(), + metadata.timestamp(), + now - time + ); + } + else + { + // HANDLE ERROR + result.setErrorResult(new ProduceFailure(e)); + log.error( + "{} - ERROR key={} timestamp={} latency={}ms: {}", + id, + record.key(), + metadata == null ? -1 : metadata.timestamp(), + now - time, + e.toString() + ); + } + }); + + long now = System.currentTimeMillis(); + log.trace( + "{} - Queued message with key={} latency={}ms", + id, + record.key(), + now - time + ); + + return result; + } + + @ExceptionHandler + @ResponseStatus(HttpStatus.BAD_REQUEST) + public ErrorResponse illegalStateException(IllegalStateException e) + { + return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value()); + } +} diff --git a/src/main/java/de/juplo/kafka/RestProducer.java b/src/main/java/de/juplo/kafka/RestProducer.java deleted file mode 100644 index 0467118..0000000 --- a/src/main/java/de/juplo/kafka/RestProducer.java +++ /dev/null @@ -1,95 +0,0 @@ -package de.juplo.kafka; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.springframework.http.HttpStatus; -import org.springframework.web.bind.annotation.*; -import org.springframework.web.context.request.async.DeferredResult; - -import javax.annotation.PreDestroy; -import java.util.concurrent.ExecutionException; - - -@Slf4j -@RequestMapping -@ResponseBody -@RequiredArgsConstructor -public class RestProducer -{ - private final String id; - private final String topic; - private final Integer partition; - private final KafkaProducer producer; - - private long produced = 0; - - @PostMapping(path = "{key}") - public DeferredResult send( - @PathVariable String key, - @RequestHeader(name = "X-id", required = false) Long correlationId, - @RequestBody String value) - { - DeferredResult result = new DeferredResult<>(); - - final long time = System.currentTimeMillis(); - - final ProducerRecord record = new ProducerRecord<>( - topic, // Topic - key, // Key - value // Value - ); - - producer.send(record, (metadata, e) -> - { - long now = System.currentTimeMillis(); - if (e == null) - { - // HANDLE SUCCESS - produced++; - result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset())); - log.debug( - "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms", - id, - record.key(), - record.value(), - metadata.partition(), - metadata.offset(), - metadata.timestamp(), - now - time - ); - } - else - { - // HANDLE ERROR - result.setErrorResult(new ProduceFailure(e)); - log.error( - "{} - ERROR key={} timestamp={} latency={}ms: {}", - id, - record.key(), - metadata == null ? -1 : metadata.timestamp(), - now - time, - e.toString() - ); - } - }); - - long now = System.currentTimeMillis(); - log.trace( - "{} - Queued message with key={} latency={}ms", - id, - record.key(), - now - time - ); - - return result; - } - - @ExceptionHandler - @ResponseStatus(HttpStatus.BAD_REQUEST) - public ErrorResponse illegalStateException(IllegalStateException e) - { - return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value()); - } -} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 0d5752c..218aaa1 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,11 +1,12 @@ -producer: - bootstrap-server: :9092 - client-id: DEV - topic: test - acks: -1 - batch-size: 16384 - linger-ms: 0 - compression-type: gzip +sumup: + gateway: + bootstrap-server: :9092 + client-id: DEV + topic: test + acks: -1 + batch-size: 16384 + linger-ms: 0 + compression-type: gzip management: endpoint: shutdown: diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 646a335..b7d3e7f 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -8,6 +8,7 @@ import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMock import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; +import org.springframework.http.MediaType; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.web.servlet.MockMvc; @@ -26,8 +27,8 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. @SpringBootTest( properties = { "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}", - "producer.bootstrap-server=${spring.embedded.kafka.brokers}", - "producer.topic=" + TOPIC}) + "sumup.gateway.bootstrap-server=${spring.embedded.kafka.brokers}", + "sumup.gateway.topic=" + TOPIC}) @AutoConfigureMockMvc @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) @Slf4j @@ -53,7 +54,9 @@ public class ApplicationTests void testSendMessage() throws Exception { mockMvc - .perform(post("/peter").content("Hallo Welt!")) + .perform(post("/peter") + .content("66") + .contentType(MediaType.APPLICATION_JSON)) .andExpect(status().isOk()); await("Message was send") .atMost(Duration.ofSeconds(5))