Merge der überarbeiteten Compose-Konfiguration (Branch 'headers') headers-vorlage headers--vorlage---lvm-2-tage
authorKai Moritz <kai@juplo.de>
Sun, 7 Aug 2022 14:01:43 +0000 (16:01 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 7 Aug 2022 14:01:43 +0000 (16:01 +0200)
README.sh
docker-compose.yml
src/main/java/de/juplo/kafka/RestProducer.java
src/main/resources/application.yml

index 4f7f017..94a853a 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -9,7 +9,7 @@ then
   exit
 fi
 
-docker-compose up -d zookeeper kafka cli
+docker-compose up -d zookeeper kafka-1 kafka-2 kafka-3 cli
 
 if [[
   $(docker image ls -q $IMAGE) == "" ||
@@ -25,8 +25,9 @@ fi
 echo "Waiting for the Kafka-Cluster to become ready..."
 docker-compose exec cli cub kafka-ready -b kafka:9092 1 60 > /dev/null 2>&1 || exit 1
 docker-compose up setup
-docker-compose up -d
-while ! [[ $(http -b :8080/actuator/health | jq -r .status) =~ "UP" ]]; do echo Waiting for :8080/actuator/health; sleep 1; done
+docker-compose up -d producer
+
+while ! [[ $(http 0:8080/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for producer..."; sleep 1; done
 
 # tag::http[]
 echo -n bar | http -v :8080/foo
index c4b6c78..9265c13 100644 (file)
@@ -7,20 +7,56 @@ services:
     ports:
       - 2181:2181
 
-  kafka:
+  kafka-1:
     image: confluentinc/cp-kafka:7.1.3
     environment:
       KAFKA_BROKER_ID: 1
       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9081
+      KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-1:9092, LOCALHOST://localhost:9081
+      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+    ports:
+      - 9081:9081
+    depends_on:
+      - zookeeper
+
+  kafka-2:
+    image: confluentinc/cp-kafka:7.1.3
+    environment:
+      KAFKA_BROKER_ID: 2
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
       KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9082
-      KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka:9092, LOCALHOST://localhost:9082
+      KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-2:9092, LOCALHOST://localhost:9082
       KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
-      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
       KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
     ports:
       - 9092:9082
       - 9082:9082
+    networks:
+      default:
+        aliases:
+          - kafka
+    depends_on:
+      - zookeeper
+
+  kafka-3:
+    image: confluentinc/cp-kafka:7.1.3
+    environment:
+      KAFKA_BROKER_ID: 3
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENERS: DOCKER://:9092, LOCALHOST://:9083
+      KAFKA_ADVERTISED_LISTENERS: DOCKER://kafka-3:9092, LOCALHOST://localhost:9083
+      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT, LOCALHOST:PLAINTEXT
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+    ports:
+      - 9083:9083
     depends_on:
       - zookeeper
 
@@ -29,7 +65,8 @@ services:
     command: >
       bash -c "
         kafka-topics --bootstrap-server kafka:9092 --delete --if-exists --topic test
-        kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2
+        kafka-topics --bootstrap-server kafka:9092 --create --topic test --partitions 2 --replication-factor 3 --config min.insync.replicas=2
+        kafka-topics --bootstrap-server kafka:9092 --describe --topic test
       "
 
   cli:
@@ -45,3 +82,47 @@ services:
       producer.bootstrap-server: kafka:9092
       producer.client-id: producer
       producer.topic: test
+
+  peter:
+    image: juplo/rest-client:1.0-SNAPSHOT
+    environment:
+      server.port: 8080
+      rest-client.baseUrl: http://producer:8080
+      rest-client.username: peter
+      rest-client.throttle-ms: 1000
+
+  klaus:
+    image: juplo/rest-client:1.0-SNAPSHOT
+    environment:
+      server.port: 8080
+      rest-client.baseUrl: http://producer:8080
+      rest-client.username: klaus
+      rest-client.throttle-ms: 1100
+
+  beate:
+    image: juplo/rest-client:1.0-SNAPSHOT
+    environment:
+      server.port: 8080
+      rest-client.baseUrl: http://producer:8080
+      rest-client.username: beate
+      rest-client.throttle-ms: 900
+
+  franz:
+    image: juplo/rest-client:1.0-SNAPSHOT
+    environment:
+      server.port: 8080
+      rest-client.baseUrl: http://producer:8080
+      rest-client.username: franz
+      rest-client.throttle-ms: 800
+
+  uschi:
+    image: juplo/rest-client:1.0-SNAPSHOT
+    environment:
+      server.port: 8080
+      rest-client.baseUrl: http://producer:8080
+      rest-client.username: uschi
+      rest-client.throttle-ms: 1200
+
+  consumer:
+    image: juplo/toolbox
+    command: kafkacat -C -b kafka:9092 -t test -o 0 -f'p=%p|o=%o|k=%k|v=%s\n'
index 4c04fd8..2f2a1cb 100644 (file)
@@ -54,8 +54,58 @@ public class RestProducer
   {
     DeferredResult<ProduceResult> result = new DeferredResult<>();
 
-    // TODO: Ergänzen Sie die Logik Ihres REST-Producers und
-    //       ergänzen sie die versendten Nachrichten um die Header
+    final long time = System.currentTimeMillis();
+
+    final ProducerRecord<String, String> record = new ProducerRecord<>(
+        topic,  // Topic
+        partition, // Partition
+        key,    // Key
+        value   // Value
+    );
+
+    // TODO: Fügen Sie die Header zu der Nachricht hinzu
+
+    producer.send(record, (metadata, e) ->
+    {
+      long now = System.currentTimeMillis();
+      if (e == null)
+      {
+        // HANDLE SUCCESS
+        produced++;
+        result.setResult(new ProduceSuccess(metadata.partition(), metadata.offset()));
+        log.debug(
+            "{} - Sent key={} message={} partition={}/{} timestamp={} latency={}ms",
+            id,
+            record.key(),
+            record.value(),
+            metadata.partition(),
+            metadata.offset(),
+            metadata.timestamp(),
+            now - time
+        );
+      }
+      else
+      {
+        // HANDLE ERROR
+        result.setErrorResult(new ProduceFailure(e));
+        log.error(
+            "{} - ERROR key={} timestamp={} latency={}ms: {}",
+            id,
+            record.key(),
+            metadata == null ? -1 : metadata.timestamp(),
+            now - time,
+            e.toString()
+        );
+      }
+    });
+
+    long now = System.currentTimeMillis();
+    log.trace(
+        "{} - Queued message with key={} latency={}ms",
+        id,
+        record.key(),
+        now - time
+    );
 
     return result;
   }
index 726204e..0d5752c 100644 (file)
@@ -31,6 +31,6 @@ info:
 logging:
   level:
     root: INFO
-    de.juplo: DEBUG
+    de.juplo: TRACE
 server:
   port: 8880