package de.juplo.kafka.wordcount.query;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.streams.state.HostInfo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.autoconfigure.web.ServerProperties;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.context.annotation.Bean;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
@SpringBootApplication
-@EnableConfigurationProperties(QueryApplicationProperties.class)
public class QueryApplication
{
- @Bean
- public QueryStreamProcessor usersStreamProcessor(
- ServerProperties serverProperties,
- QueryApplicationProperties properties,
- ObjectMapper mapper,
- ConfigurableApplicationContext context) throws IOException
- {
- String host;
- if (serverProperties.getAddress() == null)
- {
- HostInfo bootstrapServer = HostInfo.buildFromEndpoint(properties.getBootstrapServer());
- Socket socket = new Socket();
- socket.connect(new InetSocketAddress(bootstrapServer.host(), bootstrapServer.port()));
- host = socket.getLocalAddress().getHostAddress();
- }
- else
- {
- host = serverProperties.getAddress().getHostAddress();
- }
-
- Integer port = serverProperties.getPort() == null ? 8080 : serverProperties.getPort();
-
- return new QueryStreamProcessor(
- properties.getApplicationId(),
- new HostInfo(host, port),
- properties.getBootstrapServer(),
- properties.getUsersInputTopic(),
- properties.getRankingInputTopic(),
- mapper,
- context);
- }
-
-
public static void main(String[] args)
{
SpringApplication.run(QueryApplication.class, args);
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.streams.state.HostInfo;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.web.ServerProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
-@SpringBootApplication
+@Configuration
@EnableConfigurationProperties(QueryApplicationProperties.class)
-public class QueryApplication
+public class QueryApplicationConfiguration
{
@Bean
- public QueryStreamProcessor usersStreamProcessor(
+ public QueryStreamProcessor streamProcessor(
ServerProperties serverProperties,
QueryApplicationProperties properties,
ObjectMapper mapper,
mapper,
context);
}
-
-
- public static void main(String[] args)
- {
- SpringApplication.run(QueryApplication.class, args);
- }
}