import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@Slf4j
-@Component
public class UsersStreamProcessor
{
public final KafkaStreams streams;
// Constructor: builds a Kafka Streams topology that materializes the configured
// topic into a local key-value store (`storeName`, declared outside this view),
// configures the Streams client properties, and records the host info / store
// query parameters used for interactive queries.
//
// NOTE(review): this block is a unified-diff fragment ('-' = removed, '+' = added),
// not compilable Java. The refactor replaces the aggregated
// UsersApplicationProperties holder with individually injected settings
// (applicationId, applicationServer, bootstrapServer, topic). Code tokens,
// including the diff markers, are left byte-identical below.
public UsersStreamProcessor(
- UsersApplicationProperties properties,
+ String applicationId,
+ HostInfo applicationServer,
+ String bootstrapServer,
+ String topic,
ObjectMapper mapper,
ConfigurableApplicationContext context)
{
// Materialize the input topic as a KTable backed by the queryable store.
StreamsBuilder builder = new StreamsBuilder();
- builder.table(properties.getTopic(), Materialized.as(storeName));
+ builder.table(topic, Materialized.as(storeName));
// Streams client configuration: application.server must be "host:port",
// hence the explicit host()/port() concatenation from HostInfo.
Properties props = new Properties();
- props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
- props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, properties.getApplicationServer());
- props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+ props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+ props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServer.host() + ":" + applicationServer.port());
+ props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
// Both keys and values are plain strings on the wire.
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// NOTE(review): a diff hunk is elided here — the KafkaStreams construction and
// the setUncaughtExceptionHandler lambda that this `return SHUTDOWN_CLIENT;`
// and the closing `});` belong to are not visible in this fragment.
return SHUTDOWN_CLIENT;
});
// With HostInfo injected directly there is no need to re-parse "host:port".
- hostInfo = HostInfo.buildFromEndpoint(properties.getApplicationServer());
+ hostInfo = applicationServer;
// NOTE(review): stray double semicolon at end of the next line — harmless
// (empty statement) but should be removed when this patch is cleaned up.
storeParameters = StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore());;
this.mapper = mapper;
}