#!/bin/bash
-IMAGE=juplo/rest-producer:1.0-SNAPSHOT
+IMAGE=juplo/rest-producer-json:1.0-SNAPSHOT
if [ "$1" = "cleanup" ]
then
"$1" = "build"
]]
then
+ docker-compose rm -svf producer
mvn clean install || exit
else
echo "Using image existing images:"
while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
-# tag::http[]
-echo -n bar | http -v :8080/foo
-echo -n foo | http -v :8080/bar X-id:666
+docker-compose up -d peter klaus
+
+sleep 10
+docker-compose stop peter klaus
+
# end::http[]
# tag::kafkacat[]
docker-compose exec cli kafkacat -b kafka:9092 -t test -f "%p|%o|%k=%s|%h\n" -e
command: sleep infinity
producer:
- image: juplo/rest-producer:1.0-SNAPSHOT
+ image: juplo/rest-producer-json:1.0-SNAPSHOT
ports:
- 8080:8080
environment:
producer.topic: test
peter:
- image: juplo/rest-client:1.0-SNAPSHOT
- environment:
- server.port: 8080
- rest-client.baseUrl: http://producer:8080
- rest-client.username: peter
- rest-client.throttle-ms: 1000
+ image: juplo/toolbox
+ command: >
+ bash -c "
+ while [[ true ]];
+ do
+ echo 777 | http -v producer:8080/peter;
+ sleep 1;
+ done
+ "
klaus:
- image: juplo/rest-client:1.0-SNAPSHOT
- environment:
- server.port: 8080
- rest-client.baseUrl: http://producer:8080
- rest-client.username: klaus
- rest-client.throttle-ms: 1100
-
- beate:
- image: juplo/rest-client:1.0-SNAPSHOT
- environment:
- server.port: 8080
- rest-client.baseUrl: http://producer:8080
- rest-client.username: beate
- rest-client.throttle-ms: 900
-
- franz:
- image: juplo/rest-client:1.0-SNAPSHOT
- environment:
- server.port: 8080
- rest-client.baseUrl: http://producer:8080
- rest-client.username: franz
- rest-client.throttle-ms: 800
-
- uschi:
- image: juplo/rest-client:1.0-SNAPSHOT
- environment:
- server.port: 8080
- rest-client.baseUrl: http://producer:8080
- rest-client.username: uschi
- rest-client.throttle-ms: 1200
+ image: juplo/toolbox
+ command: >
+ bash -c "
+ while [[ true ]];
+ do
+ echo 666 | http -v producer:8080/klaus;
+ sleep 1;
+ done
+ "
consumer:
image: juplo/toolbox
</parent>
<groupId>de.juplo.kafka</groupId>
- <artifactId>rest-producer</artifactId>
+ <artifactId>rest-producer-json</artifactId>
<name>REST Producer</name>
- <description>A Simple Producer that takes messages via POST and confirms successs</description>
+ <description>A Producer that takes messages via POST and sends JSON-requests to the Sumup-Adder</description>
<version>1.0-SNAPSHOT</version>
<dependencies>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.Value;
+
+
+@Value
+public class AddNumberMessage
+{
+ private final int number;
+ private final int next;
+}
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.Properties;
@Bean
public RestProducer restProducer(
ApplicationProperties properties,
- KafkaProducer<String, String> kafkaProducer)
+ KafkaProducer<String, Object> kafkaProducer)
{
return
new RestProducer(
}
@Bean(destroyMethod = "close")
- public KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
+ public KafkaProducer<String, Object> kafkaProducer(ApplicationProperties properties)
{
Properties props = new Properties();
props.put("bootstrap.servers", properties.getBootstrapServer());
props.put("linger.ms", properties.getLingerMs());
props.put("compression.type", properties.getCompressionType());
props.put("key.serializer", StringSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
+ props.put("value.serializer", JsonSerializer.class.getName());
+ props.put(JsonSerializer.TYPE_MAPPINGS,
+ "ADD:" + AddNumberMessage.class.getName() + "," +
+ "CALC:" + CalculateSumMessage.class.getName());
return new KafkaProducer<>(props);
}
--- /dev/null
+package de.juplo.kafka;
+
+
+import lombok.Value;
+
+
+@Value
+public class CalculateSumMessage
+{
+ private final int number;
+}
@Value
public class ProduceFailure implements ProduceResult
{
- private final String error;
- private final String exception;
+ private final String[] error;
+ private final String[] exception;
private final Integer status;
- public ProduceFailure(Exception e)
+ public ProduceFailure(Exception[] e)
{
status = 500;
- exception = e.getClass().getSimpleName();
- error = e.getMessage();
+ exception = new String[e.length];
+ error = new String[e.length];
+ for (int i = 0; i < e.length ; i++)
+ {
+ exception[i] = e[i] == null ? null : e[i].getClass().getSimpleName();
+ error[i] = e[i] == null ? null : e[i].getMessage();
+ }
}
}
@Value
public class ProduceSuccess implements ProduceResult
{
- Integer partition;
- Long offset;
+ Integer[] partition;
+ Long[] offset;
}
private final String id;
private final String topic;
private final Integer partition;
- private final KafkaProducer<String, String> producer;
+ private final KafkaProducer<String, Object> producer;
private long produced = 0;
public DeferredResult<ProduceResult> send(
@PathVariable String key,
@RequestHeader(name = "X-id", required = false) Long correlationId,
- @RequestBody String value)
+ @RequestBody Integer number)
{
- DeferredResult<ProduceResult> result = new DeferredResult<>();
+ ResultRecorder result = new ResultRecorder(number+1);
+ for (int i = 1; i <= number; i++)
+ {
+ send(key, new AddNumberMessage(number, i), correlationId, result);
+ }
+ send(key, new CalculateSumMessage(number), correlationId, result);
+
+ return result.getDeferredResult();
+ }
+
+ private void send(
+ String key,
+ Object value,
+ Long correlationId,
+ ResultRecorder result)
+ {
final long time = System.currentTimeMillis();
- final ProducerRecord<String, String> record = new ProducerRecord<>(
+ final ProducerRecord<String, Object> record = new ProducerRecord<>(
topic, // Topic
partition, // Partition
key, // Key
if (e == null)
{
// HANDLE SUCCESS
+ result.addSuccess(metadata);
produced++;
- result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
log.debug(
"{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
id,
else
{
// HANDLE ERROR
- result.setErrorResult(new ProduceFailure(e));
+ result.addFailure(e);
log.error(
"{} - ERROR key={} timestamp={} latency={}ms: {}",
id,
record.key(),
now - time
);
-
- return result;
}
@ExceptionHandler
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.Getter;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.springframework.web.context.request.async.DeferredResult;
+
+import java.util.Arrays;
+
+
+class ResultRecorder
+{
+ @Getter
+ private final DeferredResult<ProduceResult> deferredResult = new DeferredResult<ProduceResult>();
+ private final int numMessages;
+ private final RecordMetadata[] metadata;
+ private final Exception[] errors;
+
+ private int sent = 0;
+
+
+ ResultRecorder(int numMessages)
+ {
+ this.numMessages = numMessages;
+ this.metadata = new RecordMetadata[numMessages];
+ this.errors = new Exception[numMessages];
+ }
+
+
+ void addSuccess(RecordMetadata m)
+ {
+ checkStatus();
+ metadata[sent++] = m;
+ processResult();
+ }
+
+ void addFailure(Exception e)
+ {
+ checkStatus();
+ errors[sent++] = e;
+ processResult();
+ }
+
+ private void checkStatus() throws IllegalStateException
+ {
+ if (sent >= numMessages)
+ throw new IllegalStateException("Already sent " + sent + " messages!");
+ }
+
+ private void processResult()
+ {
+ if (sent == numMessages)
+ {
+ if (Arrays
+ .stream(errors)
+ .filter(e -> e != null)
+ .findAny()
+ .isPresent())
+ {
+ deferredResult.setErrorResult(new ProduceFailure(errors));
+ }
+ else
+ {
+ Integer[] partitions = new Integer[numMessages];
+ Long[] offsets = new Long[numMessages];
+ for (int i = 0; i < numMessages; i++)
+ {
+ partitions[i] = metadata[i].partition();
+ offsets[i] = metadata[i].offset();
+ }
+ deferredResult.setResult(new ProduceSuccess(partitions, offsets));
+ }
+ }
+ }
+}
topic: test
acks: -1
batch-size: 16384
- linger-ms: 0
+ linger-ms: 5
compression-type: gzip
management:
endpoint:
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
+import org.springframework.http.MediaType;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.web.servlet.MockMvc;
void testSendMessage() throws Exception
{
mockMvc
- .perform(post("/peter").content("Hallo Welt!"))
+ .perform(
+ post("/peter")
+ .header("X-id", 7)
+ .contentType(MediaType.APPLICATION_JSON)
+ .content("666"))
.andExpect(status().isOk());
await("Message was send")
.atMost(Duration.ofSeconds(5))
- .until(() -> consumer.received.size() == 1);
+ .until(() -> consumer.received.size() == 667);
}