From a65c9fb6924bc2d193920820b8d4bbb466039ca9 Mon Sep 17 00:00:00 2001
From: Kai Moritz <kai@juplo.de>
Date: Wed, 25 Sep 2024 23:31:07 +0200
Subject: [PATCH] `rest-producer` converted back into a `spring-producer`
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

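The REST interface has been removed: instead of accepting messages via
POST, the ExampleProducer now implements Callable<Integer> and sends a
numbered message every 500 ms, keyed by the counter modulo 10. The
Application submits it to a ThreadPoolTaskExecutor on startup and stops
it cleanly in a @PreDestroy hook.

Along with the endpoint, the fixed-partition configuration, the
producer-0/producer-1 and consumer services in docker-compose.yml, and
the ProduceResult, ProduceSuccess, ProduceFailure and ErrorResponse
DTOs are gone; README.sh now simply starts the single producer and
inspects the topic with kafkacat.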
---
 README.sh                                     | 98 ++-----------------
 docker/docker-compose.yml                     | 39 +-------
 pom.xml                                       |  6 +-
 src/main/java/de/juplo/kafka/Application.java | 57 ++++++++++-
 .../juplo/kafka/ApplicationConfiguration.java |  1 -
 .../de/juplo/kafka/ApplicationProperties.java |  1 -
 .../java/de/juplo/kafka/ErrorResponse.java    | 11 ---
 .../java/de/juplo/kafka/ExampleProducer.java  | 58 +++++------
 .../java/de/juplo/kafka/ProduceFailure.java   | 21 ----
 .../java/de/juplo/kafka/ProduceResult.java    | 11 ---
 .../java/de/juplo/kafka/ProduceSuccess.java   | 12 ---
 .../java/de/juplo/kafka/ApplicationTests.java | 10 +-
 12 files changed, 97 insertions(+), 228 deletions(-)
 delete mode 100644 src/main/java/de/juplo/kafka/ErrorResponse.java
 delete mode 100644 src/main/java/de/juplo/kafka/ProduceFailure.java
 delete mode 100644 src/main/java/de/juplo/kafka/ProduceResult.java
 delete mode 100644 src/main/java/de/juplo/kafka/ProduceSuccess.java

diff --git a/README.sh b/README.sh
index d23ed676..9148486b 100755
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/rest-producer:1.0-SNAPSHOT
+IMAGE=juplo/spring-producer:1.0-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
@@ -10,7 +10,7 @@ then
 fi
 
 docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3
-docker compose -f docker/docker-compose.yml rm -svf producer producer-0 producer-1
+docker compose -f docker/docker-compose.yml rm -svf producer
 
 if [[
   $(docker image ls -q $IMAGE) == "" ||
@@ -29,94 +29,12 @@ docker compose -f docker/docker-compose.yml up -t0 -d cli
 sleep 1
 docker compose -f docker/docker-compose.yml logs setup
 
+docker compose -f docker/docker-compose.yml ps
 docker compose -f docker/docker-compose.yml up -d producer
-while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
+sleep 5
 
-# tag::hashed[]
-echo -n Nachricht 1 an klaus über producer | http -v :8080/klaus
-# end::hashed[]
-echo -n Nachricht 2 an klaus über producer | http -v :8080/klaus
-# tag::hashed[]
-echo -n Nachricht 1 an peter über producer | http -v :8080/peter
-# end::hashed[]
-echo -n Nachricht 3 an klaus über producer | http -v :8080/klaus
-echo -n Nachricht 2 an peter über producer | http -v :8080/peter
-echo -n Nachricht 3 an peter über producer | http -v :8080/peter
+docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -c 20 -f'topic=%t\tpartition=%p\toffset=%o\tkey=%k\tvalue=%s\n'
 
-echo Nachrichten in Partition 0:
-kafkacat -b :9092 -t test -o 0 -p0 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
-echo
-echo Nachrichten in Partition 1:
-kafkacat -b :9092 -t test -o 0 -p1 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
-echo
-
-docker compose -f docker/docker-compose.yml exec -T cli bash << 'EOF'
-echo "Altering number of partitions from 2 to 3..."
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-# tag::repartitioning[]
-kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 3
-# end::repartitioning[]
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-EOF
-
-docker compose -f docker/docker-compose.yml restart producer
-while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
-
-echo -n Nachricht 4 an klaus über producer | http -v :8080/klaus
-echo -n Nachricht 5 an peter über producer | http -v :8080/peter
-echo -n Nachricht 4 an peter über producer | http -v :8080/peter
-echo -n Nachricht 5 an klaus über producer | http -v :8080/klaus
-echo -n Nachricht 6 an klaus über producer | http -v :8080/klaus
-echo -n Nachricht 6 an peter über producer | http -v :8080/peter
-
-echo Nachrichten in Partition 0:
-# tag::kafkacat[]
-kafkacat -b :9092 -t test -o 0 -p0 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
-# end::kafkacat[]
-echo
-echo Nachrichten in Partition 1:
-# tag::kafkacat[]
-kafkacat -b :9092 -t test -o 0 -p1 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
-# end::kafkacat[]
-echo
-echo Nachrichten in Partition 2:
-kafkacat -b :9092 -t test -o 0 -p2 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
-
-
-docker compose -f docker/docker-compose.yml restart setup
-sleep 1
-docker compose -f docker/docker-compose.yml up -d producer-0 producer-1
-while ! [[ $(http 0:8000/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-0..."; sleep 1; done
-while ! [[ $(http 0:8001/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-1..."; sleep 1; done
-
-# tag::fixed[]
-echo -n Nachricht 1 über producer-0 | http -v :8000/klaus
-echo -n Nachricht 1 über producer-1 | http -v :8001/klaus
-echo -n Nachricht 2 über producer-0 | http -v :8000/peter
-echo -n Nachricht 2 über producer-1 | http -v :8001/peter
-# end::fixed[]
-
-docker compose -f docker/docker-compose.yml exec -T cli bash << 'EOF'
-echo "Altering number of partitions from 2 to 3..."
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 3
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-EOF
-
-docker compose -f docker/docker-compose.yml restart producer-0 producer-1
-while ! [[ $(http 0:8000/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-0..."; sleep 1; done
-while ! [[ $(http 0:8001/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer-1..."; sleep 1; done
-
-echo -n Nachricht 3 über producer-0 | http -v :8000/klaus
-echo -n Nachricht 3 über producer-1 | http -v :8001/klaus
-echo -n Nachricht 4 über producer-0 | http -v :8000/peter
-echo -n Nachricht 4 über producer-1 | http -v :8001/peter
-
-echo Nachrichten in Partition 0:
-kafkacat -b :9092 -t test -o 0 -p0 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
-echo
-echo Nachrichten in Partition 1:
-kafkacat -b :9092 -t test -o 0 -p1 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
-echo
-echo Nachrichten in Partition 2:
-kafkacat -b :9092 -t test -o 0 -p2 -f'key: %k\toffset: %o\tvalue: %s\n' -qe
+docker compose -f docker/docker-compose.yml stop producer
+docker compose -f docker/docker-compose.yml exec cli kafkacat -b kafka:9092 -t test -e -f'topic=%t\tpartition=%p\toffset=%o\tkey=%k\tvalue=%s\n'
+docker compose -f docker/docker-compose.yml logs producer
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index 2eaa6b6a..d03918da 100644
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -188,49 +188,12 @@ services:
       - kafka-3
 
   producer:
-    image: juplo/rest-producer:1.0-SNAPSHOT
-    ports:
-      - 8080:8080
+    image: juplo/spring-producer:1.0-SNAPSHOT
     environment:
-      server.port: 8080
       producer.bootstrap-server: kafka:9092
       producer.client-id: producer
       producer.topic: test
 
-  producer-0:
-    image: juplo/rest-producer:1.0-SNAPSHOT
-    ports:
-      - 8000:8080
-    environment:
-      server.port: 8080
-      producer.bootstrap-server: kafka:9092
-      producer.client-id: producer-0
-      producer.topic: test
-      producer.partition: 0
-
-  producer-1:
-    image: juplo/rest-producer:1.0-SNAPSHOT
-    ports:
-      - 8001:8080
-    environment:
-      server.port: 8080
-      producer.bootstrap-server: kafka:9092
-      producer.client-id: producer-1
-      producer.topic: test
-      producer.partition: 1
-
-  consumer-1:
-    image: juplo/simple-consumer:1.0-SNAPSHOT
-    command: kafka:9092 test my-group consumer-1
-
-  consumer-2:
-    image: juplo/simple-consumer:1.0-SNAPSHOT
-    command: kafka:9092 test my-group consumer-2
-
-  consumer-3:
-    image: juplo/simple-consumer:1.0-SNAPSHOT
-    command: kafka:9092 test my-group consumer-3
-
 volumes:
   zookeeper-data:
   zookeeper-log:
diff --git a/pom.xml b/pom.xml
index 999c66bc..841299b7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,9 +12,9 @@
   </parent>
 
   <groupId>de.juplo.kafka</groupId>
-  <artifactId>rest-producer</artifactId>
-  <name>REST Producer</name>
-  <description>A Simple Producer that takes messages via POST and confirms successs</description>
+  <artifactId>spring-producer</artifactId>
+  <name>Spring Producer</name>
+  <description>A simple Spring Boot producer that continuously sends messages to a Kafka topic</description>
   <version>1.0-SNAPSHOT</version>
 
   <properties>
diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java
index ba6aeeed..d2699457 100644
--- a/src/main/java/de/juplo/kafka/Application.java
+++ b/src/main/java/de/juplo/kafka/Application.java
@@ -1,15 +1,64 @@
 package de.juplo.kafka;
 
+import jakarta.annotation.PreDestroy;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.Producer;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.context.annotation.ComponentScan;
-import org.springframework.web.bind.annotation.RestController;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.util.concurrent.ListenableFuture;
+
+import java.util.concurrent.ExecutionException;
 
 
 @SpringBootApplication
-@ComponentScan(excludeFilters = @ComponentScan.Filter(RestController.class))
-public class Application
+@Slf4j
+public class Application implements ApplicationRunner
 {
+  @Autowired
+  ThreadPoolTaskExecutor taskExecutor;
+  @Autowired
+  Producer<?, ?> kafkaProducer;
+  @Autowired
+  ExampleProducer exampleProducer;
+  @Autowired
+  ConfigurableApplicationContext context;
+
+  ListenableFuture<Integer> producerJob;
+
+  @Override
+  public void run(ApplicationArguments args) throws Exception
+  {
+    log.info("Starting SimpleConsumer");
+    consumerJob = taskExecutor.submitListenable(exampleProducer);
+    consumerJob.addCallback(
+      exitStatus ->
+      {
+        log.info("SimpleConsumer exited normally, exit-status: {}", exitStatus);
+        SpringApplication.exit(context, () -> exitStatus);
+        },
+      t ->
+      {
+        log.error("SimpleConsumer exited abnormally!", t);
+        SpringApplication.exit(context, () -> 2);
+      });
+  }
+
+  @PreDestroy
+  public void shutdown() throws ExecutionException, InterruptedException
+  {
+    log.info("Signaling ExampleProducer to quit its work");
+    exampleProducer.shutdown();
+    log.info("Waiting for ExampleProducer to finish its work");
+    producerJob.get();
+    log.info("ExampleProducer finished its work");
+  }
+
+
   public static void main(String[] args)
   {
     SpringApplication.run(Application.class, args);
diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 9ffe585b..5211b126 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -22,7 +22,6 @@ public class ApplicationConfiguration
         new ExampleProducer(
             properties.getClientId(),
             properties.getTopic(),
-            properties.getPartition(),
             kafkaProducer);
   }
 
diff --git a/src/main/java/de/juplo/kafka/ApplicationProperties.java b/src/main/java/de/juplo/kafka/ApplicationProperties.java
index 8efacd4d..4bf66a83 100644
--- a/src/main/java/de/juplo/kafka/ApplicationProperties.java
+++ b/src/main/java/de/juplo/kafka/ApplicationProperties.java
@@ -22,7 +22,6 @@ public class ApplicationProperties
   @NotNull
   @NotEmpty
   private String topic;
-  private Integer partition;
   @NotNull
   @NotEmpty
   private String acks;
diff --git a/src/main/java/de/juplo/kafka/ErrorResponse.java b/src/main/java/de/juplo/kafka/ErrorResponse.java
deleted file mode 100644
index 5ca206d6..00000000
--- a/src/main/java/de/juplo/kafka/ErrorResponse.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.Value;
-
-
-@Value
-public class ErrorResponse
-{
-  private final String error;
-  private final Integer status;
-}
diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java
index 94608f93..ccfa4ce9 100644
--- a/src/main/java/de/juplo/kafka/ExampleProducer.java
+++ b/src/main/java/de/juplo/kafka/ExampleProducer.java
@@ -4,48 +4,55 @@ import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.springframework.http.HttpStatus;
-import org.springframework.web.bind.annotation.*;
-import org.springframework.web.context.request.async.DeferredResult;
 
-import java.math.BigInteger;
+import java.util.concurrent.Callable;
 
 
 @Slf4j
-@RestController
 @RequiredArgsConstructor
-public class ExampleProducer
+public class ExampleProducer implements Callable<Integer>
 {
   private final String id;
   private final String topic;
-  private final Integer partition;
   private final Producer<String, String> producer;
 
+  private volatile boolean running = true;
   private long produced = 0;
 
-  @PostMapping(path = "{key}")
-  public DeferredResult<ProduceResult> send(
-      @PathVariable String key,
-      @RequestHeader(name = "X-id", required = false) Long correlationId,
-      @RequestBody String value)
+
+  @Override
+  public Integer call()
   {
-    DeferredResult<ProduceResult> result = new DeferredResult<>();
+    long i = 0;
+
+    try
+    {
+      for (; running; i++)
+      {
+        send(Long.toString(i%10), Long.toString(i));
+        Thread.sleep(500);
+      }
+    }
+    catch (Exception e)
+    {
+      log.error("{} - Unexpected error: {}! Produced {} messages", id, e.toString(), produced);
+      return 1;
+    }
 
+    log.info("{}: Produced {} messages in total, exiting!", id, produced);
+    return 0;
+  }
+
+  void send(String key, String value)
+  {
     final long time = System.currentTimeMillis();
 
     final ProducerRecord<String, String> record = new ProducerRecord<>(
         topic,  // Topic
-        partition, // Partition
         key,    // Key
         value   // Value
     );
 
-    record.headers().add("source", id.getBytes());
-    if (correlationId != null)
-    {
-      record.headers().add("id", BigInteger.valueOf(correlationId).toByteArray());
-    }
-
     producer.send(record, (metadata, e) ->
     {
       long now = System.currentTimeMillis();
@@ -53,7 +60,6 @@ public class ExampleProducer
       {
         // HANDLE SUCCESS
         produced++;
-        result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
         log.debug(
             "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
             id,
@@ -68,7 +74,6 @@ public class ExampleProducer
       else
       {
         // HANDLE ERROR
-        result.setErrorResult(new ProduceFailure(e));
         log.error(
             "{} - ERROR key={} timestamp={} latency={}ms: {}",
             id,
@@ -87,14 +92,11 @@ public class ExampleProducer
         record.key(),
         now - time
     );
-
-    return result;
   }
 
-  @ExceptionHandler
-  @ResponseStatus(HttpStatus.BAD_REQUEST)
-  public ErrorResponse illegalStateException(IllegalStateException e)
+
+  public void shutdown()
   {
-    return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
+    running = false;
   }
 }
diff --git a/src/main/java/de/juplo/kafka/ProduceFailure.java b/src/main/java/de/juplo/kafka/ProduceFailure.java
deleted file mode 100644
index 873a67b4..00000000
--- a/src/main/java/de/juplo/kafka/ProduceFailure.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package de.juplo.kafka;
-
-
-import lombok.Value;
-
-
-@Value
-public class ProduceFailure implements ProduceResult
-{
-  private final String error;
-  private final String exception;
-  private final Integer status;
-
-
-  public ProduceFailure(Exception e)
-  {
-    status = 500;
-    exception = e.getClass().getSimpleName();
-    error = e.getMessage();
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/ProduceResult.java b/src/main/java/de/juplo/kafka/ProduceResult.java
deleted file mode 100644
index ceff3299..00000000
--- a/src/main/java/de/juplo/kafka/ProduceResult.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package de.juplo.kafka;
-
-import com.fasterxml.jackson.annotation.JsonInclude;
-
-import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL;
-
-
-@JsonInclude(NON_NULL)
-public interface ProduceResult
-{
-}
diff --git a/src/main/java/de/juplo/kafka/ProduceSuccess.java b/src/main/java/de/juplo/kafka/ProduceSuccess.java
deleted file mode 100644
index 9c79e8b4..00000000
--- a/src/main/java/de/juplo/kafka/ProduceSuccess.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package de.juplo.kafka;
-
-
-import lombok.Value;
-
-
-@Value
-public class ProduceSuccess implements ProduceResult
-{
-  Integer partition;
-  Long offset;
-}
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index 50bc20b1..8d579e95 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -27,7 +27,6 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
 		properties = {
 				"spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}",
 				"producer.bootstrap-server=${spring.embedded.kafka.brokers}",
-				"spring.kafka.consumer.auto-offset-reset=earliest",
 				"producer.topic=" + TOPIC})
 @AutoConfigureMockMvc
 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
@@ -37,8 +36,6 @@ public class ApplicationTests
 	static final String TOPIC = "FOO";
 	static final int PARTITIONS = 10;
 
-	@Autowired
-	MockMvc mockMvc;
 	@Autowired
 	Consumer consumer;
 
@@ -53,12 +50,9 @@ public class ApplicationTests
 	@Test
 	void testSendMessage() throws Exception
 	{
-		mockMvc
-				.perform(post("/peter").content("Hallo Welt!"))
-				.andExpect(status().isOk());
-		await("Message was send")
+		await("Some messages were sent")
 				.atMost(Duration.ofSeconds(5))
-				.until(() -> consumer.received.size() == 1);
+				.until(() -> consumer.received.size() >= 1);
 	}
 
 
-- 
2.20.1