From: Kai Moritz Date: Sun, 12 Jun 2022 09:33:28 +0000 (+0200) Subject: Das Topic wird über `spring.kafka.template.default-topic` konfiguriert X-Git-Tag: bytestream-producer~5 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=56bf19f4f150e7ab97eed32f01a1f470b9f896a6;p=demos%2Fkafka%2Ftraining Das Topic wird über `spring.kafka.template.default-topic` konfiguriert --- diff --git a/docker-compose.yml b/docker-compose.yml index 287168c..5baa959 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -43,7 +43,7 @@ services: environment: spring.kafka.bootstrap-servers: kafka:9092 producer.client-id: producer - producer.topic: test + spring.kafka.template.default-topic: test consumer: image: juplo/endless-consumer:1.0-SNAPSHOT diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 9f3e3ed..273cee5 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -1,13 +1,8 @@ package de.juplo.kafka; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.util.Assert; - -import java.util.concurrent.Executors; @SpringBootApplication diff --git a/src/main/java/de/juplo/kafka/RestProducer.java b/src/main/java/de/juplo/kafka/RestProducer.java index 56758f8..423a8a3 100644 --- a/src/main/java/de/juplo/kafka/RestProducer.java +++ b/src/main/java/de/juplo/kafka/RestProducer.java @@ -16,7 +16,6 @@ import java.util.concurrent.ExecutionException; public class RestProducer { private final String id; - private final String topic; private final KafkaTemplate kafkaTemplate; private long produced = 0; @@ -26,7 +25,6 @@ public class RestProducer KafkaTemplate kafkaTemplate) { this.id = properties.getClientId(); - this.topic = properties.getTopic(); this.kafkaTemplate = kafkaTemplate; } @@ -60,7 +58,7 @@ public class RestProducer final long time = System.currentTimeMillis(); - kafkaTemplate.send(topic, key, value).addCallback( + kafkaTemplate.sendDefault(key, value).addCallback( (sendResult) -> { long now = System.currentTimeMillis(); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index f54576a..ce26258 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -18,7 +18,7 @@ info: kafka: bootstrap-servers: ${spring.kafka.bootstrap-servers} client-id: ${producer.client-id} - topic: ${producer.topic} + topic: ${spring.kafka.template.default-topic} acks: ${spring.kafka.producer.acks} batch-size: ${spring.kafka.producer.batch-size} linger-ms: ${spring.kafka.producer.properties.linger.ms} @@ -40,6 +40,8 @@ spring: message:de.juplo.kafka.ClientMessage, foo:de.juplo.kafka.FooMessage, greeting:de.juplo.kafka.Greeting + template: + default-topic: test logging: level: root: INFO diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 76cfe42..0a11e44 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -27,7 +27,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. @SpringBootTest( properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", - "producer.topic=" + TOPIC}) + "spring.kafka.template.default-topic=" + TOPIC}) @AutoConfigureMockMvc @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) @Slf4j