X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fusers%2FUsersStreamProcessor.java;h=1c7572e706709bb8f99df70070a57dccb1c6dada;hb=4e2fc001e9d7041b875e4a5e62eff250b1a8ce60;hp=ef39e89076cab6b0ccfaf146c75c1ef56161cd54;hpb=aaf3b3248937ffd1677a1eeb6a575c539b6ec84a;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/users/UsersStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/users/UsersStreamProcessor.java index ef39e89..1c7572e 100644 --- a/src/main/java/de/juplo/kafka/wordcount/users/UsersStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/users/UsersStreamProcessor.java @@ -12,7 +12,6 @@ import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.springframework.boot.SpringApplication; import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -25,7 +24,6 @@ import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.St @Slf4j -@Component public class UsersStreamProcessor { public final KafkaStreams streams; @@ -36,17 +34,20 @@ public class UsersStreamProcessor public UsersStreamProcessor( - UsersApplicationProperties properties, + String applicationId, + HostInfo applicationServer, + String bootstrapServer, + String topic, ObjectMapper mapper, ConfigurableApplicationContext context) { StreamsBuilder builder = new StreamsBuilder(); - builder.table(properties.getTopic(), Materialized.as(storeName)); + builder.table(topic, Materialized.as(storeName)); Properties props = new Properties(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); - props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, properties.getApplicationServer()); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); + props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServer.host() + ":" + applicationServer.port()); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); @@ -63,7 +64,7 @@ public class UsersStreamProcessor return SHUTDOWN_CLIENT; }); - hostInfo = HostInfo.buildFromEndpoint(properties.getApplicationServer()); + hostInfo = applicationServer; storeParameters = StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore());; this.mapper = mapper; }