Merge of the revisions for the LVM training from the branch 'customized' customized-vorlage customized--vorlage---lvm-2-tage
author Kai Moritz <kai@juplo.de>
Wed, 3 Aug 2022 20:06:07 +0000 (22:06 +0200)
committer Kai Moritz <kai@juplo.de>
Wed, 3 Aug 2022 20:06:07 +0000 (22:06 +0200)
README.sh
docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/RestProducer.java
src/main/resources/application.yml
src/test/java/de/juplo/kafka/ApplicationTests.java [new file with mode: 0644]

diff --git a/README.sh b/README.sh
index ece13d0..d0a59bd 100755
--- a/README.sh
+++ b/README.sh
@@ -9,7 +9,7 @@ then
   exit
 fi
 
-docker-compose up -d zookeeper kafka cli
+docker-compose up -d zookeeper kafka-1 kafka-2 kafka-3 cli
 
 if [[
   $(docker image ls -q $IMAGE) == "" ||
@@ -24,83 +24,43 @@ fi
 
 echo "Waiting for the Kafka-Cluster to become ready..."
 docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
-docker-compose up -d kafka-ui
-
-docker-compose exec -T cli bash << 'EOF'
-echo "Creating topic with 3 partitions..."
-kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
-# tag::createtopic[]
-kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 3
-# end::createtopic[]
-kafka-topics --bootstrap-server kafka:9092 --describe --topic test
-EOF
-
-docker-compose up -d producer-0 producer-1 consumer
-
-while ! [[ $(http -b :8000/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8000/actuator/health; sleep 1; done
-while ! [[ $(http -b :8001/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8001/actuator/health; sleep 1; done
-while ! [[ $(http -b :8081/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8081/actuator/health; sleep 1; done
+docker-compose up setup
+docker-compose up -d producer-0 producer-1
+while ! [[ $(http -b :8000/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for producer-0; sleep 1; done
+while ! [[ $(http -b :8001/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for producer-1; sleep 1; done
+docker-compose up -d consumer
 
 echo foo | http -v :8000/foo
 echo foo | http -v :8001/foo
+echo foo | http -v :8001/foo
+echo foo | http -v :8000/bar
+echo foobar | http -v :8000/bar
+echo foofoo | http -v :8000/bar
+echo barbar | http -v :8000/bar
+echo barfoo | http -v :8000/bar
+echo bar | http -v :8000/bar
 
-sleep 5
-
-http -v :8081/seen
+docker-compose logs consumer
 
 docker-compose up -d
-
-sleep 5
-
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
+docker-compose exec cli kafkacat -C -b kafka:9092 -t test -o 0 -f'p=%p|o=%o|k=%k|v=%s\n' -q -c20
 
 docker-compose exec -T cli bash << 'EOF'
 echo "Altering number of partitions from 3 to 7..."
+kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+kafka-topics --bootstrap-server kafka:9092 --describe --topic test
 kafka-topics --bootstrap-server kafka:9092 --alter --topic test --partitions 7
 kafka-topics --bootstrap-server kafka:9092 --describe --topic test
 EOF
 
 docker-compose restart producer-0 producer-1
-
 while ! [[ $(http -b :8000/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8000/actuator/health; sleep 1; done
 while ! [[ $(http -b :8001/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8001/actuator/health; sleep 1; done
-
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-sleep 1
-http -v :8081/seen
-
-docker-compose stop
+docker-compose exec cli kafkacat -C -b kafka:9092 -t test -o 0 -f'p=%p|o=%o|k=%k|v=%s\n' -q -c20
+
+echo "Messages from peter"
+docker-compose logs consumer | grep k=peter
+echo "Messages from beate"
+docker-compose logs consumer | grep k=beate
+echo "Messages from foo"
+docker-compose logs consumer | grep k=foo
diff --git a/docker-compose.yml b/docker-compose.yml
index 6993f6c..11c5c8d 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,36 +1,73 @@
 version: '3.2'
 services:
   zookeeper:
-    image: confluentinc/cp-zookeeper:7.0.2
+    image: confluentinc/cp-zookeeper:7.1.3
     environment:
       ZOOKEEPER_CLIENT_PORT: 2181
     ports:
       - 2181:2181
 
-  kafka:
-    image: confluentinc/cp-kafka:7.0.2
+  kafka-1:
+    image: confluentinc/cp-kafka:7.1.3
     environment:
       KAFKA_BROKER_ID: 1
       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9081
+      KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-1:9092, LOCALHOST://localhost:9081
+      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+    ports:
+      - 9081:9081
+    depends_on:
+      - zookeeper
+
+  kafka-2:
+    image: confluentinc/cp-kafka:7.1.3
+    environment:
+      KAFKA_BROKER_ID: 2
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
       KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9082
-      KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka:9092, LOCALHOST://localhost:9082
+      KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-2:9092, LOCALHOST://localhost:9082
       KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
-      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
       KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
     ports:
       - 9092:9082
       - 9082:9082
+    networks:
+      default:
+        aliases:
+          - kafka
     depends_on:
       - zookeeper
 
-  kafka-ui:
-    image: provectuslabs/kafka-ui:0.3.3
-    ports:
-      - 8080:8080
+  kafka-3:
+    image: confluentinc/cp-kafka:7.1.3
     environment:
-      KAFKA_CLUSTERS_0_NAME: local
-      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
+      KAFKA_BROKER_ID: 3
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9083
+      KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-3:9092, LOCALHOST://localhost:9083
+      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+    ports:
+      - 9083:9083
+    depends_on:
+      - zookeeper
+
+  setup:
+    image: juplo/toolbox
+    command: >
+      bash -c "
+        kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
+        kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2 --replication-factor 3 --config min.insync.replicas=2
+        kafka-topics --bootstrap-server kafka:9092 --describe --topic test
+      "
 
   cli:
     image: juplo/toolbox
@@ -41,6 +78,7 @@ services:
     ports:
       - 8000:8080
     environment:
+      server.port: 8080
       producer.bootstrap-server: kafka:9092
       producer.client-id: producer
       producer.topic: test
@@ -51,6 +89,7 @@ services:
     ports:
       - 8001:8080
     environment:
+      server.port: 8080
       producer.bootstrap-server: kafka:9092
       producer.client-id: producer
       producer.topic: test
@@ -59,6 +98,7 @@ services:
   peter:
     image: juplo/rest-client:1.0-SNAPSHOT
     environment:
+      server.port: 8080
       rest-client.baseUrl: http://producer-1:8080
       rest-client.username: peter
       rest-client.throttle-ms: 1000
@@ -66,6 +106,7 @@ services:
   klaus:
     image: juplo/rest-client:1.0-SNAPSHOT
     environment:
+      server.port: 8080
       rest-client.baseUrl: http://producer-1:8080
       rest-client.username: klaus
       rest-client.throttle-ms: 1100
@@ -73,6 +114,7 @@ services:
   beate:
     image: juplo/rest-client:1.0-SNAPSHOT
     environment:
+      server.port: 8080
       rest-client.baseUrl: http://producer-0:8080
       rest-client.username: beate
       rest-client.throttle-ms: 900
@@ -80,6 +122,7 @@ services:
   franz:
     image: juplo/rest-client:1.0-SNAPSHOT
     environment:
+      server.port: 8080
       rest-client.baseUrl: http://producer-1:8080
       rest-client.username: franz
       rest-client.throttle-ms: 800
@@ -87,16 +130,11 @@ services:
   uschi:
     image: juplo/rest-client:1.0-SNAPSHOT
     environment:
+      server.port: 8080
       rest-client.baseUrl: http://producer-0:8080
       rest-client.username: uschi
       rest-client.throttle-ms: 1200
 
   consumer:
-    image: juplo/counting-consumer:1.0-SNAPSHOT
-    ports:
-      - 8081:8081
-    environment:
-      consumer.bootstrap-server: kafka:9092
-      consumer.client-id: my-group
-      consumer.client-id: consumer
-      consumer.topic: test
+    image: juplo/toolbox
+    command: kafkacat -C -b kafka:9092 -t test -o 0 -f'p=%p|o=%o|k=%k|v=%s\n'
diff --git a/pom.xml b/pom.xml
index 129ea94..e4d24bb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,13 +7,14 @@
   <parent>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-parent</artifactId>
-    <version>2.6.5</version>
+    <version>2.7.2</version>
     <relativePath/> <!-- lookup parent from repository -->
   </parent>
 
   <groupId>de.juplo.kafka</groupId>
   <artifactId>rest-producer</artifactId>
-  <name>REST Producer: a Simple Producer that takes messages via POST and confirms successs</name>
+  <name>REST Producer</name>
+  <description>A Simple Producer that takes messages via POST and confirms success</description>
   <version>1.0-SNAPSHOT</version>
 
   <dependencies>
       <artifactId>spring-boot-starter-test</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.springframework.kafka</groupId>
+      <artifactId>spring-kafka</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.springframework.kafka</groupId>
+      <artifactId>spring-kafka-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
       <plugin>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-maven-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>build-info</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>pl.project13.maven</groupId>
+        <artifactId>git-commit-id-plugin</artifactId>
       </plugin>
       <plugin>
         <groupId>io.fabric8</groupId>
diff --git a/src/main/java/de/juplo/kafka/RestProducer.java b/src/main/java/de/juplo/kafka/RestProducer.java
index b89802b..2706313 100644
--- a/src/main/java/de/juplo/kafka/RestProducer.java
+++ b/src/main/java/de/juplo/kafka/RestProducer.java
@@ -4,6 +4,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.http.HttpStatus;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.context.request.async.DeferredResult;
 
@@ -47,6 +48,7 @@ public class RestProducer
   @PostMapping(path = "{key}")
   public DeferredResult<ProduceResult> send(
       @PathVariable String key,
+      @RequestHeader(name = "X-id", required = false) Long correlationId,
       @RequestBody String value)
   {
     DeferredResult<ProduceResult> result = new DeferredResult<>();
@@ -93,9 +95,8 @@ public class RestProducer
 
     long now = System.currentTimeMillis();
     log.trace(
-        "{} - Queued #{} key={} latency={}ms",
+        "{} - Queued message with key={} latency={}ms",
         id,
-        value,
         record.key(),
         now - time
     );
@@ -103,6 +104,13 @@ public class RestProducer
     return result;
   }
 
+  @ExceptionHandler
+  @ResponseStatus(HttpStatus.BAD_REQUEST)
+  public ErrorResponse illegalStateException(IllegalStateException e)
+  {
+    return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
+  }
+
   @PreDestroy
   public void destroy() throws ExecutionException, InterruptedException
   {
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index fcc0f3c..0d5752c 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -1,17 +1,36 @@
 producer:
   bootstrap-server: :9092
-  client-id: peter
+  client-id: DEV
   topic: test
   acks: -1
   batch-size: 16384
   linger-ms: 0
   compression-type: gzip
 management:
+  endpoint:
+    shutdown:
+      enabled: true
   endpoints:
     web:
       exposure:
         include: "*"
+  info:
+    env:
+      enabled: true
+    java:
+      enabled: true
+info:
+  kafka:
+    bootstrap-server: ${producer.bootstrap-server}
+    client-id: ${producer.client-id}
+    topic: ${producer.topic}
+    acks: ${producer.acks}
+    batch-size: ${producer.batch-size}
+    linger-ms: ${producer.linger-ms}
+    compression-type: ${producer.compression-type}
 logging:
   level:
     root: INFO
-    de.juplo: DEBUG
+    de.juplo: TRACE
+server:
+  port: 8880
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
new file mode 100644
index 0000000..cf70c81
--- /dev/null
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -0,0 +1,86 @@
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.junit.jupiter.api.*;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.test.web.servlet.MockMvc;
+
+import java.time.Duration;
+import java.util.LinkedList;
+import java.util.List;
+
+import static de.juplo.kafka.ApplicationTests.PARTITIONS;
+import static de.juplo.kafka.ApplicationTests.TOPIC;
+import static org.awaitility.Awaitility.*;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
+import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.put;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
+
+
+@SpringBootTest(
+               properties = {
+                               "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}",
+                               "producer.bootstrap-server=${spring.embedded.kafka.brokers}",
+                               "producer.topic=" + TOPIC})
+@AutoConfigureMockMvc
+@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
+@Slf4j
+public class ApplicationTests
+{
+       static final String TOPIC = "FOO";
+       static final int PARTITIONS = 10;
+
+       @Autowired
+       MockMvc mockMvc;
+       @Autowired
+       Consumer consumer;
+
+
+       @BeforeEach
+       public void clear()
+       {
+               consumer.received.clear();
+       }
+
+
+       @Test
+       void testSendMessage() throws Exception
+       {
+               mockMvc
+                               .perform(post("/peter").content("Hallo Welt!"))
+                               .andExpect(status().isOk());
+		await("Message was sent")
+                               .atMost(Duration.ofSeconds(5))
+                               .until(() -> consumer.received.size() == 1);
+       }
+
+
+       static class Consumer
+       {
+               final List<ConsumerRecord<String, String>> received = new LinkedList<>();
+
+               @KafkaListener(groupId = "TEST", topics = TOPIC)
+               public void receive(ConsumerRecord<String, String> record)
+               {
+                       log.debug("Received message: {}", record);
+                       received.add(record);
+               }
+       }
+
+       @TestConfiguration
+       static class Configuration
+       {
+               @Bean
+               Consumer consumer()
+               {
+                       return new Consumer();
+               }
+       }
+}