`spring-producer` zu einem `rest-producer` erweitert producer/rest-producer producer/rest-producer--2024-11-13--si
authorKai Moritz <kai@juplo.de>
Sun, 29 Sep 2024 20:14:48 +0000 (22:14 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 8 Nov 2024 17:20:03 +0000 (18:20 +0100)
12 files changed:
README.sh
docker/docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/ErrorResponse.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/ExampleProducer.java
src/main/java/de/juplo/kafka/ProduceFailure.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/ProduceResult.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/ProduceSuccess.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/ApplicationTests.java

index 499780a..1e66cf0 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/spring-producer:1.0-SNAPSHOT
+IMAGE=juplo/rest-producer:1.0-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
@@ -10,7 +10,7 @@ then
 fi
 
 docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3
-docker compose -f docker/docker-compose.yml rm -svf producer
+docker compose -f docker/docker-compose.yml rm -svf producer producer-0 producer-1
 
 if [[
   $(docker image ls -q $IMAGE) == "" ||
@@ -27,16 +27,93 @@ docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1
 
 
 docker compose -f docker/docker-compose.yml up -d producer
-docker compose -f docker/docker-compose.yml up -d consumer-1 consumer-2
-sleep 15
+while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
 
-docker compose -f docker/docker-compose.yml stop producer
+# tag::hashed[]
+echo -n Nachricht 1 an klaus über producer | http -v :8080/klaus
+# end::hashed[]
+echo -n Nachricht 2 an klaus über producer | http -v :8080/klaus
+# tag::hashed[]
+echo -n Nachricht 1 an peter über producer | http -v :8080/peter
+# end::hashed[]
+echo -n Nachricht 3 an klaus über producer | http -v :8080/klaus
+echo -n Nachricht 2 an peter über producer | http -v :8080/peter
+echo -n Nachricht 3 an peter über producer | http -v :8080/peter
 
+echo Nachrichten in Partition 0:
+kafkacat -b :9092 -t test -o 0 -p0 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
 echo
-echo "Von consumer-1 empfangen:"
-docker compose -f docker/docker-compose.yml logs consumer-1 | grep '\ test\/.'
+echo Nachrichten in Partition 1:
+kafkacat -b :9092 -t test -o 0 -p1 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
 echo
-echo "Von consumer-2 empfangen:"
-docker compose -f docker/docker-compose.yml logs consumer-2 | grep '\ test\/.'
 
-docker compose -f docker/docker-compose.yml stop consumer-1 consumer-2
+docker compose -f docker/docker-compose.yml exec -T cli bash << 'EOF'
+echo "Altering number of partitions from 2 to 3..."
+kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+# tag::repartitioning[]
+kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 3
+# end::repartitioning[]
+kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+EOF
+
+docker compose -f docker/docker-compose.yml restart producer
+while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
+
+echo -n Nachricht 4 an klaus über producer | http -v :8080/klaus
+echo -n Nachricht 5 an peter über producer | http -v :8080/peter
+echo -n Nachricht 4 an peter über producer | http -v :8080/peter
+echo -n Nachricht 5 an klaus über producer | http -v :8080/klaus
+echo -n Nachricht 6 an klaus über producer | http -v :8080/klaus
+echo -n Nachricht 6 an peter über producer | http -v :8080/peter
+
+echo Nachrichten in Partition 0:
+# tag::kafkacat[]
+kafkacat -b :9092 -t test -o 0 -p0 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
+# end::kafkacat[]
+echo
+echo Nachrichten in Partition 1:
+# tag::kafkacat[]
+kafkacat -b :9092 -t test -o 0 -p1 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
+# end::kafkacat[]
+echo
+echo Nachrichten in Partition 2:
+kafkacat -b :9092 -t test -o 0 -p2 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
+
+
+docker compose -f docker/docker-compose.yml restart setup
+sleep 1
+docker compose -f docker/docker-compose.yml up -d producer-0 producer-1
+while ! [[ $(http 0:8000/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-0..."; sleep 1; done
+while ! [[ $(http 0:8001/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-1..."; sleep 1; done
+
+# tag::fixed[]
+echo -n Nachricht 1 über producer-0 | http -v :8000/klaus
+echo -n Nachricht 1 über producer-1 | http -v :8001/klaus
+echo -n Nachricht 2 über producer-0 | http -v :8000/peter
+echo -n Nachricht 2 über producer-1 | http -v :8001/peter
+# end::fixed[]
+
+docker compose -f docker/docker-compose.yml exec -T cli bash << 'EOF'
+echo "Altering number of partitions from 2 to 3..."
+kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 3
+kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+EOF
+
+docker compose -f docker/docker-compose.yml restart producer-0 producer-1
+while ! [[ $(http 0:8000/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-0..."; sleep 1; done
+while ! [[ $(http 0:8001/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-1..."; sleep 1; done
+
+echo -n Nachricht 3 über producer-0 | http -v :8000/klaus
+echo -n Nachricht 3 über producer-1 | http -v :8001/klaus
+echo -n Nachricht 4 über producer-0 | http -v :8000/peter
+echo -n Nachricht 4 über producer-1 | http -v :8001/peter
+
+echo Nachrichten in Partition 0:
+kafkacat -b :9092 -t test -o 0 -p0 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
+echo
+echo Nachrichten in Partition 1:
+kafkacat -b :9092 -t test -o 0 -p1 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
+echo
+echo Nachrichten in Partition 2:
+kafkacat -b :9092 -t test -o 0 -p2 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
index c417a7f..b8d5596 100644 (file)
@@ -190,11 +190,36 @@ services:
       - kafka-3
 
   producer:
-    image: juplo/spring-producer:1.0-SNAPSHOT
+    image: juplo/rest-producer:1.0-SNAPSHOT
+    ports:
+      - 8080:8080
     environment:
-      juplo.bootstrap-server: kafka:9092
-      juplo.client-id: producer
-      juplo.producer.topic: test
+      server.port: 8080
+      producer.bootstrap-server: kafka:9092
+      producer.client-id: producer
+      producer.topic: test
+
+  producer-0:
+    image: juplo/rest-producer:1.0-SNAPSHOT
+    ports:
+      - 8000:8080
+    environment:
+      server.port: 8080
+      producer.bootstrap-server: kafka:9092
+      producer.client-id: producer-0
+      producer.topic: test
+      producer.partition: 0
+
+  producer-1:
+    image: juplo/rest-producer:1.0-SNAPSHOT
+    ports:
+      - 8001:8080
+    environment:
+      server.port: 8080
+      producer.bootstrap-server: kafka:9092
+      producer.client-id: producer-1
+      producer.topic: test
+      producer.partition: 1
 
   consumer-1:
     image: juplo/simple-consumer:1.0-SNAPSHOT
@@ -204,6 +229,10 @@ services:
     image: juplo/simple-consumer:1.0-SNAPSHOT
     command: kafka:9092 test my-group consumer-2
 
+  consumer-3:
+    image: juplo/simple-consumer:1.0-SNAPSHOT
+    command: kafka:9092 test my-group consumer-3
+
 volumes:
   zookeeper-data:
   zookeeper-log:
diff --git a/pom.xml b/pom.xml
index 841299b..999c66b 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -12,9 +12,9 @@
   </parent>
 
   <groupId>de.juplo.kafka</groupId>
-  <artifactId>spring-producer</artifactId>
-  <name>Spring Producer</name>
-  <description>A Simple Spring-Boot-Producer, that takes messages via POST and confirms successs</description>
+  <artifactId>rest-producer</artifactId>
+  <name>REST Producer</name>
+  <description>A Simple Producer that takes messages via POST and confirms successs</description>
   <version>1.0-SNAPSHOT</version>
 
   <properties>
index 0069257..ba6aeee 100644 (file)
@@ -2,9 +2,12 @@ package de.juplo.kafka;
 
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.web.bind.annotation.RestController;
 
 
 @SpringBootApplication
+@ComponentScan(excludeFilters = @ComponentScan.Filter(RestController.class))
 public class Application
 {
   public static void main(String[] args)
index 7540dd3..c3422af 100644 (file)
@@ -4,7 +4,6 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
@@ -19,18 +18,14 @@ public class ApplicationConfiguration
   @Bean
   public ExampleProducer exampleProducer(
       ApplicationProperties properties,
-      Producer<String, String> kafkaProducer,
-      ConfigurableApplicationContext applicationContext)
+      Producer<String, String> kafkaProducer)
   {
     return
         new ExampleProducer(
             properties.getClientId(),
             properties.getProducerProperties().getTopic(),
-            properties.getProducerProperties().getThrottle() == null
-              ? Duration.ofMillis(500)
-              : properties.getProducerProperties().getThrottle(),
-            kafkaProducer,
-            () -> applicationContext.close());
+            properties.getProducer().getPartition(),
+            kafkaProducer);
   }
 
   @Bean(destroyMethod = "")
index 4323262..00feb23 100644 (file)
@@ -41,6 +41,7 @@ public class ApplicationProperties
     @NotNull
     @NotEmpty
     private String topic;
+    private Integer partition;
     @NotNull
     @NotEmpty
     private String acks;
@@ -57,6 +58,5 @@ public class ApplicationProperties
     @NotNull
     @NotEmpty
     private String compressionType;
-    private Duration throttle;
   }
 }
diff --git a/src/main/java/de/juplo/kafka/ErrorResponse.java b/src/main/java/de/juplo/kafka/ErrorResponse.java
new file mode 100644 (file)
index 0000000..5ca206d
--- /dev/null
@@ -0,0 +1,11 @@
+package de.juplo.kafka;
+
+import lombok.Value;
+
+
+@Value
+public class ErrorResponse
+{
+  private final String error;
+  private final Integer status;
+}
index bc5cf89..d34f189 100644 (file)
@@ -1,93 +1,53 @@
 package de.juplo.kafka;
 
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.http.HttpStatus;
+import org.springframework.web.bind.annotation.*;
+import org.springframework.web.context.request.async.DeferredResult;
+
+import java.math.BigInteger;
 
 import java.time.Duration;
 
 
 @Slf4j
-public class ExampleProducer implements Runnable
+@RestController
+@RequiredArgsConstructor
+public class ExampleProducer
 {
   private final String id;
   private final String topic;
-  private final Duration throttle;
+  private final Integer partition;
   private final Producer<String, String> producer;
-  private final Thread workerThread;
-  private final Runnable closeCallback;
 
-  private volatile boolean running = true;
   private long produced = 0;
 
-
-  public ExampleProducer(
-    String id,
-    String topic,
-    Duration throttle,
-    Producer<String, String> producer,
-    Runnable closeCallback)
+  @PostMapping(path = "{key}")
+  public DeferredResult<ProduceResult> send(
+      @PathVariable String key,
+      @RequestHeader(name = "X-id", required = false) Long correlationId,
+      @RequestBody String value)
   {
-    this.id = id;
-    this.topic = topic;
-    this.throttle = throttle;
-    this.producer = producer;
+    DeferredResult<ProduceResult> result = new DeferredResult<>();
 
-    workerThread = new Thread(this, "ExampleProducer Worker-Thread");
-    workerThread.start();
-
-    this.closeCallback = closeCallback;
-  }
-
-
-  @Override
-  public void run()
-  {
-    long i = 0;
-
-    try
-    {
-      for (; running; i++)
-      {
-        send(Long.toString(i%10), Long.toString(i));
-
-        if (throttle.isPositive())
-        {
-          try
-          {
-            Thread.sleep(throttle);
-          }
-          catch (InterruptedException e)
-          {
-            log.warn("{} - Interrupted while throttling!", e);
-          }
-        }
-      }
-    }
-    catch (Exception e)
-    {
-      log.error("{} - Unexpected error!", id, e);
-      log.info("{} - Triggering exit of application!", id);
-      new Thread(closeCallback).start();
-    }
-    finally
-    {
-      log.info("{}: Closing the KafkaProducer", id);
-      producer.close();
-      log.info("{}: Produced {} messages in total, exiting!", id, produced);
-    }
-  }
-
-  void send(String key, String value)
-  {
     final long time = System.currentTimeMillis();
 
     final ProducerRecord<String, String> record = new ProducerRecord<>(
         topic,  // Topic
+        partition, // Partition
         key,    // Key
         value   // Value
     );
 
+    record.headers().add("source", id.getBytes());
+    if (correlationId != null)
+    {
+      record.headers().add("id", BigInteger.valueOf(correlationId).toByteArray());
+    }
+
     producer.send(record, (metadata, e) ->
     {
       long now = System.currentTimeMillis();
@@ -95,6 +55,7 @@ public class ExampleProducer implements Runnable
       {
         // HANDLE SUCCESS
         produced++;
+        result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
         log.debug(
             "{} - Sent message {}={}, partition={}:{}, timestamp={}, latency={}ms",
             id,
@@ -109,6 +70,7 @@ public class ExampleProducer implements Runnable
       else
       {
         // HANDLE ERROR
+        result.setErrorResult(new ProduceFailure(e));
         log.error(
             "{} - ERROR for message {}={}, timestamp={}, latency={}ms: {}",
             id,
@@ -129,13 +91,14 @@ public class ExampleProducer implements Runnable
         record.value(),
         now - time
     );
-  }
 
+    return result;
+  }
 
-  public void shutdown() throws InterruptedException
+  @ExceptionHandler
+  @ResponseStatus(HttpStatus.BAD_REQUEST)
+  public ErrorResponse illegalStateException(IllegalStateException e)
   {
-    log.info("{} joining the worker-thread...", id);
-    running = false;
-    workerThread.join();
+    return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
   }
 }
diff --git a/src/main/java/de/juplo/kafka/ProduceFailure.java b/src/main/java/de/juplo/kafka/ProduceFailure.java
new file mode 100644 (file)
index 0000000..873a67b
--- /dev/null
@@ -0,0 +1,21 @@
+package de.juplo.kafka;
+
+
+import lombok.Value;
+
+
+@Value
+public class ProduceFailure implements ProduceResult
+{
+  private final String error;
+  private final String exception;
+  private final Integer status;
+
+
+  public ProduceFailure(Exception e)
+  {
+    status = 500;
+    exception = e.getClass().getSimpleName();
+    error = e.getMessage();
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/ProduceResult.java b/src/main/java/de/juplo/kafka/ProduceResult.java
new file mode 100644 (file)
index 0000000..ceff329
--- /dev/null
@@ -0,0 +1,11 @@
+package de.juplo.kafka;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+
+import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL;
+
+
+@JsonInclude(NON_NULL)
+public interface ProduceResult
+{
+}
diff --git a/src/main/java/de/juplo/kafka/ProduceSuccess.java b/src/main/java/de/juplo/kafka/ProduceSuccess.java
new file mode 100644 (file)
index 0000000..9c79e8b
--- /dev/null
@@ -0,0 +1,12 @@
+package de.juplo.kafka;
+
+
+import lombok.Value;
+
+
+@Value
+public class ProduceSuccess implements ProduceResult
+{
+  Integer partition;
+  Long offset;
+}
index fe8609e..8b16f65 100644 (file)
@@ -21,6 +21,7 @@ import static de.juplo.kafka.ApplicationTests.PARTITIONS;
 import static de.juplo.kafka.ApplicationTests.TOPIC;
 import static org.awaitility.Awaitility.await;
 import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
 
@@ -66,9 +67,12 @@ public class ApplicationTests
   @Test
   public void testSendMessage() throws Exception
   {
-    await("Some messages were send")
+    mockMvc
+      .perform(post("/peter").content("Hallo Welt!"))
+      .andExpect(status().isOk());
+    await("Message was send")
         .atMost(Duration.ofSeconds(5))
-        .until(() -> consumer.received.size() >= 1);
+        .until(() -> consumer.received.size() == 1);
   }