- Properties props = new Properties();
- props.put("bootstrap.servers", broker);
- props.put("client.id", clientId); // Nur zur Wiedererkennung
- props.put("key.serializer", StringSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
-
- producer = new KafkaProducer<>(props);
-
- this.topic = topic;
- this.id = clientId;
+ ListenableFuture<SendResult<String, String>> listenableFuture =
+ kafkaTemplate.send("test", key, value);
+
+ listenableFuture.addCallback(
+ result -> log.info(
+ "Sent {}={} to partition={}, offset={}",
+ result.getProducerRecord().key(),
+ result.getProducerRecord().value(),
+ result.getRecordMetadata().partition(),
+ result.getRecordMetadata().offset()),
+ e -> log.error("ERROR sendig message", e));