Merge branch 'rest-producer' into rest-producer-vorlage rest-producer--vorlage---lvm-2-tage
author Kai Moritz <kai@juplo.de>
Sat, 24 Sep 2022 17:56:15 +0000 (19:56 +0200)
committer Kai Moritz <kai@juplo.de>
Sat, 24 Sep 2022 17:56:15 +0000 (19:56 +0200)
README.sh
docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/ApplicationConfiguration.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/RestProducer.java
src/test/java/de/juplo/kafka/ApplicationTests.java

index d2dccf8..3c098a7 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -10,13 +10,14 @@ then
 fi
 
 docker-compose up -d zookeeper kafka-1 kafka-2 kafka-3 cli
+docker-compose rm -svf producer
 
 if [[
   $(docker image ls -q $IMAGE) == "" ||
   "$1" = "build"
 ]]
 then
-  mvn install || exit
+  mvn clean install || exit
 else
   echo "Using image existing images:"
   docker image ls $IMAGE
@@ -25,7 +26,7 @@ fi
 echo "Waiting for the Kafka-Cluster to become ready..."
 docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
 docker-compose up setup
-docker-compose up -d
+docker-compose up -d producer consumer-1
 
 while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
 
@@ -59,4 +60,4 @@ echo foofoo | http -v :8080/bar
 echo barbar | http -v :8080/foo
 
 docker-compose logs producer
-docker-compose logs consumer
+docker-compose logs consumer-1
index 7ae8d9b..78ca5dd 100644 (file)
@@ -83,6 +83,46 @@ services:
       producer.client-id: producer
       producer.topic: test
 
-  consumer:
+  peter:
     image: juplo/toolbox
-    command: kafkacat -C -b kafka:9092 -t test -o 0 -f'p=%p|o=%o|k=%k|v=%s\n'
+    command: >
+      bash -c "
+      while [[ true ]];
+      do
+        echo foo | http -v producer:8080/peter;
+        sleep 1;
+      done
+      "
+
+  klaus:
+    image: juplo/toolbox
+    command: >
+      bash -c "
+      while [[ true ]];
+      do
+        echo bar | http -v producer:8080/klaus;
+        sleep 1;
+      done
+      "
+
+  consumer-1:
+    image: juplo/simple-consumer:1.0-SNAPSHOT
+    command: consumer-1
+
+  consumer-2:
+    image: juplo/simple-consumer:1.0-SNAPSHOT
+    command: consumer-2
+
+  consumer-3:
+    image: juplo/simple-consumer:1.0-SNAPSHOT
+    command: consumer-3
+
+  spikzettel:
+    image: juplo/toolbox
+    command: >
+      bash -c '
+        kafka-console-consumer \
+          --bootstrap-server kafka:9092 \
+          --topic __consumer_offsets --from-beginning \
+          --formatter "kafka.coordinator.group.GroupMetadataManager\$$OffsetsMessageFormatter"
+      '
diff --git a/pom.xml b/pom.xml
index e4d24bb..e7ea677 100644 (file)
--- a/pom.xml
+++ b/pom.xml
       <artifactId>spring-boot-configuration-processor</artifactId>
       <optional>true</optional>
     </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-validation</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
index 9f3e3ed..0069257 100644 (file)
@@ -1,17 +1,10 @@
 package de.juplo.kafka;
 
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.util.Assert;
-
-import java.util.concurrent.Executors;
 
 
 @SpringBootApplication
-@EnableConfigurationProperties(ApplicationProperties.class)
 public class Application
 {
   public static void main(String[] args)
diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
new file mode 100644 (file)
index 0000000..0642aa4
--- /dev/null
@@ -0,0 +1,46 @@
+package de.juplo.kafka;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.Properties;
+
+
+@Configuration
+@EnableConfigurationProperties(ApplicationProperties.class)
+public class ApplicationConfiguration
+{
+  @Bean
+  public RestProducer restProducer(
+      ApplicationProperties properties,
+      KafkaProducer<String, String> kafkaProducer)
+  {
+    return
+        new RestProducer(
+            properties.getClientId(),
+            properties.getTopic(),
+            properties.getPartition(),
+            kafkaProducer);
+  }
+
+  @Bean(destroyMethod = "close")
+  public KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
+  {
+    Properties props = new Properties();
+    props.put("bootstrap.servers", properties.getBootstrapServer());
+    props.put("client.id", properties.getClientId());
+    props.put("acks", properties.getAcks());
+    props.put("batch.size", properties.getBatchSize());
+    props.put("delivery.timeout.ms", 20000); // 20 Sekunden
+    props.put("request.timeout.ms",  10000); // 10 Sekunden
+    props.put("linger.ms", properties.getLingerMs());
+    props.put("compression.type", properties.getCompressionType());
+    props.put("key.serializer", StringSerializer.class.getName());
+    props.put("value.serializer", StringSerializer.class.getName());
+
+    return new KafkaProducer<>(props);
+  }
+}
index c74f588..673613a 100644 (file)
@@ -4,17 +4,33 @@ import lombok.Getter;
 import lombok.Setter;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 
+import javax.validation.constraints.NotEmpty;
+import javax.validation.constraints.NotNull;
+
+
 @ConfigurationProperties(prefix = "producer")
 @Getter
 @Setter
 public class ApplicationProperties
 {
+  @NotNull
+  @NotEmpty
   private String bootstrapServer;
+  @NotNull
+  @NotEmpty
   private String clientId;
+  @NotNull
+  @NotEmpty
   private String topic;
   private Integer partition;
+  @NotNull
+  @NotEmpty
   private String acks;
+  @NotNull
   private Integer batchSize;
+  @NotNull
   private Integer lingerMs;
+  @NotNull
+  @NotEmpty
   private String compressionType;
 }
index de08ce2..009a1f2 100644 (file)
@@ -1,20 +1,21 @@
 package de.juplo.kafka;
 
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.http.HttpStatus;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.context.request.async.DeferredResult;
 
 import javax.annotation.PreDestroy;
-import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 
 
 @Slf4j
-@RestController
+@RequestMapping
+@ResponseBody
+@RequiredArgsConstructor
 public class RestProducer
 {
   private final String id;
@@ -24,27 +25,6 @@ public class RestProducer
 
   private long produced = 0;
 
-  public RestProducer(ApplicationProperties properties)
-  {
-    this.id = properties.getClientId();
-    this.topic = properties.getTopic();
-    this.partition = properties.getPartition();
-
-    Properties props = new Properties();
-    props.put("bootstrap.servers", properties.getBootstrapServer());
-    props.put("client.id", properties.getClientId());
-    props.put("acks", properties.getAcks());
-    props.put("batch.size", properties.getBatchSize());
-    props.put("delivery.timeout.ms", 20000); // 20 Sekunden
-    props.put("request.timeout.ms",  10000); // 10 Sekunden
-    props.put("linger.ms", properties.getLingerMs());
-    props.put("compression.type", properties.getCompressionType());
-    props.put("key.serializer", StringSerializer.class.getName());
-    props.put("value.serializer", StringSerializer.class.getName());
-
-    this.producer = new KafkaProducer<>(props);
-  }
-
   @PostMapping(path = "{key}")
   public DeferredResult<ProduceResult> send(
       @PathVariable String key,
@@ -75,13 +55,4 @@ public class RestProducer
   {
     return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
   }
-
-  @PreDestroy
-  public void destroy() throws ExecutionException, InterruptedException
-  {
-    log.info("{} - Destroy!", id);
-    log.info("{} - Closing the KafkaProducer", id);
-    producer.close();
-    log.info("{}: Produced {} messages in total, exiting!", id, produced);
-  }
 }
index cf70c81..646a335 100644 (file)
@@ -20,7 +20,6 @@ import static de.juplo.kafka.ApplicationTests.PARTITIONS;
 import static de.juplo.kafka.ApplicationTests.TOPIC;
 import static org.awaitility.Awaitility.*;
 import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
-import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put;
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;