<name>Wordcount-Users</name>
<description>Users-service of the multi-user wordcount-example</description>
<properties>
+ <avro.version>1.10.2</avro.version>
<docker-maven-plugin.version>0.33.0</docker-maven-plugin.version>
<java.version>11</java.version>
<kafka.version>2.8.0</kafka.version>
+ <confluent.version>6.2.1</confluent.version>
</properties>
<dependencies>
<dependency>
<groupId>org.hibernate.validator</groupId>
<artifactId>hibernate-validator</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.confluent</groupId>
+ <artifactId>kafka-streams-avro-serde</artifactId>
+ <version>${confluent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>${avro.version}</version>
+ </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
</images>
</configuration>
</plugin>
+ <plugin>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-maven-plugin</artifactId>
+ <version>${avro.version}</version>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>schema</goal>
+ </goals>
+ <configuration>
+ <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
+ <outputDirectory>${project.basedir}/target/generated-sources</outputDirectory>
+ <fieldVisibility>PRIVATE</fieldVisibility>
+ <stringType>String</stringType>
+ <includes>
+ <include>*.avsc</include>
+ </includes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
+ <repositories>
+ <repository>
+ <id>confluent</id>
+ <url>https://packages.confluent.io/maven/</url>
+ </repository>
+ </repositories>
+
</project>
+++ /dev/null
-package de.juplo.kafka.wordcount.users;
-
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-
-import javax.validation.constraints.NotEmpty;
-
-
-@Getter
-@Setter
-@ToString
-@EqualsAndHashCode(of = "username")
-public class User
-{
- public enum Sex { FEMALE, MALE, OTHER }
-
- @NotEmpty
- private String username;
-
- private String firstName;
- private String lastName;
- private Sex sex;
-}
--- /dev/null
+package de.juplo.kafka.wordcount.users;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+import javax.validation.constraints.NotEmpty;
+
+
+@Getter
+@Setter
+@ToString
+@EqualsAndHashCode(of = "username")
+public class UserRequest
+{
+ public enum Sex { FEMALE, MALE, OTHER }
+
+ @NotEmpty
+ private String username;
+
+ private String firstName;
+ private String lastName;
+ private Sex sex;
+}
package de.juplo.kafka.wordcount.users;
+import de.juplo.kafka.wordcount.avro.User;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
+import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
public class UsersApplication
{
@Bean(destroyMethod = "close")
- KafkaProducer<String, String> producer(UsersApplicationProperties properties)
+ KafkaProducer<String, User> producer(UsersApplicationProperties properties)
{
Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.recorder.bootstrap-server must be set");
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class);
+ props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, properties.getSchemaRegistry());
return new KafkaProducer<>(props);
}
public class UsersApplicationProperties
{
private String bootstrapServer = "localhost:9092";
+ private String schemaRegistry = "https://schema-registry:9081/";
private String topic = "users";
}
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.wordcount.avro.User;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.http.HttpStatus;
{
private final String topic;
private final ObjectMapper mapper;
- private final KafkaProducer<String, String> producer;
+ private final KafkaProducer<String, User> producer;
public UsersController(
UsersApplicationProperties properties,
ObjectMapper mapper,
- KafkaProducer<String,String> producer)
+ KafkaProducer<String,User> producer)
{
this.topic = properties.getTopic();
this.mapper = mapper;
MimeTypeUtils.APPLICATION_JSON_VALUE
},
produces = MimeTypeUtils.APPLICATION_JSON_VALUE)
- DeferredResult<ResponseEntity<UsersResult>> post(@RequestBody User user) throws JsonProcessingException
+ DeferredResult<ResponseEntity<UsersResult>> post(@RequestBody UserRequest userRequest) throws JsonProcessingException
{
DeferredResult<ResponseEntity<UsersResult>> result = new DeferredResult<>();
- String value = mapper.writeValueAsString(user);
- ProducerRecord<String, String> record = new ProducerRecord<>(topic, user.getUsername(), value);
+ ProducerRecord<String, User> record =
+ new ProducerRecord<>(
+ topic,
+ userRequest.getUsername(),
+ User
+ .newBuilder()
+ .setUsername(userRequest.getUsername())
+ .setFirstName(userRequest.getFirstName())
+ .setLastName(userRequest.getLastName())
+ .build());
producer.send(record, (metadata, exception) ->
{
if (metadata != null)
{
result.setResult(
ResponseEntity.ok(UsersResult.of(
- user.getUsername(),
- user,
+ userRequest.getUsername(),
+ userRequest,
topic,
metadata.partition(),
metadata.offset(),
ResponseEntity
.internalServerError()
.body(UsersResult.of(
- user.getUsername(),
- user,
+ userRequest.getUsername(),
+ userRequest,
topic,
null,
null,
public class UsersResult
{
@JsonInclude(NON_NULL) private final String username;
- @JsonInclude(NON_NULL) private final User user;
+ @JsonInclude(NON_NULL) private final UserRequest user;
@JsonInclude(NON_NULL) private final String topic;
@JsonInclude(NON_NULL) private final Integer partition;
@JsonInclude(NON_NULL) private final Long offset;
--- /dev/null
+{
+ "type": "record",
+ "namespace": "de.juplo.kafka.wordcount.avro",
+ "name": "Ranking",
+ "fields": [
+ {
+ "name": "entries",
+ "type": {
+ "type": "array",
+ "items": {
+ "name": "Entry",
+ "type": "record",
+ "fields":[
+ { "name": "word",
+ "type": "string"
+ },
+ { "name": "count",
+ "type": "long"
+ }
+ ]
+ }
+ }
+ }
+ ]
+}
--- /dev/null
+{
+ "type": "record",
+ "namespace": "de.juplo.kafka.wordcount.avro",
+ "name": "User",
+ "fields": [
+ {
+ "name": "username", "type": "string"
+ },
+ {
+ "name": "firstName", "type": "string", "default": ""
+ },
+ {
+ "name": "lastName", "type": "string", "default": ""
+ },
+ { "name": "sex", "type":
+ {
+ "type": "enum", "name": "Sex",
+ "symbols": [ "UNKNOWN", "FEMALE", "MALE", "OTHER" ], "default": "UNKNOWN"
+ },
+ "default": "UNKNOWN"
+ }
+ ]
+}
--- /dev/null
+{
+ "type": "record",
+ "namespace": "de.juplo.kafka.wordcount.avro",
+ "name": "UserRanking",
+ "fields": [
+ {
+ "name": "firstName", "type": "string", "default": ""
+ },
+ {
+ "name": "lastName", "type": "string", "default": ""
+ },
+ {
+ "name": "top10",
+ "type": {
+ "type": "array",
+ "items": {
+ "name": "Entry",
+ "type": "record",
+ "fields":[
+ { "name": "word",
+ "type": "string"
+ },
+ { "name": "count",
+ "type": "long"
+ }
+ ]
+ }
+ }
+ }
+ ]
+}