From: Kai Moritz Date: Wed, 13 Oct 2021 19:15:39 +0000 (+0200) Subject: WIP X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=aa7df3559b23def3cef91d9831499fc2deab2e36;p=demos%2Fkafka%2Fwordcount WIP --- diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java index 19265f1..86a2925 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java @@ -14,6 +14,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties; public class QueryApplicationProperties { private String bootstrapServer = "localhost:9092"; + private String schemaRegistry = "https://schema-registry:9081/"; private String applicationId = "query"; private String rankingInputTopic = "top10"; private String usersInputTopic = "users"; diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index b52e617..1c5c593 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -67,7 +67,7 @@ public class QueryStreamProcessor props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); - props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "https://schema-registry:9081/"); + props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, properties.getSchemaRegistry()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streams = new KafkaStreams(builder.build(), props);