Version des Rest-Producers, der direkt Requests für den Sumup-Adder sendet
authorKai Moritz <kai@juplo.de>
Sun, 6 Nov 2022 19:00:21 +0000 (20:00 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 13 Nov 2022 17:33:42 +0000 (18:33 +0100)
12 files changed:
README.sh
docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/AddNumberMessage.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/CalculateSumMessage.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/ProduceFailure.java
src/main/java/de/juplo/kafka/ProduceSuccess.java
src/main/java/de/juplo/kafka/RestProducer.java
src/main/java/de/juplo/kafka/ResultRecorder.java [new file with mode: 0644]
src/main/resources/application.yml
src/test/java/de/juplo/kafka/ApplicationTests.java

index 2649b38..52b5f20 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/rest-producer:1.0-SNAPSHOT
+IMAGE=juplo/rest-producer-json:1.0-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
@@ -16,6 +16,7 @@ if [[
   "$1" = "build"
 ]]
 then
+  docker-compose rm -svf producer
   mvn clean install || exit
 else
   echo "Using image existing images:"
@@ -29,9 +30,11 @@ docker-compose up -d producer
 
 while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
 
-# tag::http[]
-echo -n bar | http -v :8080/foo
-echo -n foo | http -v :8080/bar X-id:666
+docker-compose up -d peter klaus
+
+sleep 10
+docker-compose stop peter klaus
+
 # end::http[]
 # tag::kafkacat[]
 docker-compose exec cli kafkacat -b kafka:9092 -t test -f "%p|%o|%k=%s|%h\n" -e
index 47775e3..175eed5 100644 (file)
@@ -92,7 +92,7 @@ services:
     command: sleep infinity
 
   producer:
-    image: juplo/rest-producer:1.0-SNAPSHOT
+    image: juplo/rest-producer-json:1.0-SNAPSHOT
     ports:
       - 8080:8080
     environment:
@@ -102,44 +102,26 @@ services:
       producer.topic: test
 
   peter:
-    image: juplo/rest-client:1.0-SNAPSHOT
-    environment:
-      server.port: 8080
-      rest-client.baseUrl: http://producer:8080
-      rest-client.username: peter
-      rest-client.throttle-ms: 1000
+    image: juplo/toolbox
+    command: >
+      bash -c "
+      while [[ true ]];
+      do
+        echo 777 | http -v producer:8080/peter;
+        sleep 1;
+      done
+      "
 
   klaus:
-    image: juplo/rest-client:1.0-SNAPSHOT
-    environment:
-      server.port: 8080
-      rest-client.baseUrl: http://producer:8080
-      rest-client.username: klaus
-      rest-client.throttle-ms: 1100
-
-  beate:
-    image: juplo/rest-client:1.0-SNAPSHOT
-    environment:
-      server.port: 8080
-      rest-client.baseUrl: http://producer:8080
-      rest-client.username: beate
-      rest-client.throttle-ms: 900
-
-  franz:
-    image: juplo/rest-client:1.0-SNAPSHOT
-    environment:
-      server.port: 8080
-      rest-client.baseUrl: http://producer:8080
-      rest-client.username: franz
-      rest-client.throttle-ms: 800
-
-  uschi:
-    image: juplo/rest-client:1.0-SNAPSHOT
-    environment:
-      server.port: 8080
-      rest-client.baseUrl: http://producer:8080
-      rest-client.username: uschi
-      rest-client.throttle-ms: 1200
+    image: juplo/toolbox
+    command: >
+      bash -c "
+      while [[ true ]];
+      do
+        echo 666 | http -v producer:8080/klaus;
+        sleep 1;
+      done
+      "
 
   consumer:
     image: juplo/toolbox
diff --git a/pom.xml b/pom.xml
index e7ea677..7772b6c 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -12,9 +12,9 @@
   </parent>
 
   <groupId>de.juplo.kafka</groupId>
-  <artifactId>rest-producer</artifactId>
+  <artifactId>rest-producer-json</artifactId>
   <name>REST Producer</name>
-  <description>A Simple Producer that takes messages via POST and confirms successs</description>
+  <description>A Producer that takes messages via POST and sends JSON-requests to the Sumup-Adder</description>
   <version>1.0-SNAPSHOT</version>
 
   <dependencies>
@@ -36,8 +36,8 @@
       <artifactId>spring-boot-starter-validation</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka-clients</artifactId>
+      <groupId>org.springframework.kafka</groupId>
+      <artifactId>spring-kafka</artifactId>
     </dependency>
     <dependency>
       <groupId>org.projectlombok</groupId>
       <artifactId>spring-boot-starter-test</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.springframework.kafka</groupId>
-      <artifactId>spring-kafka</artifactId>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.springframework.kafka</groupId>
       <artifactId>spring-kafka-test</artifactId>
diff --git a/src/main/java/de/juplo/kafka/AddNumberMessage.java b/src/main/java/de/juplo/kafka/AddNumberMessage.java
new file mode 100644 (file)
index 0000000..88b5d6f
--- /dev/null
@@ -0,0 +1,11 @@
+package de.juplo.kafka;
+
+import lombok.Value;
+
+
+@Value
+public class AddNumberMessage
+{
+  private final int number;
+  private final int next;
+}
index 0642aa4..9a11f6e 100644 (file)
@@ -5,6 +5,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonSerializer;
 
 import java.util.Properties;
 
@@ -16,7 +17,7 @@ public class ApplicationConfiguration
   @Bean
   public RestProducer restProducer(
       ApplicationProperties properties,
-      KafkaProducer<String, String> kafkaProducer)
+      KafkaProducer<String, Object> kafkaProducer)
   {
     return
         new RestProducer(
@@ -27,7 +28,7 @@ public class ApplicationConfiguration
   }
 
   @Bean(destroyMethod = "close")
-  public KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
+  public KafkaProducer<String, Object> kafkaProducer(ApplicationProperties properties)
   {
     Properties props = new Properties();
     props.put("bootstrap.servers", properties.getBootstrapServer());
@@ -39,7 +40,10 @@ public class ApplicationConfiguration
     props.put("linger.ms", properties.getLingerMs());
     props.put("compression.type", properties.getCompressionType());
     props.put("key.serializer", StringSerializer.class.getName());
-    props.put("value.serializer", StringSerializer.class.getName());
+    props.put("value.serializer", JsonSerializer.class.getName());
+    props.put(JsonSerializer.TYPE_MAPPINGS,
+        "ADD:" + AddNumberMessage.class.getName() + "," +
+        "CALC:" + CalculateSumMessage.class.getName());
 
     return new KafkaProducer<>(props);
   }
diff --git a/src/main/java/de/juplo/kafka/CalculateSumMessage.java b/src/main/java/de/juplo/kafka/CalculateSumMessage.java
new file mode 100644 (file)
index 0000000..5d8c414
--- /dev/null
@@ -0,0 +1,11 @@
+package de.juplo.kafka;
+
+
+import lombok.Value;
+
+
+@Value
+public class CalculateSumMessage
+{
+  private final int number;
+}
index 873a67b..b7785f1 100644 (file)
@@ -7,15 +7,20 @@ import lombok.Value;
 @Value
 public class ProduceFailure implements ProduceResult
 {
-  private final String error;
-  private final String exception;
+  private final String[] error;
+  private final String[] exception;
   private final Integer status;
 
 
-  public ProduceFailure(Exception e)
+  public ProduceFailure(Exception[] e)
   {
     status = 500;
-    exception = e.getClass().getSimpleName();
-    error = e.getMessage();
+    exception = new String[e.length];
+    error = new String[e.length];
+    for (int i = 0; i < e.length ; i++)
+    {
+      exception[i] = e[i] == null ? null : e[i].getClass().getSimpleName();
+      error[i] = e[i] == null ? null : e[i].getMessage();
+    }
   }
 }
index 9c79e8b..8afe795 100644 (file)
@@ -7,6 +7,6 @@ import lombok.Value;
 @Value
 public class ProduceSuccess implements ProduceResult
 {
-  Integer partition;
-  Long offset;
+  Integer[] partition;
+  Long[] offset;
 }
index debe366..4be2dcd 100644 (file)
@@ -20,7 +20,7 @@ public class RestProducer
   private final String id;
   private final String topic;
   private final Integer partition;
-  private final KafkaProducer<String, String> producer;
+  private final KafkaProducer<String, Object> producer;
 
   private long produced = 0;
 
@@ -28,13 +28,28 @@ public class RestProducer
   public DeferredResult<ProduceResult> send(
       @PathVariable String key,
       @RequestHeader(name = "X-id", required = false) Long correlationId,
-      @RequestBody String value)
+      @RequestBody Integer number)
   {
-    DeferredResult<ProduceResult> result = new DeferredResult<>();
+    ResultRecorder result = new ResultRecorder(number+1);
 
+    for (int i = 1; i <= number; i++)
+    {
+      send(key, new AddNumberMessage(number, i), correlationId, result);
+    }
+    send(key, new CalculateSumMessage(number), correlationId, result);
+
+    return result.getDeferredResult();
+  }
+
+  private void send(
+      String key,
+      Object value,
+      Long correlationId,
+      ResultRecorder result)
+  {
     final long time = System.currentTimeMillis();
 
-    final ProducerRecord<String, String> record = new ProducerRecord<>(
+    final ProducerRecord<String, Object> record = new ProducerRecord<>(
         topic,  // Topic
         partition, // Partition
         key,    // Key
@@ -53,8 +68,8 @@ public class RestProducer
       if (e == null)
       {
         // HANDLE SUCCESS
+        result.addSuccess(metadata);
         produced++;
-        result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
         log.debug(
             "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
             id,
@@ -69,7 +84,7 @@ public class RestProducer
       else
       {
         // HANDLE ERROR
-        result.setErrorResult(new ProduceFailure(e));
+        result.addFailure(e);
         log.error(
             "{} - ERROR key={} timestamp={} latency={}ms: {}",
             id,
@@ -88,8 +103,6 @@ public class RestProducer
         record.key(),
         now - time
     );
-
-    return result;
   }
 
   @ExceptionHandler
diff --git a/src/main/java/de/juplo/kafka/ResultRecorder.java b/src/main/java/de/juplo/kafka/ResultRecorder.java
new file mode 100644 (file)
index 0000000..d20ee89
--- /dev/null
@@ -0,0 +1,74 @@
+package de.juplo.kafka;
+
+import lombok.Getter;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.springframework.web.context.request.async.DeferredResult;
+
+import java.util.Arrays;
+
+
+class ResultRecorder
+{
+  @Getter
+  private final DeferredResult<ProduceResult> deferredResult = new DeferredResult<ProduceResult>();
+  private final int numMessages;
+  private final RecordMetadata[] metadata;
+  private final Exception[] errors;
+
+  private int sent = 0;
+
+
+  ResultRecorder(int numMessages)
+  {
+    this.numMessages = numMessages;
+    this.metadata = new RecordMetadata[numMessages];
+    this.errors = new Exception[numMessages];
+  }
+
+
+  void addSuccess(RecordMetadata m)
+  {
+    checkStatus();
+    metadata[sent++] = m;
+    processResult();
+  }
+
+  void addFailure(Exception e)
+  {
+    checkStatus();
+    errors[sent++] = e;
+    processResult();
+  }
+
+  private void checkStatus() throws IllegalStateException
+  {
+    if (sent >= numMessages)
+      throw new IllegalStateException("Already sent " + sent + " messages!");
+  }
+
+  private void processResult()
+  {
+    if (sent == numMessages)
+    {
+      if (Arrays
+          .stream(errors)
+          .filter(e -> e != null)
+          .findAny()
+          .isPresent())
+      {
+        deferredResult.setErrorResult(new ProduceFailure(errors));
+      }
+      else
+      {
+        Integer[] partitions = new Integer[numMessages];
+        Long[] offsets = new Long[numMessages];
+        for (int i = 0; i < numMessages; i++)
+        {
+          partitions[i] = metadata[i].partition();
+          offsets[i] = metadata[i].offset();
+        }
+        deferredResult.setResult(new ProduceSuccess(partitions, offsets));
+      }
+    }
+  }
+}
index 0d5752c..8ce4821 100644 (file)
@@ -4,7 +4,7 @@ producer:
   topic: test
   acks: -1
   batch-size: 16384
-  linger-ms: 0
+  linger-ms: 5
   compression-type: gzip
 management:
   endpoint:
index 646a335..b9c1e17 100644 (file)
@@ -8,6 +8,7 @@ import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMock
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
+import org.springframework.http.MediaType;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.test.web.servlet.MockMvc;
@@ -53,11 +54,15 @@ public class ApplicationTests
        void testSendMessage() throws Exception
        {
                mockMvc
-                               .perform(post("/peter").content("Hallo Welt!"))
+                               .perform(
+                                               post("/peter")
+                                                               .header("X-id", 7)
+                                                               .contentType(MediaType.APPLICATION_JSON)
+                                                               .content("666"))
                                .andExpect(status().isOk());
                await("Message was send")
                                .atMost(Duration.ofSeconds(5))
-                               .until(() -> consumer.received.size() == 1);
+                               .until(() -> consumer.received.size() == 667);
        }