From 56bf19f4f150e7ab97eed32f01a1f470b9f896a6 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 12 Jun 2022 11:33:28 +0200 Subject: [PATCH] =?utf8?q?Das=20Topic=20wird=20=C3=BCber=20`spring.kafka.t?= =?utf8?q?emplate.default-topic`=20konfiguriert?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- docker-compose.yml | 2 +- src/main/java/de/juplo/kafka/Application.java | 5 ----- src/main/java/de/juplo/kafka/RestProducer.java | 4 +--- src/main/resources/application.yml | 4 +++- src/test/java/de/juplo/kafka/ApplicationTests.java | 2 +- 5 files changed, 6 insertions(+), 11 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 287168c..5baa959 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -43,7 +43,7 @@ services: environment: spring.kafka.bootstrap-servers: kafka:9092 producer.client-id: producer - producer.topic: test + spring.kafka.template.default-topic: test consumer: image: juplo/endless-consumer:1.0-SNAPSHOT diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 9f3e3ed..273cee5 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -1,13 +1,8 @@ package de.juplo.kafka; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.util.Assert; - -import java.util.concurrent.Executors; @SpringBootApplication diff --git a/src/main/java/de/juplo/kafka/RestProducer.java b/src/main/java/de/juplo/kafka/RestProducer.java index 56758f8..423a8a3 100644 --- a/src/main/java/de/juplo/kafka/RestProducer.java +++ b/src/main/java/de/juplo/kafka/RestProducer.java @@ -16,7 +16,6 @@ import java.util.concurrent.ExecutionException; public class RestProducer { private final String id; - private final String topic; private final KafkaTemplate kafkaTemplate; private long produced = 0; @@ -26,7 +25,6 @@ public class RestProducer KafkaTemplate kafkaTemplate) { this.id = properties.getClientId(); - this.topic = properties.getTopic(); this.kafkaTemplate = kafkaTemplate; } @@ -60,7 +58,7 @@ public class RestProducer final long time = System.currentTimeMillis(); - kafkaTemplate.send(topic, key, value).addCallback( + kafkaTemplate.sendDefault(key, value).addCallback( (sendResult) -> { long now = System.currentTimeMillis(); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index f54576a..ce26258 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -18,7 +18,7 @@ info: kafka: bootstrap-servers: ${spring.kafka.bootstrap-servers} client-id: ${producer.client-id} - topic: ${producer.topic} + topic: ${spring.kafka.template.default-topic} acks: ${spring.kafka.producer.acks} batch-size: ${spring.kafka.producer.batch-size} linger-ms: ${spring.kafka.producer.properties.linger.ms} @@ -40,6 +40,8 @@ spring: message:de.juplo.kafka.ClientMessage, foo:de.juplo.kafka.FooMessage, greeting:de.juplo.kafka.Greeting + template: + default-topic: test logging: level: root: INFO diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 76cfe42..0a11e44 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -27,7 +27,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. @SpringBootTest( properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", - "producer.topic=" + TOPIC}) + "spring.kafka.template.default-topic=" + TOPIC}) @AutoConfigureMockMvc @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) @Slf4j -- 2.20.1