From: Kai Moritz Date: Wed, 13 Oct 2021 19:12:17 +0000 (+0200) Subject: WIP X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=36381bd0a4dea9bd10494cb4919de6c7fe5c4190;p=demos%2Fkafka%2Fwordcount WIP --- diff --git a/pom.xml b/pom.xml index 5790169..d1517da 100644 --- a/pom.xml +++ b/pom.xml @@ -14,9 +14,11 @@ Wordcount-Users Users-service of the multi-user wordcount-example + 1.10.2 0.33.0 11 2.8.0 + 6.2.1 @@ -35,6 +37,16 @@ org.hibernate.validator hibernate-validator + + io.confluent + kafka-streams-avro-serde + ${confluent.version} + + + org.apache.avro + avro + ${avro.version} + org.springframework.boot @@ -85,7 +97,36 @@ + + org.apache.avro + avro-maven-plugin + ${avro.version} + + + generate-sources + + schema + + + ${project.basedir}/src/main/resources/avro + ${project.basedir}/target/generated-sources + PRIVATE + String + + *.avsc + + + + + + + + confluent + https://packages.confluent.io/maven/ + + + diff --git a/src/main/java/de/juplo/kafka/wordcount/users/User.java b/src/main/java/de/juplo/kafka/wordcount/users/User.java deleted file mode 100644 index 04c9e0c..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/users/User.java +++ /dev/null @@ -1,25 +0,0 @@ -package de.juplo.kafka.wordcount.users; - -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; - -import javax.validation.constraints.NotEmpty; - - -@Getter -@Setter -@ToString -@EqualsAndHashCode(of = "username") -public class User -{ - public enum Sex { FEMALE, MALE, OTHER } - - @NotEmpty - private String username; - - private String firstName; - private String lastName; - private Sex sex; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/users/UserRequest.java b/src/main/java/de/juplo/kafka/wordcount/users/UserRequest.java new file mode 100644 index 0000000..f0f8cc8 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/users/UserRequest.java @@ -0,0 +1,25 @@ +package de.juplo.kafka.wordcount.users; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +import javax.validation.constraints.NotEmpty; + + +@Getter +@Setter +@ToString +@EqualsAndHashCode(of = "username") +public class UserRequest +{ + public enum Sex { FEMALE, MALE, OTHER } + + @NotEmpty + private String username; + + private String firstName; + private String lastName; + private Sex sex; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/users/UsersApplication.java b/src/main/java/de/juplo/kafka/wordcount/users/UsersApplication.java index 9ae44e1..98aa164 100644 --- a/src/main/java/de/juplo/kafka/wordcount/users/UsersApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/users/UsersApplication.java @@ -1,5 +1,8 @@ package de.juplo.kafka.wordcount.users; +import de.juplo.kafka.wordcount.avro.User; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; @@ -17,14 +20,15 @@ import java.util.Properties; public class UsersApplication { @Bean(destroyMethod = "close") - KafkaProducer producer(UsersApplicationProperties properties) + KafkaProducer producer(UsersApplicationProperties properties) { Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.recorder.bootstrap-server must be set"); Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class); + props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, properties.getSchemaRegistry()); return new KafkaProducer<>(props); } diff --git a/src/main/java/de/juplo/kafka/wordcount/users/UsersApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/users/UsersApplicationProperties.java index 8218f99..b10f642 100644 --- a/src/main/java/de/juplo/kafka/wordcount/users/UsersApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/wordcount/users/UsersApplicationProperties.java @@ -14,5 +14,6 @@ import org.springframework.boot.context.properties.ConfigurationProperties; public class UsersApplicationProperties { private String bootstrapServer = "localhost:9092"; + private String schemaRegistry = "https://schema-registry:9081/"; private String topic = "users"; } diff --git a/src/main/java/de/juplo/kafka/wordcount/users/UsersController.java b/src/main/java/de/juplo/kafka/wordcount/users/UsersController.java index 4e64524..84aaf37 100644 --- a/src/main/java/de/juplo/kafka/wordcount/users/UsersController.java +++ b/src/main/java/de/juplo/kafka/wordcount/users/UsersController.java @@ -2,6 +2,7 @@ package de.juplo.kafka.wordcount.users; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.wordcount.avro.User; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.http.HttpStatus; @@ -18,13 +19,13 @@ public class UsersController { private final String topic; private final ObjectMapper mapper; - private final KafkaProducer producer; + private final KafkaProducer producer; public UsersController( UsersApplicationProperties properties, ObjectMapper mapper, - KafkaProducer producer) + KafkaProducer producer) { this.topic = properties.getTopic(); this.mapper = mapper; @@ -38,20 +39,28 @@ public class UsersController MimeTypeUtils.APPLICATION_JSON_VALUE }, produces = MimeTypeUtils.APPLICATION_JSON_VALUE) - DeferredResult> post(@RequestBody User user) throws JsonProcessingException + DeferredResult> post(@RequestBody UserRequest userRequest) throws JsonProcessingException { DeferredResult> result = new DeferredResult<>(); - String value = mapper.writeValueAsString(user); - ProducerRecord record = new ProducerRecord<>(topic, user.getUsername(), value); + ProducerRecord record = + new ProducerRecord<>( + topic, + userRequest.getUsername(), + User + .newBuilder() + .setUsername(userRequest.getUsername()) + .setFirstName(userRequest.getFirstName()) + .setLastName(userRequest.getLastName()) + .build()); producer.send(record, (metadata, exception) -> { if (metadata != null) { result.setResult( ResponseEntity.ok(UsersResult.of( - user.getUsername(), - user, + userRequest.getUsername(), + userRequest, topic, metadata.partition(), metadata.offset(), @@ -64,8 +73,8 @@ public class UsersController ResponseEntity .internalServerError() .body(UsersResult.of( - user.getUsername(), - user, + userRequest.getUsername(), + userRequest, topic, null, null, diff --git a/src/main/java/de/juplo/kafka/wordcount/users/UsersResult.java b/src/main/java/de/juplo/kafka/wordcount/users/UsersResult.java index d712039..b63723c 100644 --- a/src/main/java/de/juplo/kafka/wordcount/users/UsersResult.java +++ b/src/main/java/de/juplo/kafka/wordcount/users/UsersResult.java @@ -10,7 +10,7 @@ import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL; public class UsersResult { @JsonInclude(NON_NULL) private final String username; - @JsonInclude(NON_NULL) private final User user; + @JsonInclude(NON_NULL) private final UserRequest user; @JsonInclude(NON_NULL) private final String topic; @JsonInclude(NON_NULL) private final Integer partition; @JsonInclude(NON_NULL) private final Long offset; diff --git a/src/main/resources/avro/ranking.avsc b/src/main/resources/avro/ranking.avsc new file mode 100644 index 0000000..37e0f44 --- /dev/null +++ b/src/main/resources/avro/ranking.avsc @@ -0,0 +1,25 @@ +{ + "type": "record", + "namespace": "de.juplo.kafka.wordcount.avro", + "name": "Ranking", + "fields": [ + { + "name": "entries", + "type": { + "type": "array", + "items": { + "name": "Entry", + "type": "record", + "fields":[ + { "name": "word", + "type": "string" + }, + { "name": "count", + "type": "long" + } + ] + } + } + } + ] +} diff --git a/src/main/resources/avro/user.avsc b/src/main/resources/avro/user.avsc new file mode 100644 index 0000000..012b876 --- /dev/null +++ b/src/main/resources/avro/user.avsc @@ -0,0 +1,23 @@ +{ + "type": "record", + "namespace": "de.juplo.kafka.wordcount.avro", + "name": "User", + "fields": [ + { + "name": "username", "type": "string" + }, + { + "name": "firstName", "type": "string", "default": "" + }, + { + "name": "lastName", "type": "string", "default": "" + }, + { "name": "sex", "type": + { + "type": "enum", "name": "Sex", + "symbols": [ "UNKNOWN", "FEMALE", "MALE", "OTHER" ], "default": "UNKNOWN" + }, + "default": "UNKNOWN" + } + ] +} diff --git a/src/main/resources/avro/userranking.avsc b/src/main/resources/avro/userranking.avsc new file mode 100644 index 0000000..de8ec87 --- /dev/null +++ b/src/main/resources/avro/userranking.avsc @@ -0,0 +1,31 @@ +{ + "type": "record", + "namespace": "de.juplo.kafka.wordcount.avro", + "name": "UserRanking", + "fields": [ + { + "name": "firstName", "type": "string", "default": "" + }, + { + "name": "lastName", "type": "string", "default": "" + }, + { + "name": "top10", + "type": { + "type": "array", + "items": { + "name": "Entry", + "type": "record", + "fields":[ + { "name": "word", + "type": "string" + }, + { "name": "count", + "type": "long" + } + ] + } + } + } + ] +}