From be7ed549aa49eb52b4b975d114b0d71561d060f4 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Tue, 23 Apr 2024 11:25:27 +0200
Subject: [PATCH] Confluent S3 Sink Connector activated
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* Access credentials for minio are set explicitly.
* The service `setup` also starts the service `minio`.
* The connector reads the topic `test`.
* The connector stores the data in minio.
---
 README.sh                 | 43 +++++++++++++++++++++++++++++++++++++++
 docker/docker-compose.yml | 11 +++++++++-
 s3-sink-connector.json    |  6 +++---
 3 files changed, 56 insertions(+), 4 deletions(-)

diff --git a/README.sh b/README.sh
index 98a581d..9dfefa0 100755
--- a/README.sh
+++ b/README.sh
@@ -20,3 +20,46 @@ echo
 echo "Check the validity of the S3 sink connector configuration"
 echo
 cat s3-sink-connector.json | jq .config | http -v put :8083/connector-plugins/io.confluent.connect.s3.S3SinkConnector/config/validate
+
+echo
+echo "Write messages with an Avro schema to the topic \"test\""
+echo
+docker compose -f docker/docker-compose.yml exec -T cli \
+    kafka-avro-console-producer \
+    --broker-list kafka:9092 \
+    --topic test \
+    --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' \
+    --property schema.registry.url=http://schema-registry:8085 << EOF
+{"f1":"foo"}
+{"f1":"bar"}
+{"f1":"foofoo"}
+{"f1":"barbar"}
+{"f1":"foobar"}
+{"f1":"barfoo"}
+EOF
+
+echo
+echo "Read messages with an Avro schema from the topic \"test\""
+echo
+docker compose -f docker/docker-compose.yml exec cli \
+    kafka-avro-console-consumer \
+    --bootstrap-server kafka:9092 \
+    --topic test \
+    --property schema.registry.url=http://schema-registry:8085 \
+    --from-beginning \
+    --max-messages 5
+
+echo
+echo "Create a Confluent S3 Sink Connector"
+echo
+cat s3-sink-connector.json | http -v post :8083/connectors
+
+echo
+echo "Sleeping for 10 seconds..."
+echo
+sleep 10
+
+echo
+echo "List the files created in the S3 bucket"
+echo
+docker compose -f docker/docker-compose.yml exec minio mc find /data/juplo
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index 36e14c8..5a5ee55 100644
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -108,6 +108,8 @@ services:
       CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
       CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
       CONNECT_PLUGIN_PATH: /usr/share/java/plugins/
+      AWS_ACCESS_KEY_ID: juplo
+      AWS_SECRET_ACCESS_KEY: strenggeheim
     ports:
       - 8083:8083
     volumes:
@@ -120,7 +122,9 @@
     command: sleep infinity
     stop_grace_period: 0s
     depends_on:
-      - connect
+      - kafka-1
+      - kafka-2
+      - kafka-3
 
   setup:
     image: juplo/toolbox
@@ -152,6 +156,8 @@
     stop_grace_period: 0s
     depends_on:
       - cli
+      - connect
+      - minio
 
   zoonavigator:
     image: elkozmon/zoonavigator:1.1.2
@@ -193,6 +199,9 @@
     ports:
       - 9001:9001
     volumes:
       - minio:/data
+    environment:
+      MINIO_ACCESS_KEY: juplo
+      MINIO_SECRET_KEY: strenggeheim
     command: server /data --console-address ":9001"
 volumes:
diff --git a/s3-sink-connector.json b/s3-sink-connector.json
index 6cb9838..d85eba9 100644
--- a/s3-sink-connector.json
+++ b/s3-sink-connector.json
@@ -3,12 +3,12 @@
   "config": {
     "name": "s3-sink",
     "connector.class": "io.confluent.connect.s3.S3SinkConnector",
-    "topics": "s3_topic",
-    "s3.region": "us-west-2",
-    "s3.bucket.name": "confluent-kafka-connect-s3-testing",
+    "topics": "test",
+    "s3.bucket.name": "juplo",
     "s3.part.size": "5242880",
     "flush.size": "3",
     "storage.class": "io.confluent.connect.s3.storage.S3Storage",
+    "store.url": "http://minio:9000",
     "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
     "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
     "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
-- 
2.20.1
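
Reviewer note (not part of the patch): once README.sh has run, the state of the
sink can be checked independently. A minimal sketch, assuming the Connect REST
API is published on localhost:8083 and httpie is available on the host; the
connector name "s3-sink" is taken from s3-sink-connector.json:

  # Connector and task state should both report RUNNING
  http :8083/connectors/s3-sink/status

  # With flush.size=3 and six produced test messages, the connector should
  # have committed two Avro files below /data/juplo
  docker compose -f docker/docker-compose.yml exec minio \
      mc find /data/juplo --name "*.avro"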
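
A second sketch for the storage side, assuming the root credentials from
docker/docker-compose.yml (juplo / strenggeheim): instead of inspecting the
backing file system, the bucket can be listed through the same S3 endpoint
the connector writes to (store.url = http://minio:9000):

  # Register an mc alias for the S3 API and list the bucket through it
  docker compose -f docker/docker-compose.yml exec minio \
      mc alias set local http://minio:9000 juplo strenggeheim
  docker compose -f docker/docker-compose.yml exec minio \
      mc ls --recursive local/juplo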