WIP
authorKai Moritz <kai@juplo.de>
Wed, 13 Oct 2021 16:46:03 +0000 (18:46 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 13 Oct 2021 16:52:52 +0000 (18:52 +0200)
pom.xml
src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java
src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
src/main/java/de/juplo/kafka/wordcount/counter/Key.java [deleted file]
src/main/resources/avro/key.avsc [new file with mode: 0644]

diff --git a/pom.xml b/pom.xml
index 71115c0..f8b96bb 100644 (file)
--- a/pom.xml
+++ b/pom.xml
        <name>Wordcount-Counter</name>
        <description>Word-counting stream-processor of the multi-user wordcount-example</description>
        <properties>
+               <avro.version>1.10.2</avro.version>
                <docker-maven-plugin.version>0.33.0</docker-maven-plugin.version>
                <java.version>11</java.version>
                <kafka.version>2.8.0</kafka.version>
+               <confluent.version>6.2.1</confluent.version>
        </properties>
        <dependencies>
                <dependency>
                        <groupId>org.apache.kafka</groupId>
                        <artifactId>kafka-streams</artifactId>
                </dependency>
+               <dependency>
+                       <groupId>io.confluent</groupId>
+                       <artifactId>kafka-streams-avro-serde</artifactId>
+                       <version>${confluent.version}</version>
+               </dependency>
+               <dependency>
+                       <groupId>org.apache.avro</groupId>
+                       <artifactId>avro</artifactId>
+                       <version>${avro.version}</version>
+               </dependency>
 
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                                        </images>
                                </configuration>
                        </plugin>
+                       <plugin>
+                               <groupId>org.apache.avro</groupId>
+                               <artifactId>avro-maven-plugin</artifactId>
+                               <version>${avro.version}</version>
+                               <executions>
+                                       <execution>
+                                               <phase>generate-sources</phase>
+                                               <goals>
+                                                       <goal>schema</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
+                                                       <outputDirectory>${project.basedir}/target/generated-sources</outputDirectory>
+                                                       <fieldVisibility>PRIVATE</fieldVisibility>
+                                                       <stringType>String</stringType>
+                                                       <includes>
+                                                               <include>*.avsc</include>
+                                                       </includes>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
                </plugins>
        </build>
 
+       <repositories>
+               <repository>
+                       <id>confluent</id>
+                       <url>https://packages.confluent.io/maven/</url>
+               </repository>
+       </repositories>
+
 </project>
index d670ba2..5fd7153 100644 (file)
@@ -14,6 +14,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
 public class CounterApplicationProperties
 {
   private String bootstrapServer = "localhost:9092";
+  private String schemaRegistry = "https://schema-registry:9081/";
   private String applicationId = "counter";
   private String inputTopic = "recordings";
   private String outputTopic = "countings";
index e8d7c11..af89200 100644 (file)
@@ -2,6 +2,9 @@ package de.juplo.kafka.wordcount.counter;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.wordcount.avro.Key;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
+import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
@@ -9,7 +12,9 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
 import org.springframework.boot.SpringApplication;
 import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.stereotype.Component;
@@ -40,32 +45,28 @@ public class CounterStreamProcessor
        {
                StreamsBuilder builder = new StreamsBuilder();
 
-               KStream<String, String> source = builder.stream(properties.getInputTopic());
+               KStream<String, String> source =
+                               builder.stream(
+                                               properties.getInputTopic(),
+                                               Consumed.with(Serdes.String(), Serdes.String()));
+
                source
                                .flatMapValues(sentence -> Arrays.asList(PATTERN.split(sentence)))
                                .map((username, word) ->
-                               {
-                                       try
-                                       {
-                                               String key = mapper.writeValueAsString(Key.of(username, word));
-                                               return new KeyValue<>(key, word);
-                                       }
-                                       catch (JsonProcessingException e)
-                                       {
-                                               throw new RuntimeException(e);
-                                       }
-                               })
+                                               new KeyValue<>(
+                                                               Key.newBuilder().setUsername(username).setWord(word).build(),
+                                                               word))
                                .groupByKey()
                                .count()
-                               .mapValues(value->Long.toString(value))
                                .toStream()
                                .to(properties.getOutputTopic());
 
                Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
-               props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-               props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+               props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
+               props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
+               props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, properties.getSchemaRegistry());
                props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
                streams = new KafkaStreams(builder.build(), props);
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/Key.java b/src/main/java/de/juplo/kafka/wordcount/counter/Key.java
deleted file mode 100644 (file)
index 1e00dca..0000000
+++ /dev/null
@@ -1,11 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import lombok.Value;
-
-
-@Value(staticConstructor = "of")
-public class Key
-{
-  private final String username;
-  private final String word;
-}
diff --git a/src/main/resources/avro/key.avsc b/src/main/resources/avro/key.avsc
new file mode 100644 (file)
index 0000000..6e2467e
--- /dev/null
@@ -0,0 +1,13 @@
+{
+    "type": "record",
+    "namespace": "de.juplo.kafka.wordcount.avro",
+    "name": "Key",
+    "fields": [
+        {
+            "name": "username", "type": "string"
+        },
+        {
+            "name": "word", "type": "string"
+        }
+    ]
+}