</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
<artifactId>users</artifactId>
- <version>1.0.2</version>
+ <version>1.0.3</version>
<name>Wordcount-Users</name>
<description>Users-service of the multi-user wordcount-example</description>
<properties>
package de.juplo.kafka.wordcount.users;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.state.HostInfo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.web.ServerProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.util.Assert;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
import java.util.Properties;
return new KafkaProducer<>(props);
}
+ @Bean
+ public UsersStreamProcessor usersStreamProcessor(
+ ServerProperties serverProperties,
+ UsersApplicationProperties properties,
+ ObjectMapper mapper,
+ ConfigurableApplicationContext context) throws IOException
+ {
+ String host;
+ if (serverProperties.getAddress() == null)
+ {
+ HostInfo bootstrapServer = HostInfo.buildFromEndpoint(properties.getBootstrapServer());
+ Socket socket = new Socket();
+ socket.connect(new InetSocketAddress(bootstrapServer.host(), bootstrapServer.port()));
+ host = socket.getLocalAddress().getHostAddress();
+ }
+ else
+ {
+ host = serverProperties.getAddress().getHostAddress();
+ }
+
+ Integer port = serverProperties.getPort() == null ? 8080 : serverProperties.getPort();
+
+ return new UsersStreamProcessor(
+ properties.getApplicationId(),
+ new HostInfo(host, port),
+ properties.getBootstrapServer(),
+ properties.getTopic(),
+ mapper,
+ context);
+ }
+
+
public static void main(String[] args)
{
SpringApplication.run(UsersApplication.class, args);
private String bootstrapServer = "localhost:9092";
private String applicationId = "users";
private String topic = "users";
- private String applicationServer = "localhost:8080";
}
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@Slf4j
-@Component
public class UsersStreamProcessor
{
public final KafkaStreams streams;
public UsersStreamProcessor(
- UsersApplicationProperties properties,
+ String applicationId,
+ HostInfo applicationServer,
+ String bootstrapServer,
+ String topic,
ObjectMapper mapper,
ConfigurableApplicationContext context)
{
StreamsBuilder builder = new StreamsBuilder();
- builder.table(properties.getTopic(), Materialized.as(storeName));
+ builder.table(topic, Materialized.as(storeName));
Properties props = new Properties();
- props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
- props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, properties.getApplicationServer());
- props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+ props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+ props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServer.host() + ":" + applicationServer.port());
+ props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return SHUTDOWN_CLIENT;
});
- hostInfo = HostInfo.buildFromEndpoint(properties.getApplicationServer());
+ hostInfo = applicationServer;
storeParameters = StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore());;
this.mapper = mapper;
}