Version des Rest-Producers, der direkt Requests für den Sumup-Adder sendet
authorKai Moritz <kai@juplo.de>
Sun, 6 Nov 2022 19:00:21 +0000 (20:00 +0100)
committerKai Moritz <kai@juplo.de>
Sat, 17 Jun 2023 16:40:22 +0000 (18:40 +0200)
12 files changed:
README.sh
docker/docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/AddNumberMessage.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/CalculateSumMessage.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/ProduceFailure.java
src/main/java/de/juplo/kafka/ProduceSuccess.java
src/main/java/de/juplo/kafka/RestProducer.java
src/main/java/de/juplo/kafka/ResultRecorder.java [new file with mode: 0644]
src/main/resources/application.yml
src/test/java/de/juplo/kafka/ApplicationTests.java

index f227f47..af458b9 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/rest-producer:1.0-SNAPSHOT
+IMAGE=juplo/rest-producer--json:1.0-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
@@ -17,6 +17,7 @@ if [[
   "$1" = "build"
 ]]
 then
+  docker-compose rm -svf producer
   mvn clean install || exit
 else
   echo "Using image existing images:"
@@ -33,91 +34,11 @@ docker-compose -f docker/docker-compose.yml logs setup
 docker-compose -f docker/docker-compose.yml up -d producer
 while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
 
-# tag::hashed[]
-echo -n Nachricht 1 an klaus über producer | http -v :8080/klaus
-# end::hashed[]
-echo -n Nachricht 2 an klaus über producer | http -v :8080/klaus
-# tag::hashed[]
-echo -n Nachricht 1 an peter über producer | http -v :8080/peter
-# end::hashed[]
-echo -n Nachricht 3 an klaus über producer | http -v :8080/klaus
-echo -n Nachricht 2 an peter über producer | http -v :8080/peter
-echo -n Nachricht 3 an peter über producer | http -v :8080/peter
+docker-compose -f docker/docker-compose.yml up -d peter klaus
 
-echo Nachrichten in Partition 0:
-kafkacat -b :9092 -t test -o 0 -p0 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
-echo
-echo Nachrichten in Partition 1:
-kafkacat -b :9092 -t test -o 0 -p1 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
-echo
+sleep 10
+docker-compose -f docker/docker-compose.yml stop peter klaus
 
-docker-compose -f docker/docker-compose.yml exec -T cli bash << 'EOF'
-echo "Altering number of partitions from 2 to 3..."
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-# tag::repartitioning[]
-kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 3
-# end::repartitioning[]
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-EOF
-
-docker-compose -f docker/docker-compose.yml restart producer
-while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
-
-echo -n Nachricht 4 an klaus über producer | http -v :8080/klaus
-echo -n Nachricht 5 an peter über producer | http -v :8080/peter
-echo -n Nachricht 4 an peter über producer | http -v :8080/peter
-echo -n Nachricht 5 an klaus über producer | http -v :8080/klaus
-echo -n Nachricht 6 an klaus über producer | http -v :8080/klaus
-echo -n Nachricht 6 an peter über producer | http -v :8080/peter
-
-echo Nachrichten in Partition 0:
 # tag::kafkacat[]
-kafkacat -b :9092 -t test -o 0 -p0 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
+kafkacat -b :9092 -t test -o 0 -e -f 'p=%p|o=%o|k=%k|h=%h|v=%s\n'
 # end::kafkacat[]
-echo
-echo Nachrichten in Partition 1:
-# tag::kafkacat[]
-kafkacat -b :9092 -t test -o 0 -p1 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
-# end::kafkacat[]
-echo
-echo Nachrichten in Partition 2:
-kafkacat -b :9092 -t test -o 0 -p2 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
-
-
-docker-compose -f docker/docker-compose.yml restart setup
-sleep 1
-docker-compose -f docker/docker-compose.yml up -d producer-0 producer-1
-while ! [[ $(http 0:8000/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-0..."; sleep 1; done
-while ! [[ $(http 0:8001/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-1..."; sleep 1; done
-
-# tag::fixed[]
-echo -n Nachricht 1 über producer-0 | http -v :8000/klaus
-echo -n Nachricht 1 über producer-1 | http -v :8001/klaus
-echo -n Nachricht 2 über producer-0 | http -v :8000/peter
-echo -n Nachricht 2 über producer-1 | http -v :8001/peter
-# end::fixed[]
-
-docker-compose -f docker/docker-compose.yml exec -T cli bash << 'EOF'
-echo "Altering number of partitions from 2 to 3..."
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 3
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-EOF
-
-docker-compose -f docker/docker-compose.yml restart producer-0 producer-1
-while ! [[ $(http 0:8000/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-0..."; sleep 1; done
-while ! [[ $(http 0:8001/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-1..."; sleep 1; done
-
-echo -n Nachricht 3 über producer-0 | http -v :8000/klaus
-echo -n Nachricht 3 über producer-1 | http -v :8001/klaus
-echo -n Nachricht 4 über producer-0 | http -v :8000/peter
-echo -n Nachricht 4 über producer-1 | http -v :8001/peter
-
-echo Nachrichten in Partition 0:
-kafkacat -b :9092 -t test -o 0 -p0 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
-echo
-echo Nachrichten in Partition 1:
-kafkacat -b :9092 -t test -o 0 -p1 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
-echo
-echo Nachrichten in Partition 2:
-kafkacat -b :9092 -t test -o 0 -p2 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
index 521eec1..e2b3c43 100644 (file)
@@ -100,7 +100,7 @@ services:
       - setup
 
   producer:
-    image: juplo/rest-producer:1.0-SNAPSHOT
+    image: juplo/rest-producer--json:1.0-SNAPSHOT
     ports:
       - 8080:8080
     environment:
@@ -109,36 +109,28 @@ services:
       producer.client-id: producer
       producer.topic: test
 
-  producer-0:
-    image: juplo/rest-producer:1.0-SNAPSHOT
-    ports:
-      - 8000:8080
-    environment:
-      server.port: 8080
-      producer.bootstrap-server: kafka:9092
-      producer.client-id: producer-0
-      producer.topic: test
-      producer.partition: 0
-
-  producer-1:
-    image: juplo/rest-producer:1.0-SNAPSHOT
-    ports:
-      - 8001:8080
-    environment:
-      server.port: 8080
-      producer.bootstrap-server: kafka:9092
-      producer.client-id: producer-1
-      producer.topic: test
-      producer.partition: 1
-
-  consumer-1:
-    image: juplo/simple-consumer:1.0-SNAPSHOT
-    command: kafka:9092 test my-group consumer-1
+  peter:
+    image: juplo/toolbox
+    command: >
+      bash -c "
+      while [[ true ]];
+      do
+        echo 777 | http -v producer:8080/peter;
+        sleep 1;
+      done
+      "
 
-  consumer-2:
-    image: juplo/simple-consumer:1.0-SNAPSHOT
-    command: kafka:9092 test my-group consumer-2
+  klaus:
+    image: juplo/toolbox
+    command: >
+      bash -c "
+      while [[ true ]];
+      do
+        echo 666 | http -v producer:8080/klaus;
+        sleep 1;
+      done
+      "
 
-  consumer-3:
-    image: juplo/simple-consumer:1.0-SNAPSHOT
-    command: kafka:9092 test my-group consumer-3
+  consumer:
+    image: juplo/toolbox
+    command: kafkacat -C -b kafka:9092 -t test -o 0 -f'p=%p|o=%o|k=%k|v=%s\n'
diff --git a/pom.xml b/pom.xml
index 7d977a6..a7734ec 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -12,9 +12,9 @@
   </parent>
 
   <groupId>de.juplo.kafka</groupId>
-  <artifactId>rest-producer</artifactId>
+  <artifactId>rest-producer--json</artifactId>
   <name>REST Producer</name>
-  <description>A Simple Producer that takes messages via POST and confirms successs</description>
+  <description>A Producer that takes messages via POST and sends JSON-requests to the Sumup-Adder</description>
   <version>1.0-SNAPSHOT</version>
 
   <properties>
@@ -40,8 +40,8 @@
       <artifactId>spring-boot-starter-validation</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka-clients</artifactId>
+      <groupId>org.springframework.kafka</groupId>
+      <artifactId>spring-kafka</artifactId>
     </dependency>
     <dependency>
       <groupId>org.projectlombok</groupId>
       <artifactId>spring-boot-starter-test</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.springframework.kafka</groupId>
-      <artifactId>spring-kafka</artifactId>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka-test</artifactId>
diff --git a/src/main/java/de/juplo/kafka/AddNumberMessage.java b/src/main/java/de/juplo/kafka/AddNumberMessage.java
new file mode 100644 (file)
index 0000000..88b5d6f
--- /dev/null
@@ -0,0 +1,11 @@
+package de.juplo.kafka;
+
+import lombok.Value;
+
+
+@Value
+public class AddNumberMessage
+{
+  private final int number;
+  private final int next;
+}
index 0642aa4..9a11f6e 100644 (file)
@@ -5,6 +5,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonSerializer;
 
 import java.util.Properties;
 
@@ -16,7 +17,7 @@ public class ApplicationConfiguration
   @Bean
   public RestProducer restProducer(
       ApplicationProperties properties,
-      KafkaProducer<String, String> kafkaProducer)
+      KafkaProducer<String, Object> kafkaProducer)
   {
     return
         new RestProducer(
@@ -27,7 +28,7 @@ public class ApplicationConfiguration
   }
 
   @Bean(destroyMethod = "close")
-  public KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
+  public KafkaProducer<String, Object> kafkaProducer(ApplicationProperties properties)
   {
     Properties props = new Properties();
     props.put("bootstrap.servers", properties.getBootstrapServer());
@@ -39,7 +40,10 @@ public class ApplicationConfiguration
     props.put("linger.ms", properties.getLingerMs());
     props.put("compression.type", properties.getCompressionType());
     props.put("key.serializer", StringSerializer.class.getName());
-    props.put("value.serializer", StringSerializer.class.getName());
+    props.put("value.serializer", JsonSerializer.class.getName());
+    props.put(JsonSerializer.TYPE_MAPPINGS,
+        "ADD:" + AddNumberMessage.class.getName() + "," +
+        "CALC:" + CalculateSumMessage.class.getName());
 
     return new KafkaProducer<>(props);
   }
diff --git a/src/main/java/de/juplo/kafka/CalculateSumMessage.java b/src/main/java/de/juplo/kafka/CalculateSumMessage.java
new file mode 100644 (file)
index 0000000..5d8c414
--- /dev/null
@@ -0,0 +1,11 @@
+package de.juplo.kafka;
+
+
+import lombok.Value;
+
+
+@Value
+public class CalculateSumMessage
+{
+  private final int number;
+}
index 873a67b..b7785f1 100644 (file)
@@ -7,15 +7,20 @@ import lombok.Value;
 @Value
 public class ProduceFailure implements ProduceResult
 {
-  private final String error;
-  private final String exception;
+  private final String[] error;
+  private final String[] exception;
   private final Integer status;
 
 
-  public ProduceFailure(Exception e)
+  public ProduceFailure(Exception[] e)
   {
     status = 500;
-    exception = e.getClass().getSimpleName();
-    error = e.getMessage();
+    exception = new String[e.length];
+    error = new String[e.length];
+    for (int i = 0; i < e.length ; i++)
+    {
+      exception[i] = e[i] == null ? null : e[i].getClass().getSimpleName();
+      error[i] = e[i] == null ? null : e[i].getMessage();
+    }
   }
 }
index 9c79e8b..8afe795 100644 (file)
@@ -7,6 +7,6 @@ import lombok.Value;
 @Value
 public class ProduceSuccess implements ProduceResult
 {
-  Integer partition;
-  Long offset;
+  Integer[] partition;
+  Long[] offset;
 }
index 73bec5b..0158774 100644 (file)
@@ -21,7 +21,7 @@ public class RestProducer
   private final String id;
   private final String topic;
   private final Integer partition;
-  private final Producer<String, String> producer;
+  private final Producer<String, Object> producer;
 
   private long produced = 0;
 
@@ -29,13 +29,28 @@ public class RestProducer
   public DeferredResult<ProduceResult> send(
       @PathVariable String key,
       @RequestHeader(name = "X-id", required = false) Long correlationId,
-      @RequestBody String value)
+      @RequestBody Integer number)
   {
-    DeferredResult<ProduceResult> result = new DeferredResult<>();
+    ResultRecorder result = new ResultRecorder(number+1);
 
+    for (int i = 1; i <= number; i++)
+    {
+      send(key, new AddNumberMessage(number, i), correlationId, result);
+    }
+    send(key, new CalculateSumMessage(number), correlationId, result);
+
+    return result.getDeferredResult();
+  }
+
+  private void send(
+      String key,
+      Object value,
+      Long correlationId,
+      ResultRecorder result)
+  {
     final long time = System.currentTimeMillis();
 
-    final ProducerRecord<String, String> record = new ProducerRecord<>(
+    final ProducerRecord<String, Object> record = new ProducerRecord<>(
         topic,  // Topic
         partition, // Partition
         key,    // Key
@@ -48,8 +63,8 @@ public class RestProducer
       if (e == null)
       {
         // HANDLE SUCCESS
+        result.addSuccess(metadata);
         produced++;
-        result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
         log.debug(
             "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
             id,
@@ -64,7 +79,7 @@ public class RestProducer
       else
       {
         // HANDLE ERROR
-        result.setErrorResult(new ProduceFailure(e));
+        result.addFailure(e);
         log.error(
             "{} - ERROR key={} timestamp={} latency={}ms: {}",
             id,
@@ -83,8 +98,6 @@ public class RestProducer
         record.key(),
         now - time
     );
-
-    return result;
   }
 
   @ExceptionHandler
diff --git a/src/main/java/de/juplo/kafka/ResultRecorder.java b/src/main/java/de/juplo/kafka/ResultRecorder.java
new file mode 100644 (file)
index 0000000..d20ee89
--- /dev/null
@@ -0,0 +1,74 @@
+package de.juplo.kafka;
+
+import lombok.Getter;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.springframework.web.context.request.async.DeferredResult;
+
+import java.util.Arrays;
+
+
+class ResultRecorder
+{
+  @Getter
+  private final DeferredResult<ProduceResult> deferredResult = new DeferredResult<ProduceResult>();
+  private final int numMessages;
+  private final RecordMetadata[] metadata;
+  private final Exception[] errors;
+
+  private int sent = 0;
+
+
+  ResultRecorder(int numMessages)
+  {
+    this.numMessages = numMessages;
+    this.metadata = new RecordMetadata[numMessages];
+    this.errors = new Exception[numMessages];
+  }
+
+
+  void addSuccess(RecordMetadata m)
+  {
+    checkStatus();
+    metadata[sent++] = m;
+    processResult();
+  }
+
+  void addFailure(Exception e)
+  {
+    checkStatus();
+    errors[sent++] = e;
+    processResult();
+  }
+
+  private void checkStatus() throws IllegalStateException
+  {
+    if (sent >= numMessages)
+      throw new IllegalStateException("Already sent " + sent + " messages!");
+  }
+
+  private void processResult()
+  {
+    if (sent == numMessages)
+    {
+      if (Arrays
+          .stream(errors)
+          .filter(e -> e != null)
+          .findAny()
+          .isPresent())
+      {
+        deferredResult.setErrorResult(new ProduceFailure(errors));
+      }
+      else
+      {
+        Integer[] partitions = new Integer[numMessages];
+        Long[] offsets = new Long[numMessages];
+        for (int i = 0; i < numMessages; i++)
+        {
+          partitions[i] = metadata[i].partition();
+          offsets[i] = metadata[i].offset();
+        }
+        deferredResult.setResult(new ProduceSuccess(partitions, offsets));
+      }
+    }
+  }
+}
index 0d5752c..8ce4821 100644 (file)
@@ -4,7 +4,7 @@ producer:
   topic: test
   acks: -1
   batch-size: 16384
-  linger-ms: 0
+  linger-ms: 5
   compression-type: gzip
 management:
   endpoint:
index 646a335..b9c1e17 100644 (file)
@@ -8,6 +8,7 @@ import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMock
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
+import org.springframework.http.MediaType;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.test.web.servlet.MockMvc;
@@ -53,11 +54,15 @@ public class ApplicationTests
        void testSendMessage() throws Exception
        {
                mockMvc
-                               .perform(post("/peter").content("Hallo Welt!"))
+                               .perform(
+                                               post("/peter")
+                                                               .header("X-id", 7)
+                                                               .contentType(MediaType.APPLICATION_JSON)
+                                                               .content("666"))
                                .andExpect(status().isOk());
                await("Message was send")
                                .atMost(Duration.ofSeconds(5))
-                               .until(() -> consumer.received.size() == 1);
+                               .until(() -> consumer.received.size() == 667);
        }