From 656d24e02b7e42404a508946885009b59b39e1ee Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 13 Aug 2022 19:40:53 +0200 Subject: [PATCH] =?utf8?q?Initiale=20Version=20eines=20Gateways=20f=C3=BCr?= =?utf8?q?=20die=20SumUp-Services?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Diese Version nimmt lediglich Anfragen per POST enggegen und schreibt sie in das Requests-Topic, das von dem Requests-Services ausgewertet wird. --- README.sh | 46 +++++-------------- docker-compose.yml | 9 ++-- pom.xml | 6 +-- .../juplo/kafka/ApplicationConfiguration.java | 12 ++--- .../de/juplo/kafka/ApplicationProperties.java | 3 +- .../{RestProducer.java => RestGateway.java} | 12 ++--- src/main/resources/application.yml | 17 +++---- .../java/de/juplo/kafka/ApplicationTests.java | 9 ++-- 8 files changed, 45 insertions(+), 69 deletions(-) rename src/main/java/de/juplo/kafka/{RestProducer.java => RestGateway.java} (88%) diff --git a/README.sh b/README.sh index d2dccf8..25b8f62 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/rest-producer:1.0-SNAPSHOT +IMAGE=juplo/sumup-gateway:1.0-SNAPSHOT if [ "$1" = "cleanup" ] then @@ -16,47 +16,25 @@ if [[ "$1" = "build" ]] then - mvn install || exit + docker-compose rm -svf gateway + mvn clean install || exit else echo "Using image existing images:" docker image ls $IMAGE fi echo "Waiting for the Kafka-Cluster to become ready..." -docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1 +docker-compose exec cli cub kafka-ready -b kafka:9092 3 60 > /dev/null 2>&1 || exit 1 docker-compose up setup -docker-compose up -d +docker-compose up -d gateway -while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done +while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for gateway..."; sleep 1; done + +docker-compose up -d consumer echo foo | http -v :8080/bar -echo bar | http -v :8080/foo -echo foobar | http -v :8080/bar -dd if=/dev/zero bs=1024 count=1024 | http -v :8080/bar -echo foofoo | http -v :8080/bar -echo barbar | http -v :8080/foo -echo foofoo | http -v :8080/bar -echo barbar | http -v :8080/foo -echo foofoo | http -v :8080/bar -echo barbar | http -v :8080/foo -echo foofoo | http -v :8080/bar -echo barbar | http -v :8080/foo -echo foofoo | http -v :8080/bar -echo barbar | http -v :8080/foo -echo foofoo | http -v :8080/bar -echo barbar | http -v :8080/foo -echo foofoo | http -v :8080/bar -echo barbar | http -v :8080/foo -echo foofoo | http -v :8080/bar -echo barbar | http -v :8080/foo -echo foofoo | http -v :8080/bar -echo barbar | http -v :8080/foo -echo foofoo | http -v :8080/bar -echo barbar | http -v :8080/foo -echo foofoo | http -v :8080/bar -echo barbar | http -v :8080/foo -echo foofoo | http -v :8080/bar -echo barbar | http -v :8080/foo - -docker-compose logs producer +echo 66 | http -v :8080/foo + +docker-compose logs gateway +docker-compose stop consumer docker-compose logs consumer diff --git a/docker-compose.yml b/docker-compose.yml index 7ae8d9b..2c22c7b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -73,15 +73,14 @@ services: image: juplo/toolbox command: sleep infinity - producer: - image: juplo/rest-producer:1.0-SNAPSHOT + gateway: + image: juplo/sumup-gateway:1.0-SNAPSHOT ports: - 8080:8080 environment: server.port: 8080 - producer.bootstrap-server: kafka:9092 - producer.client-id: producer - producer.topic: test + sumup.gateway.bootstrap-server: kafka:9092 + sumup.gateway.client-id: gateway consumer: image: juplo/toolbox diff --git a/pom.xml b/pom.xml index e7ea677..de665ad 100644 --- a/pom.xml +++ b/pom.xml @@ -12,9 +12,9 @@ de.juplo.kafka - rest-producer - REST Producer - A Simple Producer that takes messages via POST and confirms successs + sumup-gateway + REST Gateway for the SumUp-Services + A simple REST-Gateway to talk to the Sumup-Services 1.0-SNAPSHOT diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 0642aa4..33dabc9 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,6 +1,7 @@ package de.juplo.kafka; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; @@ -14,20 +15,19 @@ import java.util.Properties; public class ApplicationConfiguration { @Bean - public RestProducer restProducer( + public RestGateway restGateway( ApplicationProperties properties, - KafkaProducer kafkaProducer) + KafkaProducer kafkaProducer) { return - new RestProducer( + new RestGateway( properties.getClientId(), properties.getTopic(), - properties.getPartition(), kafkaProducer); } @Bean(destroyMethod = "close") - public KafkaProducer kafkaProducer(ApplicationProperties properties) + public KafkaProducer kafkaProducer(ApplicationProperties properties) { Properties props = new Properties(); props.put("bootstrap.servers", properties.getBootstrapServer()); @@ -39,7 +39,7 @@ public class ApplicationConfiguration props.put("linger.ms", properties.getLingerMs()); props.put("compression.type", properties.getCompressionType()); props.put("key.serializer", StringSerializer.class.getName()); - props.put("value.serializer", StringSerializer.class.getName()); + props.put("value.serializer", IntegerSerializer.class.getName()); return new KafkaProducer<>(props); } diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java index 673613a..a18b20f 100644 --- a/src/main/java/de/juplo/kafka/ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java @@ -8,7 +8,7 @@ import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; -@ConfigurationProperties(prefix = "producer") +@ConfigurationProperties(prefix = "sumup.gateway") @Getter @Setter public class ApplicationProperties @@ -22,7 +22,6 @@ public class ApplicationProperties @NotNull @NotEmpty private String topic; - private Integer partition; @NotNull @NotEmpty private String acks; diff --git a/src/main/java/de/juplo/kafka/RestProducer.java b/src/main/java/de/juplo/kafka/RestGateway.java similarity index 88% rename from src/main/java/de/juplo/kafka/RestProducer.java rename to src/main/java/de/juplo/kafka/RestGateway.java index 0467118..4549b8f 100644 --- a/src/main/java/de/juplo/kafka/RestProducer.java +++ b/src/main/java/de/juplo/kafka/RestGateway.java @@ -8,20 +8,16 @@ import org.springframework.http.HttpStatus; import org.springframework.web.bind.annotation.*; import org.springframework.web.context.request.async.DeferredResult; -import javax.annotation.PreDestroy; -import java.util.concurrent.ExecutionException; - @Slf4j @RequestMapping @ResponseBody @RequiredArgsConstructor -public class RestProducer +public class RestGateway { private final String id; private final String topic; - private final Integer partition; - private final KafkaProducer producer; + private final KafkaProducer producer; private long produced = 0; @@ -29,13 +25,13 @@ public class RestProducer public DeferredResult send( @PathVariable String key, @RequestHeader(name = "X-id", required = false) Long correlationId, - @RequestBody String value) + @RequestBody Integer value) { DeferredResult result = new DeferredResult<>(); final long time = System.currentTimeMillis(); - final ProducerRecord record = new ProducerRecord<>( + final ProducerRecord record = new ProducerRecord<>( topic, // Topic key, // Key value // Value diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 0d5752c..218aaa1 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,11 +1,12 @@ -producer: - bootstrap-server: :9092 - client-id: DEV - topic: test - acks: -1 - batch-size: 16384 - linger-ms: 0 - compression-type: gzip +sumup: + gateway: + bootstrap-server: :9092 + client-id: DEV + topic: test + acks: -1 + batch-size: 16384 + linger-ms: 0 + compression-type: gzip management: endpoint: shutdown: diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 646a335..b7d3e7f 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -8,6 +8,7 @@ import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMock import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; +import org.springframework.http.MediaType; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.web.servlet.MockMvc; @@ -26,8 +27,8 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. @SpringBootTest( properties = { "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}", - "producer.bootstrap-server=${spring.embedded.kafka.brokers}", - "producer.topic=" + TOPIC}) + "sumup.gateway.bootstrap-server=${spring.embedded.kafka.brokers}", + "sumup.gateway.topic=" + TOPIC}) @AutoConfigureMockMvc @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) @Slf4j @@ -53,7 +54,9 @@ public class ApplicationTests void testSendMessage() throws Exception { mockMvc - .perform(post("/peter").content("Hallo Welt!")) + .perform(post("/peter") + .content("66") + .contentType(MediaType.APPLICATION_JSON)) .andExpect(status().isOk()); await("Message was send") .atMost(Duration.ofSeconds(5)) -- 2.20.1