WIP
authorKai Moritz <kai@juplo.de>
Wed, 13 Oct 2021 19:12:17 +0000 (21:12 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 13 Oct 2021 19:12:17 +0000 (21:12 +0200)
pom.xml
src/main/java/de/juplo/kafka/wordcount/users/User.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/users/UserRequest.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/users/UsersApplication.java
src/main/java/de/juplo/kafka/wordcount/users/UsersApplicationProperties.java
src/main/java/de/juplo/kafka/wordcount/users/UsersController.java
src/main/java/de/juplo/kafka/wordcount/users/UsersResult.java
src/main/resources/avro/ranking.avsc [new file with mode: 0644]
src/main/resources/avro/user.avsc [new file with mode: 0644]
src/main/resources/avro/userranking.avsc [new file with mode: 0644]

diff --git a/pom.xml b/pom.xml
index 5790169..d1517da 100644 (file)
--- a/pom.xml
+++ b/pom.xml
        <name>Wordcount-Users</name>
        <description>Users-service of the multi-user wordcount-example</description>
        <properties>
+               <avro.version>1.10.2</avro.version>
                <docker-maven-plugin.version>0.33.0</docker-maven-plugin.version>
                <java.version>11</java.version>
                <kafka.version>2.8.0</kafka.version>
+               <confluent.version>6.2.1</confluent.version>
        </properties>
        <dependencies>
                <dependency>
                        <groupId>org.hibernate.validator</groupId>
                        <artifactId>hibernate-validator</artifactId>
                </dependency>
+               <dependency>
+                       <groupId>io.confluent</groupId>
+                       <artifactId>kafka-streams-avro-serde</artifactId>
+                       <version>${confluent.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.avro</groupId>
+                       <artifactId>avro</artifactId>
+                       <version>${avro.version}</version>
+               </dependency>
 
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                                        </images>
                                </configuration>
                        </plugin>
+                       <plugin>
+                               <groupId>org.apache.avro</groupId>
+                               <artifactId>avro-maven-plugin</artifactId>
+                               <version>${avro.version}</version>
+                               <executions>
+                                       <execution>
+                                               <phase>generate-sources</phase>
+                                               <goals>
+                                                       <goal>schema</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
+                                                       <outputDirectory>${project.basedir}/target/generated-sources</outputDirectory>
+                                                       <fieldVisibility>PRIVATE</fieldVisibility>
+                                                       <stringType>String</stringType>
+                                                       <includes>
+                                                               <include>*.avsc</include>
+                                                       </includes>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
                </plugins>
        </build>
 
+       <repositories>
+               <repository>
+                       <id>confluent</id>
+                       <url>https://packages.confluent.io/maven/</url>
+               </repository>
+       </repositories>
+
 </project>
diff --git a/src/main/java/de/juplo/kafka/wordcount/users/User.java b/src/main/java/de/juplo/kafka/wordcount/users/User.java
deleted file mode 100644 (file)
index 04c9e0c..0000000
+++ /dev/null
@@ -1,25 +0,0 @@
-package de.juplo.kafka.wordcount.users;
-
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-
-import javax.validation.constraints.NotEmpty;
-
-
-@Getter
-@Setter
-@ToString
-@EqualsAndHashCode(of = "username")
-public class User
-{
-  public enum Sex { FEMALE, MALE, OTHER }
-
-  @NotEmpty
-  private String username;
-
-  private String firstName;
-  private String lastName;
-  private Sex sex;
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/users/UserRequest.java b/src/main/java/de/juplo/kafka/wordcount/users/UserRequest.java
new file mode 100644 (file)
index 0000000..f0f8cc8
--- /dev/null
@@ -0,0 +1,25 @@
+package de.juplo.kafka.wordcount.users;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+import javax.validation.constraints.NotEmpty;
+
+
+@Getter
+@Setter
+@ToString
+@EqualsAndHashCode(of = "username")
+public class UserRequest
+{
+  public enum Sex { FEMALE, MALE, OTHER }
+
+  @NotEmpty
+  private String username;
+
+  private String firstName;
+  private String lastName;
+  private Sex sex;
+}
index 9ae44e1..98aa164 100644 (file)
@@ -1,5 +1,8 @@
 package de.juplo.kafka.wordcount.users;
 
+import de.juplo.kafka.wordcount.avro.User;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
+import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -17,14 +20,15 @@ import java.util.Properties;
 public class UsersApplication
 {
        @Bean(destroyMethod = "close")
-       KafkaProducer<String, String> producer(UsersApplicationProperties properties)
+       KafkaProducer<String, User> producer(UsersApplicationProperties properties)
        {
                Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.recorder.bootstrap-server must be set");
 
                Properties props = new Properties();
                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
                props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-               props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+               props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SpecificAvroSerializer.class);
+               props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, properties.getSchemaRegistry());
 
                return new KafkaProducer<>(props);
        }
index 8218f99..b10f642 100644 (file)
@@ -14,5 +14,6 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
 public class UsersApplicationProperties
 {
   private String bootstrapServer = "localhost:9092";
+  private String schemaRegistry = "https://schema-registry:9081/";
   private String topic = "users";
 }
index 4e64524..84aaf37 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.wordcount.users;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.wordcount.avro.User;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.springframework.http.HttpStatus;
@@ -18,13 +19,13 @@ public class UsersController
 {
   private final String topic;
   private final ObjectMapper mapper;
-  private final KafkaProducer<String, String> producer;
+  private final KafkaProducer<String, User> producer;
 
 
   public UsersController(
       UsersApplicationProperties properties,
       ObjectMapper mapper,
-      KafkaProducer<String,String> producer)
+      KafkaProducer<String,User> producer)
   {
     this.topic = properties.getTopic();
     this.mapper = mapper;
@@ -38,20 +39,28 @@ public class UsersController
           MimeTypeUtils.APPLICATION_JSON_VALUE
       },
       produces = MimeTypeUtils.APPLICATION_JSON_VALUE)
-  DeferredResult<ResponseEntity<UsersResult>> post(@RequestBody User user) throws JsonProcessingException
+  DeferredResult<ResponseEntity<UsersResult>> post(@RequestBody UserRequest userRequest) throws JsonProcessingException
   {
     DeferredResult<ResponseEntity<UsersResult>> result = new DeferredResult<>();
 
-    String value = mapper.writeValueAsString(user);
-    ProducerRecord<String, String> record = new ProducerRecord<>(topic, user.getUsername(), value);
+    ProducerRecord<String, User> record =
+        new ProducerRecord<>(
+            topic,
+            userRequest.getUsername(),
+            User
+                .newBuilder()
+                .setUsername(userRequest.getUsername())
+                .setFirstName(userRequest.getFirstName())
+                .setLastName(userRequest.getLastName())
+                .build());
     producer.send(record, (metadata, exception) ->
     {
       if (metadata != null)
       {
         result.setResult(
             ResponseEntity.ok(UsersResult.of(
-                user.getUsername(),
-                user,
+                userRequest.getUsername(),
+                userRequest,
                 topic,
                 metadata.partition(),
                 metadata.offset(),
@@ -64,8 +73,8 @@ public class UsersController
             ResponseEntity
                 .internalServerError()
                 .body(UsersResult.of(
-                    user.getUsername(),
-                    user,
+                    userRequest.getUsername(),
+                    userRequest,
                     topic,
                     null,
                     null,
index d712039..b63723c 100644 (file)
@@ -10,7 +10,7 @@ import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL;
 public class UsersResult
 {
   @JsonInclude(NON_NULL) private final String username;
-  @JsonInclude(NON_NULL) private final User user;
+  @JsonInclude(NON_NULL) private final UserRequest user;
   @JsonInclude(NON_NULL) private final String topic;
   @JsonInclude(NON_NULL) private final Integer partition;
   @JsonInclude(NON_NULL) private final Long offset;
diff --git a/src/main/resources/avro/ranking.avsc b/src/main/resources/avro/ranking.avsc
new file mode 100644 (file)
index 0000000..37e0f44
--- /dev/null
@@ -0,0 +1,25 @@
+{
+    "type": "record",
+    "namespace": "de.juplo.kafka.wordcount.avro",
+    "name": "Ranking",
+    "fields": [
+        {
+            "name": "entries",
+            "type": {
+                "type": "array",
+                "items": {
+                    "name": "Entry",
+                    "type": "record",
+                    "fields":[
+                        {   "name": "word",
+                            "type": "string"
+                        },
+                        {   "name": "count",
+                            "type": "long"
+                        }
+                    ]
+                }
+            }
+        }
+    ]
+}
diff --git a/src/main/resources/avro/user.avsc b/src/main/resources/avro/user.avsc
new file mode 100644 (file)
index 0000000..012b876
--- /dev/null
@@ -0,0 +1,23 @@
+{
+    "type": "record",
+    "namespace": "de.juplo.kafka.wordcount.avro",
+    "name": "User",
+    "fields": [
+        {
+            "name": "username", "type": "string"
+        },
+        {
+            "name": "firstName", "type": "string", "default": ""
+        },
+        {
+            "name": "lastName", "type": "string", "default": ""
+        },
+        { "name": "sex",   "type":
+            {
+                "type": "enum", "name": "Sex",
+                "symbols": [ "UNKNOWN", "FEMALE", "MALE", "OTHER" ], "default": "UNKNOWN"
+            },
+            "default": "UNKNOWN"
+        }
+    ]
+}
diff --git a/src/main/resources/avro/userranking.avsc b/src/main/resources/avro/userranking.avsc
new file mode 100644 (file)
index 0000000..de8ec87
--- /dev/null
@@ -0,0 +1,31 @@
+{
+    "type": "record",
+    "namespace": "de.juplo.kafka.wordcount.avro",
+    "name": "UserRanking",
+    "fields": [
+        {
+            "name": "firstName", "type": "string", "default": ""
+        },
+        {
+            "name": "lastName", "type": "string", "default": ""
+        },
+        {
+            "name": "top10",
+            "type": {
+                "type": "array",
+                "items": {
+                    "name": "Entry",
+                    "type": "record",
+                    "fields":[
+                        {   "name": "word",
+                            "type": "string"
+                        },
+                        {   "name": "count",
+                            "type": "long"
+                        }
+                    ]
+                }
+            }
+        }
+    ]
+}