Das Topic wird über `spring.kafka.template.default-topic` konfiguriert
authorKai Moritz <kai@juplo.de>
Sun, 12 Jun 2022 09:33:28 +0000 (11:33 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 12 Jun 2022 13:53:19 +0000 (15:53 +0200)
docker-compose.yml
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/RestProducer.java
src/main/resources/application.yml
src/test/java/de/juplo/kafka/ApplicationTests.java

index 287168c..5baa959 100644 (file)
@@ -43,7 +43,7 @@ services:
     environment:
       spring.kafka.bootstrap-servers: kafka:9092
       producer.client-id: producer
-      producer.topic: test
+      spring.kafka.template.default-topic: test
 
   consumer:
     image: juplo/endless-consumer:1.0-SNAPSHOT
index 9f3e3ed..273cee5 100644 (file)
@@ -1,13 +1,8 @@
 package de.juplo.kafka;
 
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.util.Assert;
-
-import java.util.concurrent.Executors;
 
 
 @SpringBootApplication
index 56758f8..423a8a3 100644 (file)
@@ -16,7 +16,6 @@ import java.util.concurrent.ExecutionException;
 public class RestProducer
 {
   private final String id;
-  private final String topic;
   private final KafkaTemplate<String, Object> kafkaTemplate;
 
   private long produced = 0;
@@ -26,7 +25,6 @@ public class RestProducer
       KafkaTemplate<String, Object> kafkaTemplate)
   {
     this.id = properties.getClientId();
-    this.topic = properties.getTopic();
     this.kafkaTemplate = kafkaTemplate;
   }
 
@@ -60,7 +58,7 @@ public class RestProducer
 
     final long time = System.currentTimeMillis();
 
-    kafkaTemplate.send(topic, key, value).addCallback(
+    kafkaTemplate.sendDefault(key, value).addCallback(
       (sendResult) ->
       {
         long now = System.currentTimeMillis();
index f54576a..ce26258 100644 (file)
@@ -18,7 +18,7 @@ info:
   kafka:
     bootstrap-servers: ${spring.kafka.bootstrap-servers}
     client-id: ${producer.client-id}
-    topic: ${producer.topic}
+    topic: ${spring.kafka.template.default-topic}
     acks: ${spring.kafka.producer.acks}
     batch-size: ${spring.kafka.producer.batch-size}
     linger-ms: ${spring.kafka.producer.properties.linger.ms}
@@ -40,6 +40,8 @@ spring:
           message:de.juplo.kafka.ClientMessage,
           foo:de.juplo.kafka.FooMessage,
           greeting:de.juplo.kafka.Greeting
+    template:
+      default-topic: test
 logging:
   level:
     root: INFO
index 76cfe42..0a11e44 100644 (file)
@@ -27,7 +27,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
 @SpringBootTest(
                properties = {
                                "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
-                               "producer.topic=" + TOPIC})
+                               "spring.kafka.template.default-topic=" + TOPIC})
 @AutoConfigureMockMvc
 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
 @Slf4j