WIP
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / counter / CounterStreamProcessor.java
index e8d7c11..af89200 100644 (file)
@@ -2,6 +2,9 @@ package de.juplo.kafka.wordcount.counter;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.wordcount.avro.Key;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
+import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
@@ -9,7 +12,9 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
 import org.springframework.boot.SpringApplication;
 import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.stereotype.Component;
@@ -40,32 +45,28 @@ public class CounterStreamProcessor
        {
                StreamsBuilder builder = new StreamsBuilder();
 
-               KStream<String, String> source = builder.stream(properties.getInputTopic());
+               KStream<String, String> source =
+                               builder.stream(
+                                               properties.getInputTopic(),
+                                               Consumed.with(Serdes.String(), Serdes.String()));
+
                source
                                .flatMapValues(sentence -> Arrays.asList(PATTERN.split(sentence)))
                                .map((username, word) ->
-                               {
-                                       try
-                                       {
-                                               String key = mapper.writeValueAsString(Key.of(username, word));
-                                               return new KeyValue<>(key, word);
-                                       }
-                                       catch (JsonProcessingException e)
-                                       {
-                                               throw new RuntimeException(e);
-                                       }
-                               })
+                                               new KeyValue<>(
+                                                               Key.newBuilder().setUsername(username).setWord(word).build(),
+                                                               word))
                                .groupByKey()
                                .count()
-                               .mapValues(value->Long.toString(value))
                                .toStream()
                                .to(properties.getOutputTopic());
 
                Properties props = new Properties();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
-               props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-               props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+               props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
+               props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());
+               props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, properties.getSchemaRegistry());
                props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
                streams = new KafkaStreams(builder.build(), props);