+++ /dev/null
-package de.juplo.kafka.wordcount.query;
-
-import lombok.Data;
-
-
-@Data
-public class Entry
-{
- private String key;
- private Long counter;
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.query;
-
-import lombok.Data;
-
-
-@Data
-public class Key
-{
- private String type;
- private String channel;
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.query;
-
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-
-
-@SpringBootApplication
-public class QueryApplication
-{
- public static void main(String[] args)
- {
- SpringApplication.run(QueryApplication.class, args);
- }
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.query;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.state.HostInfo;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.Stores;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.web.ServerProperties;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.support.serializer.JsonDeserializer;
-import org.springframework.kafka.support.serializer.JsonSerde;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
-
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME;
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME;
-import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
-
-
-@Configuration
-@EnableConfigurationProperties(QueryApplicationProperties.class)
-@Slf4j
-public class QueryApplicationConfiguration
-{
- @Bean
- public HostInfo applicationServer(
- ServerProperties serverProperties,
- QueryApplicationProperties applicationProperties) throws IOException
- {
- String host;
- if (serverProperties.getAddress() == null)
- {
- HostInfo bootstrapServer = HostInfo.buildFromEndpoint(applicationProperties.getBootstrapServer());
- Socket socket = new Socket();
- socket.connect(new InetSocketAddress(bootstrapServer.host(), bootstrapServer.port()));
- host = socket.getLocalAddress().getHostAddress();
- }
- else
- {
- host = serverProperties.getAddress().getHostAddress();
- }
-
- Integer port = serverProperties.getPort() == null ? 8080 : serverProperties.getPort();
-
- return new HostInfo(host, port);
- }
-
- @Bean
- public Properties streamProcessorProperties(
- QueryApplicationProperties applicationProperties,
- HostInfo applicationServer)
- {
- Properties props = new Properties();
-
- props.putAll(serializationConfig());
-
- String applicationId = applicationProperties.getApplicationId();
- String bootstrapServer = applicationProperties.getBootstrapServer();
-
- props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
- props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServer.host() + ":" + applicationServer.port());
- props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
-
- props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, applicationProperties.getBootstrapServer());
- if (applicationProperties.getCommitInterval() != null)
- props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, applicationProperties.getCommitInterval());
- if (applicationProperties.getCacheMaxBytes() != null)
- props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, applicationProperties.getCacheMaxBytes());
-
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
- return props;
- }
-
- static Properties serializationConfig()
- {
- Properties props = new Properties();
-
- props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
- props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
- props.put(
- JsonDeserializer.TYPE_MAPPINGS,
- "stats:" + Key.class.getName() + "," +
- "ranking:" + Ranking.class.getName() + "," +
- "userranking:" + UserRanking.class.getName());
-
- return props;
- }
-
- @Bean(initMethod = "start", destroyMethod = "stop")
- public QueryStreamProcessor streamProcessor(
- Properties streamProcessorProperties,
- HostInfo applicationServer,
- QueryApplicationProperties applicationProperties,
- KeyValueBytesStoreSupplier userStoreSupplier,
- KeyValueBytesStoreSupplier rankingStoreSupplier,
- ConfigurableApplicationContext context)
- {
- QueryStreamProcessor streamProcessor = new QueryStreamProcessor(
- streamProcessorProperties,
- applicationServer,
- applicationProperties.getUsersInputTopic(),
- applicationProperties.getRankingInputTopic(),
- userStoreSupplier,
- rankingStoreSupplier);
-
- streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
- {
- log.error("Unexpected error!", e);
- CompletableFuture.runAsync(() ->
- {
- log.info("Stopping application...");
- SpringApplication.exit(context, () -> 1);
- });
- return SHUTDOWN_CLIENT;
- });
-
- return streamProcessor;
- }
-
- @Bean
- public KeyValueBytesStoreSupplier userStoreSupplier()
- {
- return Stores.persistentKeyValueStore(USER_STORE_NAME);
- }
-
- @Bean
- public KeyValueBytesStoreSupplier rankingStoreSupplier()
- {
- return Stores.persistentKeyValueStore(RANKING_STORE_NAME);
- }
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.query;
-
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-
-
-@ConfigurationProperties("juplo.wordcount.query")
-@Getter
-@Setter
-@ToString
-public class QueryApplicationProperties
-{
- private String bootstrapServer = "localhost:9092";
- private String applicationId = "query";
- private String rankingInputTopic = "top10";
- private String usersInputTopic = "users";
- private Integer commitInterval;
- private Integer cacheMaxBytes;
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.query;
-
-import lombok.RequiredArgsConstructor;
-import org.apache.kafka.streams.errors.InvalidStateStoreException;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.PathVariable;
-import org.springframework.web.bind.annotation.RestController;
-
-import java.net.URI;
-import java.util.Optional;
-
-
-@RestController
-@RequiredArgsConstructor
-public class QueryController
-{
- private final QueryStreamProcessor processor;
-
- @GetMapping("{username}")
- ResponseEntity<UserRanking> queryFor(@PathVariable String username)
- {
- Optional<URI> redirect = processor.getRedirect(username);
- if (redirect.isPresent())
- {
- return
- ResponseEntity
- .status(HttpStatus.TEMPORARY_REDIRECT)
- .location(redirect.get())
- .build();
- }
-
- try
- {
- return ResponseEntity.of(processor.getUserRanking(username));
- }
- catch (InvalidStateStoreException e)
- {
- return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build();
- }
- }
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.query;
-
-import jakarta.annotation.PostConstruct;
-import jakarta.annotation.PreDestroy;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.*;
-import org.apache.kafka.streams.state.HostInfo;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.QueryableStoreTypes;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
-import org.springframework.kafka.support.serializer.JsonSerde;
-
-import java.net.URI;
-import java.util.Optional;
-import java.util.Properties;
-
-
-@Slf4j
-public class QueryStreamProcessor
-{
- public static final String STATS_TYPE = "COUNTER";
- public static final String USER_STORE_NAME = "users";
- public static final String RANKING_STORE_NAME = "rankings";
-
- public final KafkaStreams streams;
- public final HostInfo hostInfo;
- public final StoreQueryParameters<ReadOnlyKeyValueStore<String, UserRanking>> storeParameters;
-
-
- public QueryStreamProcessor(
- Properties props,
- HostInfo applicationServer,
- String usersInputTopic,
- String rankingInputTopic,
- KeyValueBytesStoreSupplier userStoreSupplier,
- KeyValueBytesStoreSupplier rankingStoreSupplier)
- {
- Topology topology = buildTopology(
- usersInputTopic,
- rankingInputTopic,
- userStoreSupplier,
- rankingStoreSupplier);
- streams = new KafkaStreams(topology, props);
- hostInfo = applicationServer;
- storeParameters = StoreQueryParameters.fromNameAndType(RANKING_STORE_NAME, QueryableStoreTypes.keyValueStore());;
- }
-
- static Topology buildTopology(
- String usersInputTopic,
- String rankingInputTopic,
- KeyValueBytesStoreSupplier userStoreSupplier,
- KeyValueBytesStoreSupplier rankingStoreSupplier)
- {
- StreamsBuilder builder = new StreamsBuilder();
-
- KTable<String, User> users = builder
- .stream(
- usersInputTopic,
- Consumed.with(Serdes.String(), new JsonSerde().copyWithType(User.class)))
- .toTable(
- Materialized
- .<String, User>as(userStoreSupplier)
- .withKeySerde(Serdes.String())
- .withValueSerde(new JsonSerde().copyWithType(User.class)));
- KStream<String, Ranking> rankings = builder
- .<Key, Ranking>stream(rankingInputTopic)
- .filter((key, value) -> STATS_TYPE.equals(key.getType()))
- .map((key, value) -> new KeyValue<>(key.getChannel(), value));
-
- rankings
- .join(users, (ranking, user) -> UserRanking.of(
- user.getFirstName(),
- user.getLastName(),
- ranking.getEntries()),
- Joined.keySerde(Serdes.String()))
- .toTable(
- Materialized
- .<String, UserRanking>as(rankingStoreSupplier)
- .withKeySerde(Serdes.String())
- .withValueSerde(new JsonSerde().copyWithType(UserRanking.class)));
-
- Topology topology = builder.build();
- log.info("\n\n{}", topology.describe());
-
- return topology;
- }
-
- ReadOnlyKeyValueStore<String, UserRanking> getStore()
- {
- return streams.store(storeParameters);
- }
-
- public Optional<URI> getRedirect(String username)
- {
- KeyQueryMetadata metadata = streams.queryMetadataForKey(RANKING_STORE_NAME, username, Serdes.String().serializer());
- HostInfo activeHost = metadata.activeHost();
- log.debug("Local store for {}: {}, {}:{}", username, metadata.partition(), activeHost.host(), activeHost.port());
-
- if (activeHost.equals(this.hostInfo) || activeHost.equals(HostInfo.unavailable()))
- {
- return Optional.empty();
- }
-
- URI location = URI.create("http://" + activeHost.host() + ":" + activeHost.port() + "/" + username);
- log.debug("Redirecting to {}", location);
- return Optional.of(location);
- }
-
- public Optional<UserRanking> getUserRanking(String username)
- {
- return Optional.ofNullable(getStore().get(username));
- }
-
- @PostConstruct
- public void start()
- {
- log.info("Starting Stream-Processor");
- streams.start();
- }
-
- @PreDestroy
- public void stop()
- {
- log.info("Stopping Stream-Processor");
- streams.close();
- }
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.query;
-
-import lombok.Data;
-
-
-@Data
-public class Ranking
-{
- private Entry[] entries;
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.query;
-
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-
-
-@Data
-@EqualsAndHashCode(of = "username")
-public class User
-{
- public enum Sex { FEMALE, MALE, OTHER }
-
- private String username;
-
- private String firstName;
- private String lastName;
- private Sex sex;
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.query;
-
-import lombok.*;
-
-
-@Data
-@AllArgsConstructor(staticName = "of")
-@NoArgsConstructor
-public class UserRanking
-{
- private String firstName;
- private String lastName;
- private Entry[] top10;
-}
--- /dev/null
+package de.juplo.kafka.wordcount.query;
+
+import lombok.Data;
+
+
+@Data
+public class Entry
+{
+ private String key;
+ private Long counter;
+}
--- /dev/null
+package de.juplo.kafka.wordcount.query;
+
+import lombok.Data;
+
+
+@Data
+public class Key
+{
+ private String type;
+ private String channel;
+}
--- /dev/null
+package de.juplo.kafka.wordcount.query;
+
+import lombok.Data;
+
+
+@Data
+public class Ranking
+{
+ private Entry[] entries;
+}
--- /dev/null
+package de.juplo.kafka.wordcount.query;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+
+@SpringBootApplication
+public class QueryApplication
+{
+ public static void main(String[] args)
+ {
+ SpringApplication.run(QueryApplication.class, args);
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.query;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.state.HostInfo;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.web.ServerProperties;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerde;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+
+import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME;
+import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME;
+import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+
+
+@Configuration
+@EnableConfigurationProperties(QueryApplicationProperties.class)
+@Slf4j
+public class QueryApplicationConfiguration
+{
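+ /**
+ * Determines the host and port that this instance advertises to the other
+ * instances of the Streams application for interactive queries. If no
+ * server address is configured explicitly, the local address is discovered
+ * by opening a socket to the configured bootstrap server.
+ */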
+ @Bean
+ public HostInfo applicationServer(
+ ServerProperties serverProperties,
+ QueryApplicationProperties applicationProperties) throws IOException
+ {
+ String host;
+ if (serverProperties.getAddress() == null)
+ {
+ HostInfo bootstrapServer = HostInfo.buildFromEndpoint(applicationProperties.getBootstrapServer());
+ Socket socket = new Socket();
+ socket.connect(new InetSocketAddress(bootstrapServer.host(), bootstrapServer.port()));
+ host = socket.getLocalAddress().getHostAddress();
+ }
+ else
+ {
+ host = serverProperties.getAddress().getHostAddress();
+ }
+
+ Integer port = serverProperties.getPort() == null ? 8080 : serverProperties.getPort();
+
+ return new HostInfo(host, port);
+ }
+
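+ /**
+ * Assembles the configuration for the Kafka Streams instance: the
+ * serialization settings, the application id, the advertised application
+ * server, and the optional commit-interval and cache-size overrides from
+ * the application properties.
+ */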
+ @Bean
+ public Properties streamProcessorProperties(
+ QueryApplicationProperties applicationProperties,
+ HostInfo applicationServer)
+ {
+ Properties props = new Properties();
+
+ props.putAll(serializationConfig());
+
+ String applicationId = applicationProperties.getApplicationId();
+ String bootstrapServer = applicationProperties.getBootstrapServer();
+
+ props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+ props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServer.host() + ":" + applicationServer.port());
+ props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
+
+ if (applicationProperties.getCommitInterval() != null)
+ props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, applicationProperties.getCommitInterval());
+ if (applicationProperties.getCacheMaxBytes() != null)
+ props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, applicationProperties.getCacheMaxBytes());
+
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ return props;
+ }
+
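+ /**
+ * JSON (de-)serialization settings, including the type mappings for the
+ * JSON type headers. Kept in a separate static method so that the topology
+ * test can run against the very same configuration.
+ */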
+ static Properties serializationConfig()
+ {
+ Properties props = new Properties();
+
+ props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
+ props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
+ props.put(
+ JsonDeserializer.TYPE_MAPPINGS,
+ "stats:" + Key.class.getName() + "," +
+ "ranking:" + Ranking.class.getName() + "," +
+ "userranking:" + UserRanking.class.getName());
+
+ return props;
+ }
+
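+ /**
+ * Creates the stream processor and registers an uncaught exception handler
+ * that logs the error, shuts down the Streams client and asynchronously
+ * stops the Spring application with a non-zero exit code.
+ */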
+ @Bean(initMethod = "start", destroyMethod = "stop")
+ public QueryStreamProcessor streamProcessor(
+ Properties streamProcessorProperties,
+ HostInfo applicationServer,
+ QueryApplicationProperties applicationProperties,
+ KeyValueBytesStoreSupplier userStoreSupplier,
+ KeyValueBytesStoreSupplier rankingStoreSupplier,
+ ConfigurableApplicationContext context)
+ {
+ QueryStreamProcessor streamProcessor = new QueryStreamProcessor(
+ streamProcessorProperties,
+ applicationServer,
+ applicationProperties.getUsersInputTopic(),
+ applicationProperties.getRankingInputTopic(),
+ userStoreSupplier,
+ rankingStoreSupplier);
+
+ streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
+ {
+ log.error("Unexpected error!", e);
+ CompletableFuture.runAsync(() ->
+ {
+ log.info("Stopping application...");
+ SpringApplication.exit(context, () -> 1);
+ });
+ return SHUTDOWN_CLIENT;
+ });
+
+ return streamProcessor;
+ }
+
+ @Bean
+ public KeyValueBytesStoreSupplier userStoreSupplier()
+ {
+ return Stores.persistentKeyValueStore(USER_STORE_NAME);
+ }
+
+ @Bean
+ public KeyValueBytesStoreSupplier rankingStoreSupplier()
+ {
+ return Stores.persistentKeyValueStore(RANKING_STORE_NAME);
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.query;
+
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+
+@ConfigurationProperties("juplo.wordcount.query")
+@Getter
+@Setter
+@ToString
+public class QueryApplicationProperties
+{
+ private String bootstrapServer = "localhost:9092";
+ private String applicationId = "query";
+ private String rankingInputTopic = "top10";
+ private String usersInputTopic = "users";
+ private Integer commitInterval;
+ private Integer cacheMaxBytes;
+}
--- /dev/null
+package de.juplo.kafka.wordcount.query;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.net.URI;
+import java.util.Optional;
+
+
+@RestController
+@RequiredArgsConstructor
+public class QueryController
+{
+ private final QueryStreamProcessor processor;
+
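+ /**
+ * Answers the ranking for the given user: redirects to the instance that
+ * hosts the partition for the user if it is not the local one, and answers
+ * with 503 (Service Unavailable) while the local state store is not (yet)
+ * queryable, e.g. during a rebalance.
+ */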
+ @GetMapping("{username}")
+ ResponseEntity<UserRanking> queryFor(@PathVariable String username)
+ {
+ Optional<URI> redirect = processor.getRedirect(username);
+ if (redirect.isPresent())
+ {
+ return
+ ResponseEntity
+ .status(HttpStatus.TEMPORARY_REDIRECT)
+ .location(redirect.get())
+ .build();
+ }
+
+ try
+ {
+ return ResponseEntity.of(processor.getUserRanking(username));
+ }
+ catch (InvalidStateStoreException e)
+ {
+ return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build();
+ }
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.query;
+
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.kstream.*;
+import org.apache.kafka.streams.state.HostInfo;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.springframework.kafka.support.serializer.JsonSerde;
+
+import java.net.URI;
+import java.util.Optional;
+import java.util.Properties;
+
+
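+/**
+ * Wraps the KafkaStreams instance of the query service: builds the topology,
+ * exposes the materialized ranking store for interactive queries and resolves
+ * which instance hosts the data for a given user.
+ */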
+@Slf4j
+public class QueryStreamProcessor
+{
+ public static final String STATS_TYPE = "COUNTER";
+ public static final String USER_STORE_NAME = "users";
+ public static final String RANKING_STORE_NAME = "rankings";
+
+ public final KafkaStreams streams;
+ public final HostInfo hostInfo;
+ public final StoreQueryParameters<ReadOnlyKeyValueStore<String, UserRanking>> storeParameters;
+
+
+ public QueryStreamProcessor(
+ Properties props,
+ HostInfo applicationServer,
+ String usersInputTopic,
+ String rankingInputTopic,
+ KeyValueBytesStoreSupplier userStoreSupplier,
+ KeyValueBytesStoreSupplier rankingStoreSupplier)
+ {
+ Topology topology = buildTopology(
+ usersInputTopic,
+ rankingInputTopic,
+ userStoreSupplier,
+ rankingStoreSupplier);
+ streams = new KafkaStreams(topology, props);
+ hostInfo = applicationServer;
+ storeParameters = StoreQueryParameters.fromNameAndType(RANKING_STORE_NAME, QueryableStoreTypes.keyValueStore());
+ }
+
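+ /**
+ * Builds the topology: the users topic is read into a KTable that is
+ * materialized in the user store; the ranking topic is filtered to the
+ * COUNTER statistics, re-keyed by the channel (the username), joined with
+ * the users table and materialized as UserRanking in the ranking store,
+ * which is queried by the REST endpoint.
+ */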
+ static Topology buildTopology(
+ String usersInputTopic,
+ String rankingInputTopic,
+ KeyValueBytesStoreSupplier userStoreSupplier,
+ KeyValueBytesStoreSupplier rankingStoreSupplier)
+ {
+ StreamsBuilder builder = new StreamsBuilder();
+
+ KTable<String, User> users = builder
+ .stream(
+ usersInputTopic,
+ Consumed.with(Serdes.String(), new JsonSerde().copyWithType(User.class)))
+ .toTable(
+ Materialized
+ .<String, User>as(userStoreSupplier)
+ .withKeySerde(Serdes.String())
+ .withValueSerde(new JsonSerde().copyWithType(User.class)));
+ KStream<String, Ranking> rankings = builder
+ .<Key, Ranking>stream(rankingInputTopic)
+ .filter((key, value) -> STATS_TYPE.equals(key.getType()))
+ .map((key, value) -> new KeyValue<>(key.getChannel(), value));
+
+ rankings
+ .join(users, (ranking, user) -> UserRanking.of(
+ user.getFirstName(),
+ user.getLastName(),
+ ranking.getEntries()),
+ Joined.keySerde(Serdes.String()))
+ .toTable(
+ Materialized
+ .<String, UserRanking>as(rankingStoreSupplier)
+ .withKeySerde(Serdes.String())
+ .withValueSerde(new JsonSerde().copyWithType(UserRanking.class)));
+
+ Topology topology = builder.build();
+ log.info("\n\n{}", topology.describe());
+
+ return topology;
+ }
+
+ ReadOnlyKeyValueStore<String, UserRanking> getStore()
+ {
+ return streams.store(storeParameters);
+ }
+
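+ /**
+ * Looks up the instance that actively hosts the ranking for the given
+ * username. Returns an empty Optional if that instance is the local one or
+ * is not (yet) known, so that the request is handled locally; otherwise,
+ * the URI of the owning instance is returned.
+ */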
+ public Optional<URI> getRedirect(String username)
+ {
+ KeyQueryMetadata metadata = streams.queryMetadataForKey(RANKING_STORE_NAME, username, Serdes.String().serializer());
+ HostInfo activeHost = metadata.activeHost();
+ log.debug("Local store for {}: {}, {}:{}", username, metadata.partition(), activeHost.host(), activeHost.port());
+
+ if (activeHost.equals(this.hostInfo) || activeHost.equals(HostInfo.unavailable()))
+ {
+ return Optional.empty();
+ }
+
+ URI location = URI.create("http://" + activeHost.host() + ":" + activeHost.port() + "/" + username);
+ log.debug("Redirecting to {}", location);
+ return Optional.of(location);
+ }
+
+ public Optional<UserRanking> getUserRanking(String username)
+ {
+ return Optional.ofNullable(getStore().get(username));
+ }
+
+ @PostConstruct
+ public void start()
+ {
+ log.info("Starting Stream-Processor");
+ streams.start();
+ }
+
+ @PreDestroy
+ public void stop()
+ {
+ log.info("Stopping Stream-Processor");
+ streams.close();
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.query;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+
+@Data
+@EqualsAndHashCode(of = "username")
+public class User
+{
+ public enum Sex { FEMALE, MALE, OTHER }
+
+ private String username;
+
+ private String firstName;
+ private String lastName;
+ private Sex sex;
+}
--- /dev/null
+package de.juplo.kafka.wordcount.query;
+
+import lombok.*;
+
+
+@Data
+@AllArgsConstructor(staticName = "of")
+@NoArgsConstructor
+public class UserRanking
+{
+ private String firstName;
+ private String lastName;
+ private Entry[] top10;
+}
+++ /dev/null
-package de.juplo.kafka.wordcount.query;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.wordcount.top10.TestRanking;
-import de.juplo.kafka.wordcount.top10.TestUser;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.Stores;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.http.MediaType;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.core.ProducerFactory;
-import org.springframework.kafka.support.SendResult;
-import org.springframework.kafka.support.serializer.JsonSerializer;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.test.web.servlet.MockMvc;
-import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
-
-import java.nio.charset.StandardCharsets;
-import java.time.Duration;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME;
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME;
-import static org.awaitility.Awaitility.await;
-import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
-
-
-@SpringBootTest(
- properties = {
- "spring.main.allow-bean-definition-overriding=true",
- "logging.level.root=WARN",
- "logging.level.de.juplo=DEBUG",
- "logging.level.org.apache.kafka.clients=INFO",
- "logging.level.org.apache.kafka.streams=INFO",
- "juplo.wordcount.query.bootstrap-server=${spring.embedded.kafka.brokers}",
- "juplo.wordcount.query.commit-interval=100",
- "juplo.wordcount.query.cache-max-bytes=0",
- "juplo.wordcount.query.users-input-topic=" + QueryApplicationIT.TOPIC_USERS,
- "juplo.wordcount.query.ranking-input-topic=" + QueryApplicationIT.TOPIC_TOP10 })
-@AutoConfigureMockMvc
-@EmbeddedKafka(topics = { QueryApplicationIT.TOPIC_TOP10, QueryApplicationIT.TOPIC_USERS})
-@Slf4j
-public class QueryApplicationIT
-{
- public static final String TOPIC_TOP10 = "top10";
- public static final String TOPIC_USERS = "users";
-
-
- @Autowired
- MockMvc mockMvc;
- @Autowired
- ObjectMapper objectMapper;
- @Autowired
- QueryStreamProcessor streamProcessor;
-
-
- @BeforeAll
- public static void testSendMessage(
- @Autowired KafkaTemplate usersKafkaTemplate,
- @Autowired KafkaTemplate top10KafkaTemplate)
- {
- TestData
- .getUsersMessages()
- .forEach(kv -> flush(usersKafkaTemplate.send(TOPIC_USERS, kv.key, kv.value)));
- TestData
- .getTop10Messages()
- .forEach(kv -> flush(top10KafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value)));
- }
-
- private static void flush(CompletableFuture<SendResult> future)
- {
- try
- {
- SendResult result = future.get();
- log.info(
- "Sent: {}={}, partition={}, offset={}",
- result.getProducerRecord().key(),
- result.getProducerRecord().value(),
- result.getRecordMetadata().partition(),
- result.getRecordMetadata().offset());
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- }
-
- @DisplayName("Await, that the expected state is visible in the state-store")
- @Test
- public void testAwaitExpectedStateInStore()
- {
- await("The expected state is visible in the state-store")
- .atMost(Duration.ofSeconds(5))
- .untilAsserted(() -> TestData.assertExpectedState(user -> streamProcessor.getStore().get(user)));
- }
-
- @DisplayName("Await, that the expected state is queryable")
- @Test
- public void testAwaitExpectedStateIsQueryable()
- {
- await("The expected state is queryable")
- .atMost(Duration.ofSeconds(5))
- .untilAsserted(() -> TestData.assertExpectedState(user -> requestUserRankingFor(user)));
- }
-
- private UserRanking requestUserRankingFor(String user)
- {
- try
- {
- return objectMapper.readValue(
- mockMvc
- .perform(MockMvcRequestBuilders.get("/{user}", user)
- .contentType(MediaType.APPLICATION_JSON))
- .andExpect(status().isOk())
- .andReturn()
- .getResponse()
- .getContentAsString(StandardCharsets.UTF_8),
- UserRanking.class);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- }
-
- @TestConfiguration
- static class Configuration
- {
- @Bean
- KafkaTemplate usersKafkaTemplate(ProducerFactory producerFactory)
- {
- Map<String, Object> properties = Map.of(
- ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
- ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
- JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
- return new KafkaTemplate(producerFactory, properties);
- }
-
- @Bean
- KafkaTemplate top10KafkaTemplate(ProducerFactory producerFactory)
- {
- Map<String, Object> properties = Map.of(
- ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
- ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
- JsonSerializer.TYPE_MAPPINGS, "stats:" + TestUser.class.getName() + ",ranking:" + TestRanking.class.getName());
- return new KafkaTemplate(producerFactory, properties);
- }
-
- @Bean
- KeyValueBytesStoreSupplier userStoreSupplier()
- {
- return Stores.inMemoryKeyValueStore(USER_STORE_NAME);
- }
-
- @Bean
- KeyValueBytesStoreSupplier rankingStoreSupplier()
- {
- return Stores.inMemoryKeyValueStore(RANKING_STORE_NAME);
- }
- }
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.query;
-
-import de.juplo.kafka.wordcount.top10.TestRanking;
-import de.juplo.kafka.wordcount.top10.TestUser;
-import de.juplo.kafka.wordcount.users.TestUserData;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.Stores;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.springframework.kafka.support.serializer.JsonSerializer;
-
-import java.util.Map;
-
-import static de.juplo.kafka.wordcount.query.QueryApplicationConfiguration.serializationConfig;
-
-
-@Slf4j
-public class QueryStreamProcessorTopologyTest
-{
- public static final String TOP10_IN = "TOP10-IN";
- public static final String USERS_IN = "USERS-IN";
- public static final String RANKING_STORE_NAME = "TOPOLOGY-TEST-RANKINGS";
- public static final String USERS_STORE_NAME = "TOPOLOGY-TEST-USERS";
-
-
- TopologyTestDriver testDriver;
- TestInputTopic<TestUser, TestRanking> top10In;
- TestInputTopic<String, TestUserData> userIn;
-
-
- @BeforeEach
- public void setUp()
- {
- Topology topology = QueryStreamProcessor.buildTopology(
- USERS_IN,
- TOP10_IN,
- Stores.inMemoryKeyValueStore(USERS_STORE_NAME),
- Stores.inMemoryKeyValueStore(RANKING_STORE_NAME));
-
- testDriver = new TopologyTestDriver(topology, serializationConfig());
-
- top10In = testDriver.createInputTopic(
- TOP10_IN,
- jsonSerializer(TestUser.class, true),
- jsonSerializer(TestRanking.class,false));
-
- userIn = testDriver.createInputTopic(
- USERS_IN,
- new StringSerializer(),
- jsonSerializer(TestUserData.class, false).noTypeInfo());
- }
-
-
- @Test
- public void test()
- {
- TestData
- .getUsersMessages()
- .forEach(kv -> userIn.pipeInput(kv.key, kv.value));
- TestData
- .getTop10Messages()
- .forEach(kv -> top10In.pipeInput(kv.key, kv.value));
-
- KeyValueStore<String, UserRanking> store = testDriver.getKeyValueStore(RANKING_STORE_NAME);
- TestData.assertExpectedState(user -> store.get(user));
- }
-
- @AfterEach
- public void tearDown()
- {
- testDriver.close();
- }
-
- private <T> JsonSerializer<T> jsonSerializer(Class<T> type, boolean isKey)
- {
- JsonSerializer<T> jsonSerializer = new JsonSerializer<>();
- jsonSerializer.configure(
- Map.of(
- JsonSerializer.TYPE_MAPPINGS,
- "stats:" + TestUser.class.getName() + "," +
- "ranking:" + TestRanking.class.getName()),
- isKey);
- return jsonSerializer;
- }
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.query;
-
-import de.juplo.kafka.wordcount.top10.TestEntry;
-import de.juplo.kafka.wordcount.top10.TestRanking;
-import de.juplo.kafka.wordcount.top10.TestUser;
-import de.juplo.kafka.wordcount.users.TestUserData;
-import org.apache.kafka.streams.KeyValue;
-
-import java.util.Arrays;
-import java.util.function.Function;
-import java.util.stream.Stream;
-
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STATS_TYPE;
-import static org.assertj.core.api.Assertions.assertThat;
-
-
-class TestData
-{
- static final TestUser PETER = TestUser.of(STATS_TYPE, "peter");
- static final TestUser KLAUS = TestUser.of(STATS_TYPE, "klaus");
-
- static final Stream<KeyValue<TestUser, TestRanking>> getTop10Messages()
- {
- return Stream.of(TOP10_MESSAGES);
- }
-
- static final Stream<KeyValue<String, TestUserData>> getUsersMessages()
- {
- return Stream.of(USERS_MESSAGES);
- }
-
- static void assertExpectedState(Function<String, UserRanking> function)
- {
- assertRankingEqualsRankingFromLastMessage(PETER.getChannel(), function.apply(PETER.getChannel()));
- assertRankingEqualsRankingFromLastMessage(KLAUS.getChannel(), function.apply(KLAUS.getChannel()));
- }
-
- private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson)
- {
- assertThat(rankingJson).isEqualTo(getLastMessageFor(user));
- }
-
- private static UserRanking getLastMessageFor(String user)
- {
- return getTop10Messages()
- .filter(kv -> kv.key.getChannel().equals(user))
- .map(kv -> kv.value)
- .map(testRanking -> userRankingFor(user, testRanking))
- .reduce(null, (left, right) -> right);
- }
-
- private static UserRanking userRankingFor(String user, TestRanking testRanking)
- {
- TestUserData testUserData = getUsersMessages()
- .filter(kv -> kv.key.equals(user))
- .map(kv -> kv.value)
- .reduce(null, (left, right) -> right);
-
- Entry[] entries = Arrays
- .stream(testRanking.getEntries())
- .map(testEntry -> entryOf(testEntry))
- .toArray(size -> new Entry[size]);
-
- return UserRanking.of(
- testUserData.getFirstName(),
- testUserData.getLastName(),
- entries);
- }
-
- private static Entry entryOf(TestEntry testEntry)
- {
- Entry entry = new Entry();
- entry.setKey(testEntry.getKey());
- entry.setCounter(testEntry.getCounter());
- return entry;
- }
- private static KeyValue<TestUser, TestRanking>[] TOP10_MESSAGES = new KeyValue[]
- {
- KeyValue.pair( // 0
- PETER,
- TestRanking.of(
- TestEntry.of("Hallo", 1l))),
- KeyValue.pair( // 1
- KLAUS,
- TestRanking.of(
- TestEntry.of("Müsch", 1l))),
- KeyValue.pair( // 2
- PETER,
- TestRanking.of(
- TestEntry.of("Hallo", 1l),
- TestEntry.of("Welt", 1l))),
- KeyValue.pair( // 3
- KLAUS,
- TestRanking.of(
- TestEntry.of("Müsch", 2l))),
- KeyValue.pair( // 4
- KLAUS,
- TestRanking.of(
- TestEntry.of("Müsch", 2l),
- TestEntry.of("s", 1l))),
- KeyValue.pair( // 5
- PETER,
- TestRanking.of(
- TestEntry.of("Hallo", 1l),
- TestEntry.of("Welt", 1l),
- TestEntry.of("Boäh", 1l))),
- KeyValue.pair( // 6
- PETER,
- TestRanking.of(
- TestEntry.of("Welt", 2l),
- TestEntry.of("Hallo", 1l),
- TestEntry.of("Boäh", 1l))),
- KeyValue.pair( // 7
- PETER,
- TestRanking.of(
- TestEntry.of("Welt", 2l),
- TestEntry.of("Boäh", 2l),
- TestEntry.of("Hallo", 1l))),
- KeyValue.pair( // 8
- KLAUS,
- TestRanking.of(
- TestEntry.of("Müsch", 2l),
- TestEntry.of("s", 2l))),
- KeyValue.pair( // 9
- PETER,
- TestRanking.of(
- TestEntry.of("Boäh", 3l),
- TestEntry.of("Welt", 2l),
- TestEntry.of("Hallo", 1l))),
- KeyValue.pair( // 10
- KLAUS,
- TestRanking.of(
- TestEntry.of("s", 3l),
- TestEntry.of("Müsch", 2l))),
- };
-
- private static KeyValue<String, TestUserData>[] USERS_MESSAGES = new KeyValue[]
- {
- KeyValue.pair(
- PETER.getChannel(),
- TestUserData.of(PETER.getChannel(), "Peter", "Pan", TestUserData.Sex.MALE)),
- KeyValue.pair(
- KLAUS.getChannel(),
- TestUserData.of(KLAUS.getChannel(), "Klaus", "Klüse", TestUserData.Sex.OTHER)),
- };
-}
--- /dev/null
+package de.juplo.kafka.wordcount.query;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.wordcount.top10.TestRanking;
+import de.juplo.kafka.wordcount.top10.TestUser;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.http.MediaType;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.test.web.servlet.MockMvc;
+import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME;
+import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME;
+import static org.awaitility.Awaitility.await;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
+
+
+@SpringBootTest(
+ properties = {
+ "spring.main.allow-bean-definition-overriding=true",
+ "logging.level.root=WARN",
+ "logging.level.de.juplo=DEBUG",
+ "logging.level.org.apache.kafka.clients=INFO",
+ "logging.level.org.apache.kafka.streams=INFO",
+ "juplo.wordcount.query.bootstrap-server=${spring.embedded.kafka.brokers}",
+ "juplo.wordcount.query.commit-interval=100",
+ "juplo.wordcount.query.cache-max-bytes=0",
+ "juplo.wordcount.query.users-input-topic=" + QueryApplicationIT.TOPIC_USERS,
+ "juplo.wordcount.query.ranking-input-topic=" + QueryApplicationIT.TOPIC_TOP10 })
+@AutoConfigureMockMvc
+@EmbeddedKafka(topics = { QueryApplicationIT.TOPIC_TOP10, QueryApplicationIT.TOPIC_USERS})
+@Slf4j
+public class QueryApplicationIT
+{
+ public static final String TOPIC_TOP10 = "top10";
+ public static final String TOPIC_USERS = "users";
+
+
+ @Autowired
+ MockMvc mockMvc;
+ @Autowired
+ ObjectMapper objectMapper;
+ @Autowired
+ QueryStreamProcessor streamProcessor;
+
+
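+ /**
+ * Sends the test users and rankings to the embedded broker once, before
+ * the tests run. Each send is awaited via flush(), so that all records are
+ * acknowledged by the broker before the first assertion.
+ */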
+ @BeforeAll
+ public static void testSendMessage(
+ @Autowired KafkaTemplate usersKafkaTemplate,
+ @Autowired KafkaTemplate top10KafkaTemplate)
+ {
+ TestData
+ .getUsersMessages()
+ .forEach(kv -> flush(usersKafkaTemplate.send(TOPIC_USERS, kv.key, kv.value)));
+ TestData
+ .getTop10Messages()
+ .forEach(kv -> flush(top10KafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value)));
+ }
+
+ private static void flush(CompletableFuture<SendResult> future)
+ {
+ try
+ {
+ SendResult result = future.get();
+ log.info(
+ "Sent: {}={}, partition={}, offset={}",
+ result.getProducerRecord().key(),
+ result.getProducerRecord().value(),
+ result.getRecordMetadata().partition(),
+ result.getRecordMetadata().offset());
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @DisplayName("Await, that the expected state is visible in the state-store")
+ @Test
+ public void testAwaitExpectedStateInStore()
+ {
+ await("The expected state is visible in the state-store")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> TestData.assertExpectedState(user -> streamProcessor.getStore().get(user)));
+ }
+
+ @DisplayName("Await, that the expected state is queryable")
+ @Test
+ public void testAwaitExpectedStateIsQueryable()
+ {
+ await("The expected state is queryable")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> TestData.assertExpectedState(user -> requestUserRankingFor(user)));
+ }
+
+ private UserRanking requestUserRankingFor(String user)
+ {
+ try
+ {
+ return objectMapper.readValue(
+ mockMvc
+ .perform(MockMvcRequestBuilders.get("/{user}", user)
+ .contentType(MediaType.APPLICATION_JSON))
+ .andExpect(status().isOk())
+ .andReturn()
+ .getResponse()
+ .getContentAsString(StandardCharsets.UTF_8),
+ UserRanking.class);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @TestConfiguration
+ static class Configuration
+ {
+ @Bean
+ KafkaTemplate usersKafkaTemplate(ProducerFactory producerFactory)
+ {
+ Map<String, Object> properties = Map.of(
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
+ JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
+ return new KafkaTemplate(producerFactory, properties);
+ }
+
+ @Bean
+ KafkaTemplate top10KafkaTemplate(ProducerFactory producerFactory)
+ {
+ Map<String, Object> properties = Map.of(
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
+ JsonSerializer.TYPE_MAPPINGS, "stats:" + TestUser.class.getName() + ",ranking:" + TestRanking.class.getName());
+ return new KafkaTemplate(producerFactory, properties);
+ }
+
+ @Bean
+ KeyValueBytesStoreSupplier userStoreSupplier()
+ {
+ return Stores.inMemoryKeyValueStore(USER_STORE_NAME);
+ }
+
+ @Bean
+ KeyValueBytesStoreSupplier rankingStoreSupplier()
+ {
+ return Stores.inMemoryKeyValueStore(RANKING_STORE_NAME);
+ }
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.query;
+
+import de.juplo.kafka.wordcount.top10.TestRanking;
+import de.juplo.kafka.wordcount.top10.TestUser;
+import de.juplo.kafka.wordcount.users.TestUserData;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+
+import java.util.Map;
+
+import static de.juplo.kafka.wordcount.query.QueryApplicationConfiguration.serializationConfig;
+
+
+@Slf4j
+public class QueryStreamProcessorTopologyTest
+{
+ public static final String TOP10_IN = "TOP10-IN";
+ public static final String USERS_IN = "USERS-IN";
+ public static final String RANKING_STORE_NAME = "TOPOLOGY-TEST-RANKINGS";
+ public static final String USERS_STORE_NAME = "TOPOLOGY-TEST-USERS";
+
+
+ TopologyTestDriver testDriver;
+ TestInputTopic<TestUser, TestRanking> top10In;
+ TestInputTopic<String, TestUserData> userIn;
+
+
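+ /**
+ * Builds the production topology against in-memory stores and wires it
+ * into a TopologyTestDriver that uses the same serialization configuration
+ * as the application.
+ */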
+ @BeforeEach
+ public void setUp()
+ {
+ Topology topology = QueryStreamProcessor.buildTopology(
+ USERS_IN,
+ TOP10_IN,
+ Stores.inMemoryKeyValueStore(USERS_STORE_NAME),
+ Stores.inMemoryKeyValueStore(RANKING_STORE_NAME));
+
+ testDriver = new TopologyTestDriver(topology, serializationConfig());
+
+ top10In = testDriver.createInputTopic(
+ TOP10_IN,
+ jsonSerializer(TestUser.class, true),
+ jsonSerializer(TestRanking.class, false));
+
+ userIn = testDriver.createInputTopic(
+ USERS_IN,
+ new StringSerializer(),
+ jsonSerializer(TestUserData.class, false).noTypeInfo());
+ }
+
+
+ @Test
+ public void test()
+ {
+ TestData
+ .getUsersMessages()
+ .forEach(kv -> userIn.pipeInput(kv.key, kv.value));
+ TestData
+ .getTop10Messages()
+ .forEach(kv -> top10In.pipeInput(kv.key, kv.value));
+
+ KeyValueStore<String, UserRanking> store = testDriver.getKeyValueStore(RANKING_STORE_NAME);
+ TestData.assertExpectedState(user -> store.get(user));
+ }
+
+ @AfterEach
+ public void tearDown()
+ {
+ testDriver.close();
+ }
+
+ private <T> JsonSerializer<T> jsonSerializer(Class<T> type, boolean isKey)
+ {
+ JsonSerializer<T> jsonSerializer = new JsonSerializer<>();
+ jsonSerializer.configure(
+ Map.of(
+ JsonSerializer.TYPE_MAPPINGS,
+ "stats:" + TestUser.class.getName() + "," +
+ "ranking:" + TestRanking.class.getName()),
+ isKey);
+ return jsonSerializer;
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.query;
+
+import de.juplo.kafka.wordcount.top10.TestEntry;
+import de.juplo.kafka.wordcount.top10.TestRanking;
+import de.juplo.kafka.wordcount.top10.TestUser;
+import de.juplo.kafka.wordcount.users.TestUserData;
+import org.apache.kafka.streams.KeyValue;
+
+import java.util.Arrays;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STATS_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+class TestData
+{
+ static final TestUser PETER = TestUser.of(STATS_TYPE, "peter");
+ static final TestUser KLAUS = TestUser.of(STATS_TYPE, "klaus");
+
+ static final Stream<KeyValue<TestUser, TestRanking>> getTop10Messages()
+ {
+ return Stream.of(TOP10_MESSAGES);
+ }
+
+ static final Stream<KeyValue<String, TestUserData>> getUsersMessages()
+ {
+ return Stream.of(USERS_MESSAGES);
+ }
+
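+ /**
+ * Asserts that the ranking returned for each user equals the ranking from
+ * the last top10 message for that user, combined with the first and last
+ * name from the last users message.
+ */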
+ static void assertExpectedState(Function<String, UserRanking> function)
+ {
+ assertRankingEqualsRankingFromLastMessage(PETER.getChannel(), function.apply(PETER.getChannel()));
+ assertRankingEqualsRankingFromLastMessage(KLAUS.getChannel(), function.apply(KLAUS.getChannel()));
+ }
+
+ private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson)
+ {
+ assertThat(rankingJson).isEqualTo(getLastMessageFor(user));
+ }
+
+ private static UserRanking getLastMessageFor(String user)
+ {
+ return getTop10Messages()
+ .filter(kv -> kv.key.getChannel().equals(user))
+ .map(kv -> kv.value)
+ .map(testRanking -> userRankingFor(user, testRanking))
+ .reduce(null, (left, right) -> right);
+ }
+
+ private static UserRanking userRankingFor(String user, TestRanking testRanking)
+ {
+ TestUserData testUserData = getUsersMessages()
+ .filter(kv -> kv.key.equals(user))
+ .map(kv -> kv.value)
+ .reduce(null, (left, right) -> right);
+
+ Entry[] entries = Arrays
+ .stream(testRanking.getEntries())
+ .map(testEntry -> entryOf(testEntry))
+ .toArray(size -> new Entry[size]);
+
+ return UserRanking.of(
+ testUserData.getFirstName(),
+ testUserData.getLastName(),
+ entries);
+ }
+
+ private static Entry entryOf(TestEntry testEntry)
+ {
+ Entry entry = new Entry();
+ entry.setKey(testEntry.getKey());
+ entry.setCounter(testEntry.getCounter());
+ return entry;
+ }
+
+ private static KeyValue<TestUser, TestRanking>[] TOP10_MESSAGES = new KeyValue[]
+ {
+ KeyValue.pair( // 0
+ PETER,
+ TestRanking.of(
+ TestEntry.of("Hallo", 1L))),
+ KeyValue.pair( // 1
+ KLAUS,
+ TestRanking.of(
+ TestEntry.of("Müsch", 1L))),
+ KeyValue.pair( // 2
+ PETER,
+ TestRanking.of(
+ TestEntry.of("Hallo", 1L),
+ TestEntry.of("Welt", 1L))),
+ KeyValue.pair( // 3
+ KLAUS,
+ TestRanking.of(
+ TestEntry.of("Müsch", 2L))),
+ KeyValue.pair( // 4
+ KLAUS,
+ TestRanking.of(
+ TestEntry.of("Müsch", 2L),
+ TestEntry.of("s", 1L))),
+ KeyValue.pair( // 5
+ PETER,
+ TestRanking.of(
+ TestEntry.of("Hallo", 1L),
+ TestEntry.of("Welt", 1L),
+ TestEntry.of("Boäh", 1L))),
+ KeyValue.pair( // 6
+ PETER,
+ TestRanking.of(
+ TestEntry.of("Welt", 2L),
+ TestEntry.of("Hallo", 1L),
+ TestEntry.of("Boäh", 1L))),
+ KeyValue.pair( // 7
+ PETER,
+ TestRanking.of(
+ TestEntry.of("Welt", 2L),
+ TestEntry.of("Boäh", 2L),
+ TestEntry.of("Hallo", 1L))),
+ KeyValue.pair( // 8
+ KLAUS,
+ TestRanking.of(
+ TestEntry.of("Müsch", 2L),
+ TestEntry.of("s", 2L))),
+ KeyValue.pair( // 9
+ PETER,
+ TestRanking.of(
+ TestEntry.of("Boäh", 3L),
+ TestEntry.of("Welt", 2L),
+ TestEntry.of("Hallo", 1L))),
+ KeyValue.pair( // 10
+ KLAUS,
+ TestRanking.of(
+ TestEntry.of("s", 3L),
+ TestEntry.of("Müsch", 2L))),
+ };
+
+ private static KeyValue<String, TestUserData>[] USERS_MESSAGES = new KeyValue[]
+ {
+ KeyValue.pair(
+ PETER.getChannel(),
+ TestUserData.of(PETER.getChannel(), "Peter", "Pan", TestUserData.Sex.MALE)),
+ KeyValue.pair(
+ KLAUS.getChannel(),
+ TestUserData.of(KLAUS.getChannel(), "Klaus", "Klüse", TestUserData.Sex.OTHER)),
+ };
+}