environment:
spring.kafka.bootstrap-servers: kafka:9092
producer.client-id: producer
- producer.topic: test
+ spring.kafka.template.default-topic: test
consumer:
image: juplo/endless-consumer:1.0-SNAPSHOT
package de.juplo.kafka;
-import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.util.Assert;
-
-import java.util.concurrent.Executors;
@SpringBootApplication
public class RestProducer
{
private final String id;
- private final String topic;
private final KafkaTemplate<String, Object> kafkaTemplate;
private long produced = 0;
KafkaTemplate<String, Object> kafkaTemplate)
{
this.id = properties.getClientId();
- this.topic = properties.getTopic();
this.kafkaTemplate = kafkaTemplate;
}
final long time = System.currentTimeMillis();
- kafkaTemplate.send(topic, key, value).addCallback(
+ kafkaTemplate.sendDefault(key, value).addCallback(
(sendResult) ->
{
long now = System.currentTimeMillis();
kafka:
bootstrap-servers: ${spring.kafka.bootstrap-servers}
client-id: ${producer.client-id}
- topic: ${producer.topic}
+ topic: ${spring.kafka.template.default-topic}
acks: ${spring.kafka.producer.acks}
batch-size: ${spring.kafka.producer.batch-size}
linger-ms: ${spring.kafka.producer.properties.linger.ms}
message:de.juplo.kafka.ClientMessage,
foo:de.juplo.kafka.FooMessage,
greeting:de.juplo.kafka.Greeting
+ template:
+ default-topic: test
logging:
level:
root: INFO
@SpringBootTest(
properties = {
"spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
- "producer.topic=" + TOPIC})
+ "spring.kafka.template.default-topic=" + TOPIC})
@AutoConfigureMockMvc
@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
@Slf4j