From 36381bd0a4dea9bd10494cb4919de6c7fe5c4190 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 13 Oct 2021 21:12:17 +0200 Subject: [PATCH] WIP --- pom.xml | 41 +++++++++++++++++++ .../users/{User.java => UserRequest.java} | 2 +- .../wordcount/users/UsersApplication.java | 8 +++- .../users/UsersApplicationProperties.java | 1 + .../wordcount/users/UsersController.java | 27 ++++++++---- .../kafka/wordcount/users/UsersResult.java | 2 +- src/main/resources/avro/ranking.avsc | 25 +++++++++++ src/main/resources/avro/user.avsc | 23 +++++++++++ src/main/resources/avro/userranking.avsc | 31 ++++++++++++++ 9 files changed, 147 insertions(+), 13 deletions(-) rename src/main/java/de/juplo/kafka/wordcount/users/{User.java => UserRequest.java} (94%) create mode 100644 src/main/resources/avro/ranking.avsc create mode 100644 src/main/resources/avro/user.avsc create mode 100644 src/main/resources/avro/userranking.avsc diff --git a/pom.xml b/pom.xml index 5790169..d1517da 100644 --- a/pom.xml +++ b/pom.xml @@ -14,9 +14,11 @@ Wordcount-Users Users-service of the multi-user wordcount-example + 1.10.2 0.33.0 11 2.8.0 + 6.2.1 @@ -35,6 +37,16 @@ org.hibernate.validator hibernate-validator + + io.confluent + kafka-streams-avro-serde + ${confluent.version} + + + org.apache.avro + avro + ${avro.version} + org.springframework.boot @@ -85,7 +97,36 @@ + + org.apache.avro + avro-maven-plugin + ${avro.version} + + + generate-sources + + schema + + + ${project.basedir}/src/main/resources/avro + ${project.basedir}/target/generated-sources + PRIVATE + String + + *.avsc + + + + + + + + confluent + https://packages.confluent.io/maven/ + + + diff --git a/src/main/java/de/juplo/kafka/wordcount/users/User.java b/src/main/java/de/juplo/kafka/wordcount/users/UserRequest.java similarity index 94% rename from src/main/java/de/juplo/kafka/wordcount/users/User.java rename to src/main/java/de/juplo/kafka/wordcount/users/UserRequest.java index 04c9e0c..f0f8cc8 100644 --- a/src/main/java/de/juplo/kafka/wordcount/users/User.java +++ b/src/main/java/de/juplo/kafka/wordcount/users/UserRequest.java @@ -12,7 +12,7 @@ import javax.validation.constraints.NotEmpty; @Setter @ToString @EqualsAndHashCode(of = "username") -public class User +public class UserRequest { public enum Sex { FEMALE, MALE, OTHER } diff --git a/src/main/java/de/juplo/kafka/wordcount/users/UsersApplication.java b/src/main/java/de/juplo/kafka/wordcount/users/UsersApplication.java index 9ae44e1..98aa164 100644 --- a/src/main/java/de/juplo/kafka/wordcount/users/UsersApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/users/UsersApplication.java @@ -1,5 +1,8 @@ package de.juplo.kafka.wordcount.users; +import de.juplo.kafka.wordcount.avro.User; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; @@ -17,14 +20,15 @@ import java.util.Properties; public class UsersApplication { @Bean(destroyMethod = "close") - KafkaProducer producer(UsersApplicationProperties properties) + KafkaProducer producer(UsersApplicationProperties properties) { Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.recorder.bootstrap-server must be set"); Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class); + props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, properties.getSchemaRegistry()); return new KafkaProducer<>(props); } diff --git a/src/main/java/de/juplo/kafka/wordcount/users/UsersApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/users/UsersApplicationProperties.java index 8218f99..b10f642 100644 --- a/src/main/java/de/juplo/kafka/wordcount/users/UsersApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/wordcount/users/UsersApplicationProperties.java @@ -14,5 +14,6 @@ import org.springframework.boot.context.properties.ConfigurationProperties; public class UsersApplicationProperties { private String bootstrapServer = "localhost:9092"; + private String schemaRegistry = "https://schema-registry:9081/"; private String topic = "users"; } diff --git a/src/main/java/de/juplo/kafka/wordcount/users/UsersController.java b/src/main/java/de/juplo/kafka/wordcount/users/UsersController.java index 4e64524..84aaf37 100644 --- a/src/main/java/de/juplo/kafka/wordcount/users/UsersController.java +++ b/src/main/java/de/juplo/kafka/wordcount/users/UsersController.java @@ -2,6 +2,7 @@ package de.juplo.kafka.wordcount.users; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.wordcount.avro.User; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.http.HttpStatus; @@ -18,13 +19,13 @@ public class UsersController { private final String topic; private final ObjectMapper mapper; - private final KafkaProducer producer; + private final KafkaProducer producer; public UsersController( UsersApplicationProperties properties, ObjectMapper mapper, - KafkaProducer producer) + KafkaProducer producer) { this.topic = properties.getTopic(); this.mapper = mapper; @@ -38,20 +39,28 @@ public class UsersController MimeTypeUtils.APPLICATION_JSON_VALUE }, produces = MimeTypeUtils.APPLICATION_JSON_VALUE) - DeferredResult> post(@RequestBody User user) throws JsonProcessingException + DeferredResult> post(@RequestBody UserRequest userRequest) throws JsonProcessingException { DeferredResult> result = new DeferredResult<>(); - String value = mapper.writeValueAsString(user); - ProducerRecord record = new ProducerRecord<>(topic, user.getUsername(), value); + ProducerRecord record = + new ProducerRecord<>( + topic, + userRequest.getUsername(), + User + .newBuilder() + .setUsername(userRequest.getUsername()) + .setFirstName(userRequest.getFirstName()) + .setLastName(userRequest.getLastName()) + .build()); producer.send(record, (metadata, exception) -> { if (metadata != null) { result.setResult( ResponseEntity.ok(UsersResult.of( - user.getUsername(), - user, + userRequest.getUsername(), + userRequest, topic, metadata.partition(), metadata.offset(), @@ -64,8 +73,8 @@ public class UsersController ResponseEntity .internalServerError() .body(UsersResult.of( - user.getUsername(), - user, + userRequest.getUsername(), + userRequest, topic, null, null, diff --git a/src/main/java/de/juplo/kafka/wordcount/users/UsersResult.java b/src/main/java/de/juplo/kafka/wordcount/users/UsersResult.java index d712039..b63723c 100644 --- a/src/main/java/de/juplo/kafka/wordcount/users/UsersResult.java +++ b/src/main/java/de/juplo/kafka/wordcount/users/UsersResult.java @@ -10,7 +10,7 @@ import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL; public class UsersResult { @JsonInclude(NON_NULL) private final String username; - @JsonInclude(NON_NULL) private final User user; + @JsonInclude(NON_NULL) private final UserRequest user; @JsonInclude(NON_NULL) private final String topic; @JsonInclude(NON_NULL) private final Integer partition; @JsonInclude(NON_NULL) private final Long offset; diff --git a/src/main/resources/avro/ranking.avsc b/src/main/resources/avro/ranking.avsc new file mode 100644 index 0000000..37e0f44 --- /dev/null +++ b/src/main/resources/avro/ranking.avsc @@ -0,0 +1,25 @@ +{ + "type": "record", + "namespace": "de.juplo.kafka.wordcount.avro", + "name": "Ranking", + "fields": [ + { + "name": "entries", + "type": { + "type": "array", + "items": { + "name": "Entry", + "type": "record", + "fields":[ + { "name": "word", + "type": "string" + }, + { "name": "count", + "type": "long" + } + ] + } + } + } + ] +} diff --git a/src/main/resources/avro/user.avsc b/src/main/resources/avro/user.avsc new file mode 100644 index 0000000..012b876 --- /dev/null +++ b/src/main/resources/avro/user.avsc @@ -0,0 +1,23 @@ +{ + "type": "record", + "namespace": "de.juplo.kafka.wordcount.avro", + "name": "User", + "fields": [ + { + "name": "username", "type": "string" + }, + { + "name": "firstName", "type": "string", "default": "" + }, + { + "name": "lastName", "type": "string", "default": "" + }, + { "name": "sex", "type": + { + "type": "enum", "name": "Sex", + "symbols": [ "UNKNOWN", "FEMALE", "MALE", "OTHER" ], "default": "UNKNOWN" + }, + "default": "UNKNOWN" + } + ] +} diff --git a/src/main/resources/avro/userranking.avsc b/src/main/resources/avro/userranking.avsc new file mode 100644 index 0000000..de8ec87 --- /dev/null +++ b/src/main/resources/avro/userranking.avsc @@ -0,0 +1,31 @@ +{ + "type": "record", + "namespace": "de.juplo.kafka.wordcount.avro", + "name": "UserRanking", + "fields": [ + { + "name": "firstName", "type": "string", "default": "" + }, + { + "name": "lastName", "type": "string", "default": "" + }, + { + "name": "top10", + "type": { + "type": "array", + "items": { + "name": "Entry", + "type": "record", + "fields":[ + { "name": "word", + "type": "string" + }, + { "name": "count", + "type": "long" + } + ] + } + } + } + ] +} -- 2.20.1