Confluent S3 Sink Connector aktiviert
author Kai Moritz <kai@juplo.de>
Tue, 23 Apr 2024 09:25:27 +0000 (11:25 +0200)
committer Kai Moritz <kai@juplo.de>
Sat, 27 Apr 2024 07:25:21 +0000 (09:25 +0200)
* Zugriffsrechte für minio explizit gesetzt.
* Der Service `setup` startet auch den Service `minio`.
* Der Connector liest das Topic `test`.
* Der Connector speichert die Daten in minio.

README.sh
docker/docker-compose.yml
s3-sink-connector.json

index 98a581d..9dfefa0 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -20,3 +20,46 @@ echo
 echo "Validität der Konfiguration des S3-Sink-Connector prüfen"
 echo
 cat s3-sink-connector.json | jq .config | http -v put :8083/connector-plugins/io.confluent.connect.s3.S3SinkConnector/config/validate
+
+echo
+echo "Schreibe Nachrichten mit Avro-Schema in das Topic \"test\""
+echo
+docker compose -f docker/docker-compose.yml exec -T cli \
+  kafka-avro-console-producer \
+    --broker-list kafka:9092 \
+    --topic test \
+    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' \
+    --property schema.registry.url=http://schema-registry:8085 << EOF
+{"f1":"foo"}
+{"f1":"bar"}
+{"f1":"foofoo"}
+{"f1":"barbar"}
+{"f1":"foobar"}
+{"f1":"barfoo"}
+EOF
+
+echo
+echo "Lese Nachrichten mit Avro-Schema aus dem Topic \"test\""
+echo
+docker compose -f docker/docker-compose.yml exec cli \
+  kafka-avro-console-consumer \
+    --bootstrap-server kafka:9092 \
+    --topic test \
+    --property schema.registry.url=http://schema-registry:8085 \
+    --from-beginning \
+    --max-messages 5
+
+echo
+echo "Erzeuge einen Confluent S3 Sink Connector"
+echo
+cat s3-sink-connector.json | http -v post :8083/connectors
+
+echo
+echo "Schlafe für 10 Sekunden..."
+echo
+sleep 10
+
+echo
+echo "Liste die im S3-Bucket erzeugten Dateien auf"
+echo
+docker compose -f docker/docker-compose.yml exec minio mc find /data/juplo
index 36e14c8..5a5ee55 100644 (file)
@@ -108,6 +108,8 @@ services:
       CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
       CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
       CONNECT_PLUGIN_PATH: /usr/share/java/plugins/
+      AWS_ACCESS_KEY_ID: juplo
+      AWS_SECRET_ACCESS_KEY: strenggeheim
     ports:
       - 8083:8083
     volumes:
@@ -120,7 +122,9 @@ services:
     command: sleep infinity
     stop_grace_period: 0s
     depends_on:
-      - connect
+      - kafka-1
+      - kafka-2
+      - kafka-3
 
   setup:
     image: juplo/toolbox
@@ -152,6 +156,8 @@ services:
     stop_grace_period: 0s
     depends_on:
       - cli
+      - connect
+      - minio
 
   zoonavigator:
     image: elkozmon/zoonavigator:1.1.2
@@ -193,6 +199,9 @@ services:
       - 9001:9001
     volumes:
       - minio:/data
+    environment:
+      MINIO_ACCESS_KEY: juplo
+      MINIO_SECRET_KEY: strenggeheim
     command: server /data --console-address ":9001"
 
 volumes:
index 6cb9838..d85eba9 100644 (file)
@@ -3,12 +3,12 @@
   "config": {
     "name": "s3-sink",
     "connector.class": "io.confluent.connect.s3.S3SinkConnector",
-    "topics": "s3_topic",
-    "s3.region": "us-west-2",
-    "s3.bucket.name": "confluent-kafka-connect-s3-testing",
+    "topics": "test",
+    "s3.bucket.name": "juplo",
     "s3.part.size": "5242880",
     "flush.size": "3",
     "storage.class": "io.confluent.connect.s3.storage.S3Storage",
+    "store.url": "http://minio:9000",
     "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
     "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
     "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",