</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
<artifactId>recorder</artifactId>
- <version>1.0.2</version>
+ <version>1.1.0</version>
<name>Wordcount-Recorder</name>
<description>Recorder-service of the multi-user wordcount-example</description>
<properties>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.hibernate.validator</groupId>
public class RecorderApplication
{
@Bean(destroyMethod = "close")
- KafkaProducer<String, String> producer(RecorderApplicationProperties properties)
+ KafkaProducer<String, Recording> producer(RecorderApplicationProperties properties)
{
Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.recorder.bootstrap-server must be set");
public class RecorderController
{
private final String topic;
- private final KafkaProducer<String, String> producer;
+ private final KafkaProducer<String, Recording> producer;
- public RecorderController(RecorderApplicationProperties properties, KafkaProducer<String,String> producer)
+ public RecorderController(
+ RecorderApplicationProperties properties,
+ KafkaProducer<String,Recording> producer)
{
this.topic = properties.getTopic();
this.producer = producer;
{
DeferredResult<ResponseEntity<RecordingResult>> result = new DeferredResult<>();
- ProducerRecord<String, String> record = new ProducerRecord<>(topic, username, sentence);
+ ProducerRecord<String, Recording> record = new ProducerRecord<>(
+ topic,
+ username,
+ Recording.of(username, sentence));
+
producer.send(record, (metadata, exception) ->
{
if (metadata != null)