#!/bin/bash
-IMAGE=juplo/rest-producer:1.0-SNAPSHOT
+IMAGE=juplo/rest-producer--json:1.0-SNAPSHOT
if [ "$1" = "cleanup" ]
then
"$1" = "build"
]]
then
+ docker-compose rm -svf producer
mvn clean install || exit
else
echo "Using image existing images:"
docker-compose -f docker/docker-compose.yml up -d producer
while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
-# tag::hashed[]
-echo -n Nachricht 1 an klaus über producer | http -v :8080/klaus
-# end::hashed[]
-echo -n Nachricht 2 an klaus über producer | http -v :8080/klaus
-# tag::hashed[]
-echo -n Nachricht 1 an peter über producer | http -v :8080/peter
-# end::hashed[]
-echo -n Nachricht 3 an klaus über producer | http -v :8080/klaus
-echo -n Nachricht 2 an peter über producer | http -v :8080/peter
-echo -n Nachricht 3 an peter über producer | http -v :8080/peter
+docker-compose -f docker/docker-compose.yml up -d peter klaus
-echo Nachrichten in Partition 0:
-kafkacat -b :9092 -t test -o 0 -p0 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
-echo
-echo Nachrichten in Partition 1:
-kafkacat -b :9092 -t test -o 0 -p1 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
-echo
+sleep 10
+docker-compose -f docker/docker-compose.yml stop peter klaus
-docker-compose -f docker/docker-compose.yml exec -T cli bash << 'EOF'
-echo "Altering number of partitions from 2 to 3..."
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-# tag::repartitioning[]
-kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 3
-# end::repartitioning[]
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-EOF
-
-docker-compose -f docker/docker-compose.yml restart producer
-while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
-
-echo -n Nachricht 4 an klaus über producer | http -v :8080/klaus
-echo -n Nachricht 5 an peter über producer | http -v :8080/peter
-echo -n Nachricht 4 an peter über producer | http -v :8080/peter
-echo -n Nachricht 5 an klaus über producer | http -v :8080/klaus
-echo -n Nachricht 6 an klaus über producer | http -v :8080/klaus
-echo -n Nachricht 6 an peter über producer | http -v :8080/peter
-
-echo Nachrichten in Partition 0:
# tag::kafkacat[]
-kafkacat -b :9092 -t test -o 0 -p0 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
+kafkacat -b :9092 -t test -o 0 -e -f 'p=%p|o=%o|k=%k|h=%h|v=%s\n'
# end::kafkacat[]
-echo
-echo Nachrichten in Partition 1:
-# tag::kafkacat[]
-kafkacat -b :9092 -t test -o 0 -p1 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
-# end::kafkacat[]
-echo
-echo Nachrichten in Partition 2:
-kafkacat -b :9092 -t test -o 0 -p2 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
-
-
-docker-compose -f docker/docker-compose.yml restart setup
-sleep 1
-docker-compose -f docker/docker-compose.yml up -d producer-0 producer-1
-while ! [[ $(http 0:8000/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-0..."; sleep 1; done
-while ! [[ $(http 0:8001/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-1..."; sleep 1; done
-
-# tag::fixed[]
-echo -n Nachricht 1 über producer-0 | http -v :8000/klaus
-echo -n Nachricht 1 über producer-1 | http -v :8001/klaus
-echo -n Nachricht 2 über producer-0 | http -v :8000/peter
-echo -n Nachricht 2 über producer-1 | http -v :8001/peter
-# end::fixed[]
-
-docker-compose -f docker/docker-compose.yml exec -T cli bash << 'EOF'
-echo "Altering number of partitions from 2 to 3..."
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 3
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-EOF
-
-docker-compose -f docker/docker-compose.yml restart producer-0 producer-1
-while ! [[ $(http 0:8000/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-0..."; sleep 1; done
-while ! [[ $(http 0:8001/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-1..."; sleep 1; done
-
-echo -n Nachricht 3 über producer-0 | http -v :8000/klaus
-echo -n Nachricht 3 über producer-1 | http -v :8001/klaus
-echo -n Nachricht 4 über producer-0 | http -v :8000/peter
-echo -n Nachricht 4 über producer-1 | http -v :8001/peter
-
-echo Nachrichten in Partition 0:
-kafkacat -b :9092 -t test -o 0 -p0 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
-echo
-echo Nachrichten in Partition 1:
-kafkacat -b :9092 -t test -o 0 -p1 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
-echo
-echo Nachrichten in Partition 2:
-kafkacat -b :9092 -t test -o 0 -p2 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
- setup
producer:
- image: juplo/rest-producer:1.0-SNAPSHOT
+ image: juplo/rest-producer--json:1.0-SNAPSHOT
ports:
- 8080:8080
environment:
producer.client-id: producer
producer.topic: test
- producer-0:
- image: juplo/rest-producer:1.0-SNAPSHOT
- ports:
- - 8000:8080
- environment:
- server.port: 8080
- producer.bootstrap-server: kafka:9092
- producer.client-id: producer-0
- producer.topic: test
- producer.partition: 0
-
- producer-1:
- image: juplo/rest-producer:1.0-SNAPSHOT
- ports:
- - 8001:8080
- environment:
- server.port: 8080
- producer.bootstrap-server: kafka:9092
- producer.client-id: producer-1
- producer.topic: test
- producer.partition: 1
-
- consumer-1:
- image: juplo/simple-consumer:1.0-SNAPSHOT
- command: kafka:9092 test my-group consumer-1
+ peter:
+ image: juplo/toolbox
+ command: >
+ bash -c "
+ while [[ true ]];
+ do
+ echo 777 | http -v producer:8080/peter;
+ sleep 1;
+ done
+ "
- consumer-2:
- image: juplo/simple-consumer:1.0-SNAPSHOT
- command: kafka:9092 test my-group consumer-2
+ klaus:
+ image: juplo/toolbox
+ command: >
+ bash -c "
+ while [[ true ]];
+ do
+ echo 666 | http -v producer:8080/klaus;
+ sleep 1;
+ done
+ "
- consumer-3:
- image: juplo/simple-consumer:1.0-SNAPSHOT
- command: kafka:9092 test my-group consumer-3
+ consumer:
+ image: juplo/toolbox
+ command: kafkacat -C -b kafka:9092 -t test -o 0 -f'p=%p|o=%o|k=%k|v=%s\n'
</parent>
<groupId>de.juplo.kafka</groupId>
- <artifactId>rest-producer</artifactId>
+ <artifactId>rest-producer--json</artifactId>
<name>REST Producer</name>
- <description>A Simple Producer that takes messages via POST and confirms successs</description>
+ <description>A Producer that takes messages via POST and sends JSON-requests to the Sumup-Adder</description>
<version>1.0-SNAPSHOT</version>
<properties>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.Value;
+
+
+@Value
+public class AddNumberMessage
+{
+ private final int number;
+ private final int next;
+}
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.Properties;
@Bean
public RestProducer restProducer(
ApplicationProperties properties,
- KafkaProducer<String, String> kafkaProducer)
+ KafkaProducer<String, Object> kafkaProducer)
{
return
new RestProducer(
}
@Bean(destroyMethod = "close")
- public KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
+ public KafkaProducer<String, Object> kafkaProducer(ApplicationProperties properties)
{
Properties props = new Properties();
props.put("bootstrap.servers", properties.getBootstrapServer());
props.put("linger.ms", properties.getLingerMs());
props.put("compression.type", properties.getCompressionType());
props.put("key.serializer", StringSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
+ props.put("value.serializer", JsonSerializer.class.getName());
+ props.put(JsonSerializer.TYPE_MAPPINGS,
+ "ADD:" + AddNumberMessage.class.getName() + "," +
+ "CALC:" + CalculateSumMessage.class.getName());
return new KafkaProducer<>(props);
}
--- /dev/null
+package de.juplo.kafka;
+
+
+import lombok.Value;
+
+
+@Value
+public class CalculateSumMessage
+{
+ private final int number;
+}
@Value
public class ProduceFailure implements ProduceResult
{
- private final String error;
- private final String exception;
+ private final String[] error;
+ private final String[] exception;
private final Integer status;
- public ProduceFailure(Exception e)
+ public ProduceFailure(Exception[] e)
{
status = 500;
- exception = e.getClass().getSimpleName();
- error = e.getMessage();
+ exception = new String[e.length];
+ error = new String[e.length];
+ for (int i = 0; i < e.length ; i++)
+ {
+ exception[i] = e[i] == null ? null : e[i].getClass().getSimpleName();
+ error[i] = e[i] == null ? null : e[i].getMessage();
+ }
}
}
@Value
public class ProduceSuccess implements ProduceResult
{
- Integer partition;
- Long offset;
+ Integer[] partition;
+ Long[] offset;
}
private final String id;
private final String topic;
private final Integer partition;
- private final Producer<String, String> producer;
+ private final Producer<String, Object> producer;
private long produced = 0;
public DeferredResult<ProduceResult> send(
@PathVariable String key,
@RequestHeader(name = "X-id", required = false) Long correlationId,
- @RequestBody String value)
+ @RequestBody Integer number)
{
- DeferredResult<ProduceResult> result = new DeferredResult<>();
+ ResultRecorder result = new ResultRecorder(number+1);
+ for (int i = 1; i <= number; i++)
+ {
+ send(key, new AddNumberMessage(number, i), correlationId, result);
+ }
+ send(key, new CalculateSumMessage(number), correlationId, result);
+
+ return result.getDeferredResult();
+ }
+
+ private void send(
+ String key,
+ Object value,
+ Long correlationId,
+ ResultRecorder result)
+ {
final long time = System.currentTimeMillis();
- final ProducerRecord<String, String> record = new ProducerRecord<>(
+ final ProducerRecord<String, Object> record = new ProducerRecord<>(
topic, // Topic
partition, // Partition
key, // Key
if (e == null)
{
// HANDLE SUCCESS
+ result.addSuccess(metadata);
produced++;
- result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
log.debug(
"{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
id,
else
{
// HANDLE ERROR
- result.setErrorResult(new ProduceFailure(e));
+ result.addFailure(e);
log.error(
"{} - ERROR key={} timestamp={} latency={}ms: {}",
id,
record.key(),
now - time
);
-
- return result;
}
@ExceptionHandler
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.Getter;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.springframework.web.context.request.async.DeferredResult;
+
+import java.util.Arrays;
+
+
+class ResultRecorder
+{
+ @Getter
+ private final DeferredResult<ProduceResult> deferredResult = new DeferredResult<ProduceResult>();
+ private final int numMessages;
+ private final RecordMetadata[] metadata;
+ private final Exception[] errors;
+
+ private int sent = 0;
+
+
+ ResultRecorder(int numMessages)
+ {
+ this.numMessages = numMessages;
+ this.metadata = new RecordMetadata[numMessages];
+ this.errors = new Exception[numMessages];
+ }
+
+
+ void addSuccess(RecordMetadata m)
+ {
+ checkStatus();
+ metadata[sent++] = m;
+ processResult();
+ }
+
+ void addFailure(Exception e)
+ {
+ checkStatus();
+ errors[sent++] = e;
+ processResult();
+ }
+
+ private void checkStatus() throws IllegalStateException
+ {
+ if (sent >= numMessages)
+ throw new IllegalStateException("Already sent " + sent + " messages!");
+ }
+
+ private void processResult()
+ {
+ if (sent == numMessages)
+ {
+ if (Arrays
+ .stream(errors)
+ .filter(e -> e != null)
+ .findAny()
+ .isPresent())
+ {
+ deferredResult.setErrorResult(new ProduceFailure(errors));
+ }
+ else
+ {
+ Integer[] partitions = new Integer[numMessages];
+ Long[] offsets = new Long[numMessages];
+ for (int i = 0; i < numMessages; i++)
+ {
+ partitions[i] = metadata[i].partition();
+ offsets[i] = metadata[i].offset();
+ }
+ deferredResult.setResult(new ProduceSuccess(partitions, offsets));
+ }
+ }
+ }
+}
topic: test
acks: -1
batch-size: 16384
- linger-ms: 0
+ linger-ms: 5
compression-type: gzip
management:
endpoint:
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
+import org.springframework.http.MediaType;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.web.servlet.MockMvc;
void testSendMessage() throws Exception
{
mockMvc
- .perform(post("/peter").content("Hallo Welt!"))
+ .perform(
+ post("/peter")
+ .header("X-id", 7)
+ .contentType(MediaType.APPLICATION_JSON)
+ .content("666"))
.andExpect(status().isOk());
await("Message was send")
.atMost(Duration.ofSeconds(5))
- .until(() -> consumer.received.size() == 1);
+ .until(() -> consumer.received.size() == 667);
}