From: Kai Moritz Date: Wed, 13 Oct 2021 16:46:03 +0000 (+0200) Subject: WIP X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=9fbfb08601d503a7d21ab5f051f946e439385d58;p=demos%2Fkafka%2Fwordcount WIP --- diff --git a/pom.xml b/pom.xml index 71115c0..f8b96bb 100644 --- a/pom.xml +++ b/pom.xml @@ -14,9 +14,11 @@ Wordcount-Counter Word-counting stream-processor of the multi-user wordcount-example + 1.10.2 0.33.0 11 2.8.0 + 6.2.1 @@ -31,6 +33,16 @@ org.apache.kafka kafka-streams + + io.confluent + kafka-streams-avro-serde + ${confluent.version} + + + org.apache.avro + avro + ${avro.version} + org.springframework.boot @@ -81,7 +93,36 @@ + + org.apache.avro + avro-maven-plugin + ${avro.version} + + + generate-sources + + schema + + + ${project.basedir}/src/main/resources/avro + ${project.basedir}/target/generated-sources + PRIVATE + String + + *.avsc + + + + + + + + confluent + https://packages.confluent.io/maven/ + + + diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java index d670ba2..5fd7153 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java @@ -14,6 +14,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties; public class CounterApplicationProperties { private String bootstrapServer = "localhost:9092"; + private String schemaRegistry = "https://schema-registry:9081/"; private String applicationId = "counter"; private String inputTopic = "recordings"; private String outputTopic = "countings"; diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java index e8d7c11..af89200 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java @@ -2,6 +2,9 @@ package de.juplo.kafka.wordcount.counter; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.wordcount.avro.Key; +import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; +import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; @@ -9,7 +12,9 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Produced; import org.springframework.boot.SpringApplication; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.stereotype.Component; @@ -40,32 +45,28 @@ public class CounterStreamProcessor { StreamsBuilder builder = new StreamsBuilder(); - KStream source = builder.stream(properties.getInputTopic()); + KStream source = + builder.stream( + properties.getInputTopic(), + Consumed.with(Serdes.String(), Serdes.String())); + source .flatMapValues(sentence -> Arrays.asList(PATTERN.split(sentence))) .map((username, word) -> - { - try - { - String key = mapper.writeValueAsString(Key.of(username, word)); - return new KeyValue<>(key, word); - } - catch (JsonProcessingException e) - { - throw new RuntimeException(e); - } - }) + new KeyValue<>( + Key.newBuilder().setUsername(username).setWord(word).build(), + word)) .groupByKey() .count() - .mapValues(value->Long.toString(value)) .toStream() .to(properties.getOutputTopic()); Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName()); + props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, properties.getSchemaRegistry()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streams = new KafkaStreams(builder.build(), props); diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/Key.java b/src/main/java/de/juplo/kafka/wordcount/counter/Key.java deleted file mode 100644 index 1e00dca..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/Key.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import lombok.Value; - - -@Value(staticConstructor = "of") -public class Key -{ - private final String username; - private final String word; -} diff --git a/src/main/resources/avro/key.avsc b/src/main/resources/avro/key.avsc new file mode 100644 index 0000000..6e2467e --- /dev/null +++ b/src/main/resources/avro/key.avsc @@ -0,0 +1,13 @@ +{ + "type": "record", + "namespace": "de.juplo.kafka.wordcount.avro", + "name": "Key", + "fields": [ + { + "name": "username", "type": "string" + }, + { + "name": "word", "type": "string" + } + ] +}