#!/bin/bash
 
-IMAGE=juplo/rest-producer:1.0-SNAPSHOT
+IMAGE=juplo/rest-producer--json:1.0-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
   "$1" = "build"
 ]]
 then
+  docker-compose rm -svf producer
   mvn clean install || exit
 else
   echo "Using image existing images:"
 docker-compose -f docker/docker-compose.yml up -d producer
 while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
 
-# tag::hashed[]
-echo -n Nachricht 1 an klaus über producer | http -v :8080/klaus
-# end::hashed[]
-echo -n Nachricht 2 an klaus über producer | http -v :8080/klaus
-# tag::hashed[]
-echo -n Nachricht 1 an peter über producer | http -v :8080/peter
-# end::hashed[]
-echo -n Nachricht 3 an klaus über producer | http -v :8080/klaus
-echo -n Nachricht 2 an peter über producer | http -v :8080/peter
-echo -n Nachricht 3 an peter über producer | http -v :8080/peter
+docker-compose -f docker/docker-compose.yml up -d peter klaus
 
-echo Nachrichten in Partition 0:
-kafkacat -b :9092 -t test -o 0 -p0 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
-echo
-echo Nachrichten in Partition 1:
-kafkacat -b :9092 -t test -o 0 -p1 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
-echo
+sleep 10
+docker-compose -f docker/docker-compose.yml stop peter klaus
 
-docker-compose -f docker/docker-compose.yml exec -T cli bash << 'EOF'
-echo "Altering number of partitions from 2 to 3..."
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-# tag::repartitioning[]
-kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 3
-# end::repartitioning[]
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-EOF
-
-docker-compose -f docker/docker-compose.yml restart producer
-while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
-
-echo -n Nachricht 4 an klaus über producer | http -v :8080/klaus
-echo -n Nachricht 5 an peter über producer | http -v :8080/peter
-echo -n Nachricht 4 an peter über producer | http -v :8080/peter
-echo -n Nachricht 5 an klaus über producer | http -v :8080/klaus
-echo -n Nachricht 6 an klaus über producer | http -v :8080/klaus
-echo -n Nachricht 6 an peter über producer | http -v :8080/peter
-
-echo Nachrichten in Partition 0:
 # tag::kafkacat[]
-kafkacat -b :9092 -t test -o 0 -p0 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
+kafkacat -b :9092 -t test -o 0 -e -f 'p=%p|o=%o|k=%k|h=%h|v=%s\n'
 # end::kafkacat[]
-echo
-echo Nachrichten in Partition 1:
-# tag::kafkacat[]
-kafkacat -b :9092 -t test -o 0 -p1 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
-# end::kafkacat[]
-echo
-echo Nachrichten in Partition 2:
-kafkacat -b :9092 -t test -o 0 -p2 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
-
-
-docker-compose -f docker/docker-compose.yml restart setup
-sleep 1
-docker-compose -f docker/docker-compose.yml up -d producer-0 producer-1
-while ! [[ $(http 0:8000/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-0..."; sleep 1; done
-while ! [[ $(http 0:8001/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-1..."; sleep 1; done
-
-# tag::fixed[]
-echo -n Nachricht 1 über producer-0 | http -v :8000/klaus
-echo -n Nachricht 1 über producer-1 | http -v :8001/klaus
-echo -n Nachricht 2 über producer-0 | http -v :8000/peter
-echo -n Nachricht 2 über producer-1 | http -v :8001/peter
-# end::fixed[]
-
-docker-compose -f docker/docker-compose.yml exec -T cli bash << 'EOF'
-echo "Altering number of partitions from 2 to 3..."
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 3
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-EOF
-
-docker-compose -f docker/docker-compose.yml restart producer-0 producer-1
-while ! [[ $(http 0:8000/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-0..."; sleep 1; done
-while ! [[ $(http 0:8001/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-1..."; sleep 1; done
-
-echo -n Nachricht 3 über producer-0 | http -v :8000/klaus
-echo -n Nachricht 3 über producer-1 | http -v :8001/klaus
-echo -n Nachricht 4 über producer-0 | http -v :8000/peter
-echo -n Nachricht 4 über producer-1 | http -v :8001/peter
-
-echo Nachrichten in Partition 0:
-kafkacat -b :9092 -t test -o 0 -p0 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
-echo
-echo Nachrichten in Partition 1:
-kafkacat -b :9092 -t test -o 0 -p1 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
-echo
-echo Nachrichten in Partition 2:
-kafkacat -b :9092 -t test -o 0 -p2 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
 
       - setup
 
   producer:
-    image: juplo/rest-producer:1.0-SNAPSHOT
+    image: juplo/rest-producer--json:1.0-SNAPSHOT
     ports:
       - 8080:8080
     environment:
       producer.client-id: producer
       producer.topic: test
 
-  producer-0:
-    image: juplo/rest-producer:1.0-SNAPSHOT
-    ports:
-      - 8000:8080
-    environment:
-      server.port: 8080
-      producer.bootstrap-server: kafka:9092
-      producer.client-id: producer-0
-      producer.topic: test
-      producer.partition: 0
-
-  producer-1:
-    image: juplo/rest-producer:1.0-SNAPSHOT
-    ports:
-      - 8001:8080
-    environment:
-      server.port: 8080
-      producer.bootstrap-server: kafka:9092
-      producer.client-id: producer-1
-      producer.topic: test
-      producer.partition: 1
-
-  consumer-1:
-    image: juplo/simple-consumer:1.0-SNAPSHOT
-    command: kafka:9092 test my-group consumer-1
+  peter:
+    image: juplo/toolbox
+    command: >
+      bash -c "
+      while [[ true ]];
+      do
+        echo 777 | http -v producer:8080/peter;
+        sleep 1;
+      done
+      "
 
-  consumer-2:
-    image: juplo/simple-consumer:1.0-SNAPSHOT
-    command: kafka:9092 test my-group consumer-2
+  klaus:
+    image: juplo/toolbox
+    command: >
+      bash -c "
+      while [[ true ]];
+      do
+        echo 666 | http -v producer:8080/klaus;
+        sleep 1;
+      done
+      "
 
-  consumer-3:
-    image: juplo/simple-consumer:1.0-SNAPSHOT
-    command: kafka:9092 test my-group consumer-3
+  consumer:
+    image: juplo/toolbox
+    command: kafkacat -C -b kafka:9092 -t test -o 0 -f'p=%p|o=%o|k=%k|v=%s\n'
 
   </parent>
 
   <groupId>de.juplo.kafka</groupId>
-  <artifactId>rest-producer</artifactId>
+  <artifactId>rest-producer--json</artifactId>
   <name>REST Producer</name>
-  <description>A Simple Producer that takes messages via POST and confirms successs</description>
+  <description>A Producer that takes messages via POST and sends JSON-requests to the Sumup-Adder</description>
   <version>1.0-SNAPSHOT</version>
 
   <properties>
       <artifactId>spring-boot-starter-validation</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka-clients</artifactId>
+      <groupId>org.springframework.kafka</groupId>
+      <artifactId>spring-kafka</artifactId>
     </dependency>
     <dependency>
       <groupId>org.projectlombok</groupId>
       <artifactId>spring-boot-starter-test</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.springframework.kafka</groupId>
-      <artifactId>spring-kafka</artifactId>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka-test</artifactId>
 
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.Value;
+
+
+@Value
+public class AddNumberMessage
+{
+  private final int number;
+  private final int next;
+}
 
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonSerializer;
 
 import java.util.Properties;
 
   @Bean
   public RestProducer restProducer(
       ApplicationProperties properties,
-      KafkaProducer<String, String> kafkaProducer)
+      KafkaProducer<String, Object> kafkaProducer)
   {
     return
         new RestProducer(
   }
 
   @Bean(destroyMethod = "close")
-  public KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
+  public KafkaProducer<String, Object> kafkaProducer(ApplicationProperties properties)
   {
     Properties props = new Properties();
     props.put("bootstrap.servers", properties.getBootstrapServer());
     props.put("linger.ms", properties.getLingerMs());
     props.put("compression.type", properties.getCompressionType());
     props.put("key.serializer", StringSerializer.class.getName());
-    props.put("value.serializer", StringSerializer.class.getName());
+    props.put("value.serializer", JsonSerializer.class.getName());
+    props.put(JsonSerializer.TYPE_MAPPINGS,
+        "ADD:" + AddNumberMessage.class.getName() + "," +
+        "CALC:" + CalculateSumMessage.class.getName());
 
     return new KafkaProducer<>(props);
   }
 
--- /dev/null
+package de.juplo.kafka;
+
+
+import lombok.Value;
+
+
+@Value
+public class CalculateSumMessage
+{
+  private final int number;
+}
 
 @Value
 public class ProduceFailure implements ProduceResult
 {
-  private final String error;
-  private final String exception;
+  private final String[] error;
+  private final String[] exception;
   private final Integer status;
 
 
-  public ProduceFailure(Exception e)
+  public ProduceFailure(Exception[] e)
   {
     status = 500;
-    exception = e.getClass().getSimpleName();
-    error = e.getMessage();
+    exception = new String[e.length];
+    error = new String[e.length];
+    for (int i = 0; i < e.length ; i++)
+    {
+      exception[i] = e[i] == null ? null : e[i].getClass().getSimpleName();
+      error[i] = e[i] == null ? null : e[i].getMessage();
+    }
   }
 }
 
 @Value
 public class ProduceSuccess implements ProduceResult
 {
-  Integer partition;
-  Long offset;
+  Integer[] partition;
+  Long[] offset;
 }
 
   private final String id;
   private final String topic;
   private final Integer partition;
-  private final Producer<String, String> producer;
+  private final Producer<String, Object> producer;
 
   private long produced = 0;
 
   public DeferredResult<ProduceResult> send(
       @PathVariable String key,
       @RequestHeader(name = "X-id", required = false) Long correlationId,
-      @RequestBody String value)
+      @RequestBody Integer number)
   {
-    DeferredResult<ProduceResult> result = new DeferredResult<>();
+    ResultRecorder result = new ResultRecorder(number+1);
 
+    for (int i = 1; i <= number; i++)
+    {
+      send(key, new AddNumberMessage(number, i), correlationId, result);
+    }
+    send(key, new CalculateSumMessage(number), correlationId, result);
+
+    return result.getDeferredResult();
+  }
+
+  private void send(
+      String key,
+      Object value,
+      Long correlationId,
+      ResultRecorder result)
+  {
     final long time = System.currentTimeMillis();
 
-    final ProducerRecord<String, String> record = new ProducerRecord<>(
+    final ProducerRecord<String, Object> record = new ProducerRecord<>(
         topic,  // Topic
         partition, // Partition
         key,    // Key
       if (e == null)
       {
         // HANDLE SUCCESS
+        result.addSuccess(metadata);
         produced++;
-        result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
         log.debug(
             "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
             id,
       else
       {
         // HANDLE ERROR
-        result.setErrorResult(new ProduceFailure(e));
+        result.addFailure(e);
         log.error(
             "{} - ERROR key={} timestamp={} latency={}ms: {}",
             id,
         record.key(),
         now - time
     );
-
-    return result;
   }
 
   @ExceptionHandler
 
--- /dev/null
+package de.juplo.kafka;
+
+import lombok.Getter;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.springframework.web.context.request.async.DeferredResult;
+
+import java.util.Arrays;
+
+
+class ResultRecorder
+{
+  @Getter
+  private final DeferredResult<ProduceResult> deferredResult = new DeferredResult<ProduceResult>();
+  private final int numMessages;
+  private final RecordMetadata[] metadata;
+  private final Exception[] errors;
+
+  private int sent = 0;
+
+
+  ResultRecorder(int numMessages)
+  {
+    this.numMessages = numMessages;
+    this.metadata = new RecordMetadata[numMessages];
+    this.errors = new Exception[numMessages];
+  }
+
+
+  void addSuccess(RecordMetadata m)
+  {
+    checkStatus();
+    metadata[sent++] = m;
+    processResult();
+  }
+
+  void addFailure(Exception e)
+  {
+    checkStatus();
+    errors[sent++] = e;
+    processResult();
+  }
+
+  private void checkStatus() throws IllegalStateException
+  {
+    if (sent >= numMessages)
+      throw new IllegalStateException("Already sent " + sent + " messages!");
+  }
+
+  private void processResult()
+  {
+    if (sent == numMessages)
+    {
+      if (Arrays
+          .stream(errors)
+          .filter(e -> e != null)
+          .findAny()
+          .isPresent())
+      {
+        deferredResult.setErrorResult(new ProduceFailure(errors));
+      }
+      else
+      {
+        Integer[] partitions = new Integer[numMessages];
+        Long[] offsets = new Long[numMessages];
+        for (int i = 0; i < numMessages; i++)
+        {
+          partitions[i] = metadata[i].partition();
+          offsets[i] = metadata[i].offset();
+        }
+        deferredResult.setResult(new ProduceSuccess(partitions, offsets));
+      }
+    }
+  }
+}
 
   topic: test
   acks: -1
   batch-size: 16384
-  linger-ms: 0
+  linger-ms: 5
   compression-type: gzip
 management:
   endpoint:
 
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
+import org.springframework.http.MediaType;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.test.web.servlet.MockMvc;
        void testSendMessage() throws Exception
        {
                mockMvc
-                               .perform(post("/peter").content("Hallo Welt!"))
+                               .perform(
+                                               post("/peter")
+                                                               .header("X-id", 7)
+                                                               .contentType(MediaType.APPLICATION_JSON)
+                                                               .content("666"))
                                .andExpect(status().isOk());
                await("Message was send")
                                .atMost(Duration.ofSeconds(5))
-                               .until(() -> consumer.received.size() == 1);
+                               .until(() -> consumer.received.size() == 667);
        }