--- /dev/null
+*
+!target/*.jar
--- /dev/null
+FROM openjdk:11-jre-slim
+COPY target/*.jar /opt/app.jar
+EXPOSE 8080
+ENTRYPOINT ["java", "-jar", "/opt/app.jar"]
+CMD []
</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
<artifactId>recorder</artifactId>
- <version>0.0.1-SNAPSHOT</version>
+ <version>1.0.0</version>
<name>Wordcount-Recorder</name>
<description>Recorder-service of the multi-user wordcount-example</description>
<properties>
+ <docker-maven-plugin.version>0.33.0</docker-maven-plugin.version>
<java.version>11</java.version>
+ <kafka.version>2.8.0</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.hibernate.validator</groupId>
+ <artifactId>hibernate-validator</artifactId>
+ </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
</excludes>
</configuration>
</plugin>
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <version>${docker-maven-plugin.version}</version>
+ <configuration>
+ <images>
+ <image>
+ <name>juplo/wordcount--%a:%v</name>
+ </image>
+ </images>
+ </configuration>
+ </plugin>
</plugins>
</build>
--- /dev/null
+package de.juplo.kafka.wordcount.recorder;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.util.Assert;
+
+import java.util.Properties;
+
+
+@SpringBootApplication
+@EnableConfigurationProperties(RecorderApplicationProperties.class)
+public class RecorderApplication
+{
+ @Bean(destroyMethod = "close")
+ KafkaProducer<String, String> producer(RecorderApplicationProperties properties)
+ {
+ Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.recorder.bootstrap-server must be set");
+
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+ return new KafkaProducer<>(props);
+ }
+
+ public static void main(String[] args)
+ {
+ SpringApplication.run(RecorderApplication.class, args);
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.recorder;
+
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+
+@ConfigurationProperties("juplo.wordcount.recorder")
+@Getter
+@Setter
+@ToString
+public class RecorderApplicationProperties
+{
+ private String bootstrapServer = "localhost:9092";
+ private String topic = "recordings";
+}
--- /dev/null
+package de.juplo.kafka.wordcount.recorder;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.util.MimeTypeUtils;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.context.request.async.DeferredResult;
+
+import javax.validation.constraints.NotEmpty;
+
+
+@RestController
+public class RecorderController
+{
+ private final String topic;
+ private final KafkaProducer<String, String> producer;
+
+
+ public RecorderController(RecorderApplicationProperties properties, KafkaProducer<String,String> producer)
+ {
+ this.topic = properties.getTopic();
+ this.producer = producer;
+ }
+
+ @PostMapping(
+ path = "/{username}",
+ consumes = {
+ MimeTypeUtils.TEXT_PLAIN_VALUE,
+ MimeTypeUtils.APPLICATION_JSON_VALUE
+ },
+ produces = MimeTypeUtils.APPLICATION_JSON_VALUE)
+ DeferredResult<ResponseEntity<RecordingResult>> speak(
+ @PathVariable
+ @NotEmpty(message = "A username must be provided")
+ String username,
+ @RequestBody
+ @NotEmpty(message = "The spoken sentence must not be empty!")
+ String sentence)
+ {
+ DeferredResult<ResponseEntity<RecordingResult>> result = new DeferredResult<>();
+
+ ProducerRecord<String, String> record = new ProducerRecord<>(topic, username, sentence);
+ producer.send(record, (metadata, exception) ->
+ {
+ if (metadata != null)
+ {
+ result.setResult(
+ ResponseEntity.ok(RecordingResult.of(
+ username,
+ sentence,
+ topic,
+ metadata.partition(),
+ metadata.offset(),
+ null,
+ null)));
+ }
+ else
+ {
+ result.setErrorResult(
+ ResponseEntity
+ .internalServerError()
+ .body(RecordingResult.of(
+ username,
+ sentence,
+ topic,
+ null,
+ null,
+ HttpStatus.INTERNAL_SERVER_ERROR.value(),
+ exception.toString())));
+ }
+ });
+
+ return result;
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.recorder;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import lombok.Value;
+
+import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL;
+
+
+@Value(staticConstructor = "of")
+public class RecordingResult
+{
+ @JsonInclude(NON_NULL) private final String username;
+ @JsonInclude(NON_NULL) private final String sentence;
+ @JsonInclude(NON_NULL) private final String topic;
+ @JsonInclude(NON_NULL) private final Integer partition;
+ @JsonInclude(NON_NULL) private final Long offset;
+ @JsonInclude(NON_NULL) private final Integer status;
+ @JsonInclude(NON_NULL) private final String error;
+}
+++ /dev/null
-package de.juplo.kafka.wordcount.recorder;
-
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-
-@SpringBootApplication
-public class WordcountRecorderApplication
-{
- public static void main(String[] args)
- {
- SpringApplication.run(WordcountRecorderApplication.class, args);
- }
-}
--- /dev/null
+package de.juplo.kafka.wordcount.recorder;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest
+class ApplicationTests
+{
+ @Test
+ void contextLoads()
+ {
+ }
+}
+++ /dev/null
-package de.juplo.kafka.wordcount.recorder;
-
-import org.junit.jupiter.api.Test;
-import org.springframework.boot.test.context.SpringBootTest;
-
-@SpringBootTest
-class WordcountRecorderApplicationTests
-{
- @Test
- void contextLoads()
- {
- }
-}