WIP
authorKai Moritz <kai@juplo.de>
Wed, 13 Oct 2021 20:12:56 +0000 (22:12 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 13 Oct 2021 20:12:56 +0000 (22:12 +0200)
pom.xml
src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java
src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java
src/main/resources/avro/key.avsc [new file with mode: 0644]
src/main/resources/avro/ranking.avsc [new file with mode: 0644]
src/main/resources/avro/user.avsc [new file with mode: 0644]

diff --git a/pom.xml b/pom.xml
index 9bda638..e4ef604 100644 (file)
--- a/pom.xml
+++ b/pom.xml
        <name>Wordcount-Top-10</name>
        <description>Top-10 stream-processor of the multi-user wordcount-example</description>
        <properties>
+               <avro.version>1.10.2</avro.version>
                <docker-maven-plugin.version>0.33.0</docker-maven-plugin.version>
                <java.version>11</java.version>
                <kafka.version>2.8.0</kafka.version>
+               <confluent.version>6.2.1</confluent.version>
        </properties>
        <dependencies>
                <dependency>
                        <groupId>org.apache.kafka</groupId>
                        <artifactId>kafka-streams</artifactId>
                </dependency>
+               <dependency>
+                       <groupId>io.confluent</groupId>
+                       <artifactId>kafka-streams-avro-serde</artifactId>
+                       <version>${confluent.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.avro</groupId>
+                       <artifactId>avro</artifactId>
+                       <version>${avro.version}</version>
+               </dependency>
 
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                                        </images>
                                </configuration>
                        </plugin>
+                       <plugin>
+                               <groupId>org.apache.avro</groupId>
+                               <artifactId>avro-maven-plugin</artifactId>
+                               <version>${avro.version}</version>
+                               <executions>
+                                       <execution>
+                                               <phase>generate-sources</phase>
+                                               <goals>
+                                                       <goal>schema</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
+                                                       <outputDirectory>${project.basedir}/target/generated-sources</outputDirectory>
+                                                       <fieldVisibility>PRIVATE</fieldVisibility>
+                                                       <stringType>String</stringType>
+                                                       <includes>
+                                                               <include>*.avsc</include>
+                                                       </includes>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
                </plugins>
        </build>
 
+       <repositories>
+               <repository>
+                       <id>confluent</id>
+                       <url>https://packages.confluent.io/maven/</url>
+               </repository>
+       </repositories>
+
 </project>
index 93b78ec..9eca7aa 100644 (file)
@@ -14,6 +14,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
 public class Top10ApplicationProperties
 {
   private String bootstrapServer = "localhost:9092";
+  private String schemaRegistry = "https://schema-registry:9081/";
   private String applicationId = "top10";
   private String inputTopic = "countings";
   private String outputTopic = "top10";
index 862913a..145578f 100644 (file)
@@ -1,7 +1,11 @@
 package de.juplo.kafka.wordcount.top10;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.wordcount.avro.Entry;
+import de.juplo.kafka.wordcount.avro.Key;
+import de.juplo.kafka.wordcount.avro.Ranking;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
+import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
@@ -39,47 +43,26 @@ public class Top10StreamProcessor
                StreamsBuilder builder = new StreamsBuilder();
 
                builder
-                               .<String, String>stream(properties.getInputTopic())
-                               .map((keyJson, countStr) ->
-                               {
-                                       try
-                                       {
-                                               Key key = mapper.readValue(keyJson, Key.class);
-                                               Long count = Long.parseLong(countStr);
-                                               Entry entry = Entry.of(key.getWord(), count);
-                                               String entryJson = mapper.writeValueAsString(entry);
-                                               return new KeyValue<>(key.getUsername(), entryJson);
-                                       }
-                                       catch (JsonProcessingException e)
-                                       {
-                                               throw new RuntimeException(e);
-                                       }
-                               })
+                               .<Key, Long>stream(properties.getInputTopic())
+                               .map((key, count) -> new KeyValue<>(
+                                               key.getUsername(),
+                                               Entry.newBuilder().setWord(key.getWord()).setCount(count).build()))
                                .groupByKey()
                                .aggregate(
-                                               () -> "{\"entries\"     : []}",
-                                               (username, entryJson, rankingJson) ->
-                                               {
-                                                       try
-                                                       {
-                                                               Ranking ranking = mapper.readValue(rankingJson, Ranking.class);
-                                                               ranking.add(mapper.readValue(entryJson, Entry.class));
-                                                               return mapper.writeValueAsString(ranking);
-                                                       }
-                                                       catch (JsonProcessingException e)
-                                                       {
-                                                               throw new RuntimeException(e);
-                                                       }
-                                               }
-                               )
+                                               () -> Ranking.newBuilder().setEntries(new java.util.ArrayList<>()).build(), // "entries" has no default in ranking.avsc, so it must be set before build()
+                                               (username, entry, ranking) -> {
+                                                       ranking.getEntries().add(entry);
+                                                       return ranking;
+                                               })
                                .toStream()
                                .to(properties.getOutputTopic());
 
                Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
-               props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-               props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+               props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
+               props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
+               props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, properties.getSchemaRegistry());
                props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
                streams = new KafkaStreams(builder.build(), props);
diff --git a/src/main/resources/avro/key.avsc b/src/main/resources/avro/key.avsc
new file mode 100644 (file)
index 0000000..6e2467e
--- /dev/null
@@ -0,0 +1,13 @@
+{
+    "type": "record",
+    "namespace": "de.juplo.kafka.wordcount.avro",
+    "name": "Key",
+    "fields": [
+        {
+            "name": "username", "type": "string"
+        },
+        {
+            "name": "word", "type": "string"
+        }
+    ]
+}
diff --git a/src/main/resources/avro/ranking.avsc b/src/main/resources/avro/ranking.avsc
new file mode 100644 (file)
index 0000000..37e0f44
--- /dev/null
@@ -0,0 +1,25 @@
+{
+    "type": "record",
+    "namespace": "de.juplo.kafka.wordcount.avro",
+    "name": "Ranking",
+    "fields": [
+        {
+            "name": "entries",
+            "type": {
+                "type": "array",
+                "items": {
+                    "name": "Entry",
+                    "type": "record",
+                    "fields":[
+                        {   "name": "word",
+                            "type": "string"
+                        },
+                        {   "name": "count",
+                            "type": "long"
+                        }
+                    ]
+                }
+            }
+        }
+    ]
+}
diff --git a/src/main/resources/avro/user.avsc b/src/main/resources/avro/user.avsc
new file mode 100644 (file)
index 0000000..012b876
--- /dev/null
@@ -0,0 +1,23 @@
+{
+    "type": "record",
+    "namespace": "de.juplo.kafka.wordcount.avro",
+    "name": "User",
+    "fields": [
+        {
+            "name": "username", "type": "string"
+        },
+        {
+            "name": "firstName", "type": "string", "default": ""
+        },
+        {
+            "name": "lastName", "type": "string", "default": ""
+        },
+        { "name": "sex",   "type":
+            {
+                "type": "enum", "name": "Sex",
+                "symbols": [ "UNKNOWN", "FEMALE", "MALE", "OTHER" ], "default": "UNKNOWN"
+            },
+            "default": "UNKNOWN"
+        }
+    ]
+}