From cc2ab7414801949c1b9ffb24b247dc7f474b1728 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 12 Nov 2024 19:01:01 +0100 Subject: [PATCH] =?utf8?q?Vorlage=20f=C3=BCr=20die=20`KafkaTemplate`-?= =?utf8?q?=C3=9Cbung?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- README.sh | 42 ------------------- .../java/de/juplo/kafka/ExampleProducer.java | 40 +----------------- 2 files changed, 1 insertion(+), 81 deletions(-) delete mode 100755 README.sh diff --git a/README.sh b/README.sh deleted file mode 100755 index 24811ed8..00000000 --- a/README.sh +++ /dev/null @@ -1,42 +0,0 @@ -#!/bin/bash - -IMAGE=juplo/supersimple-producer:1.0-SNAPSHOT - -if [ "$1" = "cleanup" ] -then - docker compose -f docker/docker-compose.yml down -t0 -v --remove-orphans - mvn clean - exit -fi - -docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3 -docker compose -f docker/docker-compose.yml rm -svf producer - -if [[ - $(docker image ls -q $IMAGE) == "" || - "$1" = "build" -]] -then - mvn clean install || exit -else - echo "Using image existing images:" - docker image ls $IMAGE -fi - -docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1 - - -docker compose -f docker/docker-compose.yml up -d producer -docker compose -f docker/docker-compose.yml up -d peter ute -sleep 15 - -docker compose -f docker/docker-compose.yml stop producer - -echo -echo "Von peter empfangen:" -docker compose -f docker/docker-compose.yml logs peter | grep 'partition=test-.' -echo -echo "Von ute empfangen:" -docker compose -f docker/docker-compose.yml logs ute | grep 'partition=test-.' - -docker compose -f docker/docker-compose.yml stop peter ute diff --git a/src/main/java/de/juplo/kafka/ExampleProducer.java b/src/main/java/de/juplo/kafka/ExampleProducer.java index 7477157d..36dc4140 100644 --- a/src/main/java/de/juplo/kafka/ExampleProducer.java +++ b/src/main/java/de/juplo/kafka/ExampleProducer.java @@ -1,20 +1,13 @@ package de.juplo.kafka; -import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.support.SendResult; -import java.time.Duration; -import java.util.concurrent.CompletableFuture; - -@Slf4j -// tag::supersimple[] @SpringBootApplication public class ExampleProducer implements ApplicationRunner { @@ -26,37 +19,7 @@ public class ExampleProducer implements ApplicationRunner { for (int i = 0; true; i++) { - // end::supersimple[] - // tag::callback[] - CompletableFuture> completableFuture = - // tag::supersimple[] - kafkaTemplate.sendDefault(Long.toString(i%10), Long.toString(i)); - // end::supersimple[] - - completableFuture.thenAccept(result -> - log.info( - "Sent {}={} to partition={}, offset={}", - result.getProducerRecord().key(), - result.getProducerRecord().value(), - result.getRecordMetadata().partition(), - result.getRecordMetadata().offset())); - - completableFuture.exceptionally(e -> { - log.error("ERROR sending message", e); - return null; - }); - // end::callback[] - // tag::supersimple[] - - try - { - Thread.sleep(Duration.ofMillis(500)); - } - catch (InterruptedException e) - { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } + kafkaTemplate.sendDefault(Long.toString(i%10), Long.toString(i)); } } @@ -65,4 +28,3 @@ public class ExampleProducer implements ApplicationRunner SpringApplication.run(ExampleProducer.class, args); } } -// end::supersimple[] -- 2.39.5