From: Kai Moritz Date: Thu, 14 Oct 2021 16:20:18 +0000 (+0200) Subject: WIP X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=5edc5e44656c9ec5b8cc75ad22dec6ddf1a2a7c4;p=demos%2Fkafka%2Fwordcount WIP --- diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java index d30e5d7..aa21c4a 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java @@ -15,6 +15,8 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; import org.springframework.boot.SpringApplication; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.stereotype.Component; @@ -45,7 +47,7 @@ public class Top10StreamProcessor StreamsBuilder builder = new StreamsBuilder(); builder - .stream(properties.getInputTopic(), Consumed.with(SpecificAvroSerializer.class, Serdes.String()) ) + .stream(properties.getInputTopic(), Consumed.with(null, Serdes.Long())) .map((key, count) -> new KeyValue<>( key.getUsername(), Entry.newBuilder().setWord(key.getWord()).setCount(count).build())) @@ -55,14 +57,15 @@ public class Top10StreamProcessor (username, entry, ranking) -> { ranking.getEntries().add(entry); return ranking; - }) + }, + Materialized.with(Serdes.String(), null)) .toStream() - .to(properties.getOutputTopic()); + .to(properties.getOutputTopic(), Produced.keySerde(Serdes.String())); Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, properties.getSchemaRegistry()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");