]> juplo.de Git - demos/kafka/training/commitdiff
Das Topic wird über `spring.kafka.template.default-topic` konfiguriert
authorKai Moritz <kai@juplo.de>
Sun, 12 Jun 2022 09:33:28 +0000 (11:33 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 12 Jun 2022 13:53:19 +0000 (15:53 +0200)
docker-compose.yml
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/RestProducer.java
src/main/resources/application.yml
src/test/java/de/juplo/kafka/ApplicationTests.java

index 287168c7297cd14977925a95fd81befcd3370409..5baa9596839e8de3c2e6a35c727c075a4c449302 100644 (file)
@@ -43,7 +43,7 @@ services:
     environment:
       spring.kafka.bootstrap-servers: kafka:9092
       producer.client-id: producer
-      producer.topic: test
+      spring.kafka.template.default-topic: test
 
   consumer:
     image: juplo/endless-consumer:1.0-SNAPSHOT
index 9f3e3ed3f15fa955e6f90e1d6675ce742d9f1c6c..273cee5a98285854b0163ce737ddf429f3d0e0a7 100644 (file)
@@ -1,13 +1,8 @@
 package de.juplo.kafka;
 
-import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.util.Assert;
-
-import java.util.concurrent.Executors;
 
 
 @SpringBootApplication
index 56758f8151dc662c597387fbec599048ddb6aa56..423a8a3cdbec74975e33fb69c16b9d8b94d04b64 100644 (file)
@@ -16,7 +16,6 @@ import java.util.concurrent.ExecutionException;
 public class RestProducer
 {
   private final String id;
-  private final String topic;
   private final KafkaTemplate<String, Object> kafkaTemplate;
 
   private long produced = 0;
@@ -26,7 +25,6 @@ public class RestProducer
       KafkaTemplate<String, Object> kafkaTemplate)
   {
     this.id = properties.getClientId();
-    this.topic = properties.getTopic();
     this.kafkaTemplate = kafkaTemplate;
   }
 
@@ -60,7 +58,7 @@ public class RestProducer
 
     final long time = System.currentTimeMillis();
 
-    kafkaTemplate.send(topic, key, value).addCallback(
+    kafkaTemplate.sendDefault(key, value).addCallback(
       (sendResult) ->
       {
         long now = System.currentTimeMillis();
index f54576aab20b4d50f6564c65744f47dcdf5fe0cd..ce26258ed436a65861946fc904cba429a6f77a0d 100644 (file)
@@ -18,7 +18,7 @@ info:
   kafka:
     bootstrap-servers: ${spring.kafka.bootstrap-servers}
     client-id: ${producer.client-id}
-    topic: ${producer.topic}
+    topic: ${spring.kafka.template.default-topic}
     acks: ${spring.kafka.producer.acks}
     batch-size: ${spring.kafka.producer.batch-size}
     linger-ms: ${spring.kafka.producer.properties.linger.ms}
@@ -40,6 +40,8 @@ spring:
           message:de.juplo.kafka.ClientMessage,
           foo:de.juplo.kafka.FooMessage,
           greeting:de.juplo.kafka.Greeting
+    template:
+      default-topic: test
 logging:
   level:
     root: INFO
index 76cfe42fba38a1e2ec3ff5e48edf54ba3c5cb8e4..0a11e4491e9f2351056377d37b2c6256f5955b57 100644 (file)
@@ -27,7 +27,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
 @SpringBootTest(
                properties = {
                                "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
-                               "producer.topic=" + TOPIC})
+                               "spring.kafka.template.default-topic=" + TOPIC})
 @AutoConfigureMockMvc
 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
 @Slf4j