if [ "$1" = "cleanup" ]
"$1" = "build"
- mvn install || exit
+ docker-compose rm -svf gateway
+ mvn clean install || exit
echo "Using image existing images:"
docker image ls $IMAGE
echo "Waiting for the Kafka-Cluster to become ready..."
-docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
+docker-compose exec cli cub kafka-ready -b kafka:9092 3 60 > /dev/null 2>&1 || exit 1
docker-compose up setup
-docker-compose up -d
+docker-compose up -d gateway
-while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
+while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for gateway..."; sleep 1; done
+docker-compose up -d consumer
echo foo | http -v :8080/bar
-echo bar | http -v :8080/foo
-echo foobar | http -v :8080/bar
-dd if=/dev/zero bs=1024 count=1024 | http -v :8080/bar
-echo foofoo | http -v :8080/bar
-echo barbar | http -v :8080/foo
-echo foofoo | http -v :8080/bar
-echo barbar | http -v :8080/foo
-echo foofoo | http -v :8080/bar
-echo barbar | http -v :8080/foo
-echo foofoo | http -v :8080/bar
-echo barbar | http -v :8080/foo
-echo foofoo | http -v :8080/bar
-echo barbar | http -v :8080/foo
-echo foofoo | http -v :8080/bar
-echo barbar | http -v :8080/foo
-echo foofoo | http -v :8080/bar
-echo barbar | http -v :8080/foo
-echo foofoo | http -v :8080/bar
-echo barbar | http -v :8080/foo
-echo foofoo | http -v :8080/bar
-echo barbar | http -v :8080/foo
-echo foofoo | http -v :8080/bar
-echo barbar | http -v :8080/foo
-echo foofoo | http -v :8080/bar
-echo barbar | http -v :8080/foo
-echo foofoo | http -v :8080/bar
-echo barbar | http -v :8080/foo
-docker-compose logs producer
+echo 66 | http -v :8080/foo
+docker-compose logs gateway
+docker-compose stop consumer
docker-compose logs consumer
image: juplo/toolbox
command: sleep infinity
- producer:
- image: juplo/rest-producer:1.0-SNAPSHOT
+ gateway:
+ image: juplo/sumup-gateway:1.0-SNAPSHOT
- 8080:8080
server.port: 8080
- producer.bootstrap-server: kafka:9092
- producer.client-id: producer
- producer.topic: test
+ sumup.gateway.bootstrap-server: kafka:9092
+ sumup.gateway.client-id: gateway
image: juplo/toolbox
- <artifactId>rest-producer</artifactId>
- <name>REST Producer</name>
- <description>A Simple Producer that takes messages via POST and confirms successs</description>
+ <artifactId>sumup-gateway</artifactId>
+ <name>REST Gateway for the SumUp-Services</name>
+ <description>A simple REST-Gateway to talk to the Sumup-Services</description>
package de.juplo.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
public class ApplicationConfiguration
- public RestProducer restProducer(
+ public RestGateway restGateway(
ApplicationProperties properties,
- KafkaProducer<String, String> kafkaProducer)
+ KafkaProducer<String, Integer> kafkaProducer)
- new RestProducer(
+ new RestGateway(
- properties.getPartition(),
@Bean(destroyMethod = "close")
- public KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
+ public KafkaProducer<String, Integer> kafkaProducer(ApplicationProperties properties)
Properties props = new Properties();
props.put("bootstrap.servers", properties.getBootstrapServer());
props.put("linger.ms", properties.getLingerMs());
props.put("compression.type", properties.getCompressionType());
props.put("key.serializer", StringSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
+ props.put("value.serializer", IntegerSerializer.class.getName());
return new KafkaProducer<>(props);
import javax.validation.constraints.NotNull;
-@ConfigurationProperties(prefix = "producer")
+@ConfigurationProperties(prefix = "sumup.gateway")
public class ApplicationProperties
private String topic;
- private Integer partition;
private String acks;
--- /dev/null
+package de.juplo.kafka;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.*;
+import org.springframework.web.context.request.async.DeferredResult;
+public class RestGateway
+ private final String id;
+ private final String topic;
+ private final KafkaProducer<String, Integer> producer;
+ private long produced = 0;
+ @PostMapping(path = "{key}")
+ public DeferredResult<ProduceResult> send(
+ @PathVariable String key,
+ @RequestHeader(name = "X-id", required = false) Long correlationId,
+ @RequestBody Integer value)
+ {
+ DeferredResult<ProduceResult> result = new DeferredResult<>();
+ final long time = System.currentTimeMillis();
+ final ProducerRecord<String, Integer> record = new ProducerRecord<>(
+ topic, // Topic
+ key, // Key
+ value // Value
+ );
+ producer.send(record, (metadata, e) ->
+ {
+ long now = System.currentTimeMillis();
+ if (e == null)
+ {
+ produced++;
+ result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
+ log.debug(
+ "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
+ id,
+ record.key(),
+ record.value(),
+ metadata.partition(),
+ metadata.offset(),
+ metadata.timestamp(),
+ now - time
+ );
+ }
+ else
+ {
+ result.setErrorResult(new ProduceFailure(e));
+ log.error(
+ "{} - ERROR key={} timestamp={} latency={}ms: {}",
+ id,
+ record.key(),
+ metadata == null ? -1 : metadata.timestamp(),
+ now - time,
+ e.toString()
+ );
+ }
+ });
+ long now = System.currentTimeMillis();
+ log.trace(
+ "{} - Queued message with key={} latency={}ms",
+ id,
+ record.key(),
+ now - time
+ );
+ return result;
+ }
+ @ExceptionHandler
+ @ResponseStatus(HttpStatus.BAD_REQUEST)
+ public ErrorResponse illegalStateException(IllegalStateException e)
+ {
+ return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
+ }
+++ /dev/null
-package de.juplo.kafka;
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.springframework.http.HttpStatus;
-import org.springframework.web.bind.annotation.*;
-import org.springframework.web.context.request.async.DeferredResult;
-import javax.annotation.PreDestroy;
-import java.util.concurrent.ExecutionException;
-public class RestProducer
- private final String id;
- private final String topic;
- private final Integer partition;
- private final KafkaProducer<String, String> producer;
- private long produced = 0;
- @PostMapping(path = "{key}")
- public DeferredResult<ProduceResult> send(
- @PathVariable String key,
- @RequestHeader(name = "X-id", required = false) Long correlationId,
- @RequestBody String value)
- {
- DeferredResult<ProduceResult> result = new DeferredResult<>();
- final long time = System.currentTimeMillis();
- final ProducerRecord<String, String> record = new ProducerRecord<>(
- topic, // Topic
- key, // Key
- value // Value
- );
- producer.send(record, (metadata, e) ->
- {
- long now = System.currentTimeMillis();
- if (e == null)
- {
- produced++;
- result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
- log.debug(
- "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
- id,
- record.key(),
- record.value(),
- metadata.partition(),
- metadata.offset(),
- metadata.timestamp(),
- now - time
- );
- }
- else
- {
- result.setErrorResult(new ProduceFailure(e));
- log.error(
- "{} - ERROR key={} timestamp={} latency={}ms: {}",
- id,
- record.key(),
- metadata == null ? -1 : metadata.timestamp(),
- now - time,
- e.toString()
- );
- }
- });
- long now = System.currentTimeMillis();
- log.trace(
- "{} - Queued message with key={} latency={}ms",
- id,
- record.key(),
- now - time
- );
- return result;
- }
- @ExceptionHandler
- @ResponseStatus(HttpStatus.BAD_REQUEST)
- public ErrorResponse illegalStateException(IllegalStateException e)
- {
- return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
- }
- bootstrap-server: :9092
- client-id: DEV
- topic: test
- acks: -1
- batch-size: 16384
- linger-ms: 0
- compression-type: gzip
+ gateway:
+ bootstrap-server: :9092
+ client-id: DEV
+ topic: test
+ acks: -1
+ batch-size: 16384
+ linger-ms: 0
+ compression-type: gzip
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
+import org.springframework.http.MediaType;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.web.servlet.MockMvc;
properties = {
- "producer.bootstrap-server=${spring.embedded.kafka.brokers}",
- "producer.topic=" + TOPIC})
+ "sumup.gateway.bootstrap-server=${spring.embedded.kafka.brokers}",
+ "sumup.gateway.topic=" + TOPIC})
@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
void testSendMessage() throws Exception
- .perform(post("/peter").content("Hallo Welt!"))
+ .perform(post("/peter")
+ .content("66")
+ .contentType(MediaType.APPLICATION_JSON))
await("Message was send")