From 8681e52b8be6c2970bb229888da6e92260bcef47 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 9 Jun 2024 09:28:51 +0200 Subject: [PATCH] query: 1.0.6 - Separated config in `QueryApplicationConfiguration` -- COPY --- .../query/QueryApplicationConfiguration.java | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java new file mode 100644 index 0000000..813d3b2 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java @@ -0,0 +1,58 @@ +package de.juplo.kafka.wordcount.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.streams.state.HostInfo; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.web.ServerProperties; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; + + +@SpringBootApplication +@EnableConfigurationProperties(QueryApplicationProperties.class) +public class QueryApplication +{ + @Bean + public QueryStreamProcessor usersStreamProcessor( + ServerProperties serverProperties, + QueryApplicationProperties properties, + ObjectMapper mapper, + ConfigurableApplicationContext context) throws IOException + { + String host; + if (serverProperties.getAddress() == null) + { + HostInfo bootstrapServer = HostInfo.buildFromEndpoint(properties.getBootstrapServer()); + Socket socket = new Socket(); + socket.connect(new InetSocketAddress(bootstrapServer.host(), bootstrapServer.port())); + host = socket.getLocalAddress().getHostAddress(); + } + else + { + host = serverProperties.getAddress().getHostAddress(); + } + + Integer port = serverProperties.getPort() == null ? 8080 : serverProperties.getPort(); + + return new QueryStreamProcessor( + properties.getApplicationId(), + new HostInfo(host, port), + properties.getBootstrapServer(), + properties.getUsersInputTopic(), + properties.getRankingInputTopic(), + mapper, + context); + } + + + public static void main(String[] args) + { + SpringApplication.run(QueryApplication.class, args); + } +} -- 2.20.1