From: Kai Moritz Date: Tue, 25 Jun 2024 04:03:10 +0000 (+0200) Subject: stats: 1.0.0 - Renamed the project into `stats` -- MOVE X-Git-Tag: stats-1.0.0~3 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=a54e04b02d5c87ebf29efd96295149c9f3f7b734;p=demos%2Fkafka%2Fwordcount stats: 1.0.0 - Renamed the project into `stats` -- MOVE --- diff --git a/src/main/java/de/juplo/kafka/wordcount/query/Entry.java b/src/main/java/de/juplo/kafka/wordcount/query/Entry.java deleted file mode 100644 index 383b1a6..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/query/Entry.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import lombok.Data; - - -@Data -public class Entry -{ - private String key; - private Long counter; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/query/Key.java b/src/main/java/de/juplo/kafka/wordcount/query/Key.java deleted file mode 100644 index a2d85a1..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/query/Key.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import lombok.Data; - - -@Data -public class Key -{ - private String type; - private String channel; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplication.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplication.java deleted file mode 100644 index eeee7eb..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplication.java +++ /dev/null @@ -1,14 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; - - -@SpringBootApplication -public class QueryApplication -{ - public static void main(String[] args) - { - SpringApplication.run(QueryApplication.class, args); - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java deleted file mode 100644 index 0f9cad1..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java +++ /dev/null @@ -1,141 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.state.HostInfo; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; -import org.apache.kafka.streams.state.Stores; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.web.ServerProperties; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.support.serializer.JsonDeserializer; -import org.springframework.kafka.support.serializer.JsonSerde; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.util.Properties; -import java.util.concurrent.CompletableFuture; - -import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME; -import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME; -import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; - - -@Configuration -@EnableConfigurationProperties(QueryApplicationProperties.class) -@Slf4j -public class QueryApplicationConfiguration -{ - @Bean - public HostInfo applicationServer( - ServerProperties serverProperties, - QueryApplicationProperties applicationProperties) throws IOException - { - String host; - if (serverProperties.getAddress() == null) - { - HostInfo bootstrapServer = HostInfo.buildFromEndpoint(applicationProperties.getBootstrapServer()); - Socket socket = new Socket(); - socket.connect(new InetSocketAddress(bootstrapServer.host(), bootstrapServer.port())); - host = socket.getLocalAddress().getHostAddress(); - } - else - { - host = serverProperties.getAddress().getHostAddress(); - } - - Integer port = serverProperties.getPort() == null ? 8080 : serverProperties.getPort(); - - return new HostInfo(host, port); - } - - @Bean - public Properties streamProcessorProperties( - QueryApplicationProperties applicationProperties, - HostInfo applicationServer) - { - Properties props = new Properties(); - - props.putAll(serializationConfig()); - - String applicationId = applicationProperties.getApplicationId(); - String bootstrapServer = applicationProperties.getBootstrapServer(); - - props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); - props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServer.host() + ":" + applicationServer.port()); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); - - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, applicationProperties.getBootstrapServer()); - if (applicationProperties.getCommitInterval() != null) - props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, applicationProperties.getCommitInterval()); - if (applicationProperties.getCacheMaxBytes() != null) - props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, applicationProperties.getCacheMaxBytes()); - - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - - return props; - } - - static Properties serializationConfig() - { - Properties props = new Properties(); - - props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); - props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); - props.put( - JsonDeserializer.TYPE_MAPPINGS, - "stats:" + Key.class.getName() + "," + - "ranking:" + Ranking.class.getName() + "," + - "userranking:" + UserRanking.class.getName()); - - return props; - } - - @Bean(initMethod = "start", destroyMethod = "stop") - public QueryStreamProcessor streamProcessor( - Properties streamProcessorProperties, - HostInfo applicationServer, - QueryApplicationProperties applicationProperties, - KeyValueBytesStoreSupplier userStoreSupplier, - KeyValueBytesStoreSupplier rankingStoreSupplier, - ConfigurableApplicationContext context) - { - QueryStreamProcessor streamProcessor = new QueryStreamProcessor( - streamProcessorProperties, - applicationServer, - applicationProperties.getUsersInputTopic(), - applicationProperties.getRankingInputTopic(), - userStoreSupplier, - rankingStoreSupplier); - - streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> - { - log.error("Unexpected error!", e); - CompletableFuture.runAsync(() -> - { - log.info("Stopping application..."); - SpringApplication.exit(context, () -> 1); - }); - return SHUTDOWN_CLIENT; - }); - - return streamProcessor; - } - - @Bean - public KeyValueBytesStoreSupplier userStoreSupplier() - { - return Stores.persistentKeyValueStore(USER_STORE_NAME); - } - - @Bean - public KeyValueBytesStoreSupplier rankingStoreSupplier() - { - return Stores.persistentKeyValueStore(RANKING_STORE_NAME); - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java deleted file mode 100644 index 4a9eeca..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java +++ /dev/null @@ -1,22 +0,0 @@ -package de.juplo.kafka.wordcount.query; - - -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; -import org.springframework.boot.context.properties.ConfigurationProperties; - - -@ConfigurationProperties("juplo.wordcount.query") -@Getter -@Setter -@ToString -public class QueryApplicationProperties -{ - private String bootstrapServer = "localhost:9092"; - private String applicationId = "query"; - private String rankingInputTopic = "top10"; - private String usersInputTopic = "users"; - private Integer commitInterval; - private Integer cacheMaxBytes; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryController.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryController.java deleted file mode 100644 index a9b5b80..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryController.java +++ /dev/null @@ -1,43 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import lombok.RequiredArgsConstructor; -import org.apache.kafka.streams.errors.InvalidStateStoreException; -import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.RestController; - -import java.net.URI; -import java.util.Optional; - - -@RestController -@RequiredArgsConstructor -public class QueryController -{ - private final QueryStreamProcessor processor; - - @GetMapping("{username}") - ResponseEntity queryFor(@PathVariable String username) - { - Optional redirect = processor.getRedirect(username); - if (redirect.isPresent()) - { - return - ResponseEntity - .status(HttpStatus.TEMPORARY_REDIRECT) - .location(redirect.get()) - .build(); - } - - try - { - return ResponseEntity.of(processor.getUserRanking(username)); - } - catch (InvalidStateStoreException e) - { - return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build(); - } - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java deleted file mode 100644 index 5543a91..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ /dev/null @@ -1,129 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.*; -import org.apache.kafka.streams.state.HostInfo; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; -import org.apache.kafka.streams.state.QueryableStoreTypes; -import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; -import org.springframework.kafka.support.serializer.JsonSerde; - -import java.net.URI; -import java.util.Optional; -import java.util.Properties; - - -@Slf4j -public class QueryStreamProcessor -{ - public static final String STATS_TYPE = "COUNTER"; - public static final String USER_STORE_NAME = "users"; - public static final String RANKING_STORE_NAME = "rankings"; - - public final KafkaStreams streams; - public final HostInfo hostInfo; - public final StoreQueryParameters> storeParameters; - - - public QueryStreamProcessor( - Properties props, - HostInfo applicationServer, - String usersInputTopic, - String rankingInputTopic, - KeyValueBytesStoreSupplier userStoreSupplier, - KeyValueBytesStoreSupplier rankingStoreSupplier) - { - Topology topology = buildTopology( - usersInputTopic, - rankingInputTopic, - userStoreSupplier, - rankingStoreSupplier); - streams = new KafkaStreams(topology, props); - hostInfo = applicationServer; - storeParameters = StoreQueryParameters.fromNameAndType(RANKING_STORE_NAME, QueryableStoreTypes.keyValueStore());; - } - - static Topology buildTopology( - String usersInputTopic, - String rankingInputTopic, - KeyValueBytesStoreSupplier userStoreSupplier, - KeyValueBytesStoreSupplier rankingStoreSupplier) - { - StreamsBuilder builder = new StreamsBuilder(); - - KTable users = builder - .stream( - usersInputTopic, - Consumed.with(Serdes.String(), new JsonSerde().copyWithType(User.class))) - .toTable( - Materialized - .as(userStoreSupplier) - .withKeySerde(Serdes.String()) - .withValueSerde(new JsonSerde().copyWithType(User.class))); - KStream rankings = builder - .stream(rankingInputTopic) - .filter((key, value) -> STATS_TYPE.equals(key.getType())) - .map((key, value) -> new KeyValue<>(key.getChannel(), value)); - - rankings - .join(users, (ranking, user) -> UserRanking.of( - user.getFirstName(), - user.getLastName(), - ranking.getEntries()), - Joined.keySerde(Serdes.String())) - .toTable( - Materialized - .as(rankingStoreSupplier) - .withKeySerde(Serdes.String()) - .withValueSerde(new JsonSerde().copyWithType(UserRanking.class))); - - Topology topology = builder.build(); - log.info("\n\n{}", topology.describe()); - - return topology; - } - - ReadOnlyKeyValueStore getStore() - { - return streams.store(storeParameters); - } - - public Optional getRedirect(String username) - { - KeyQueryMetadata metadata = streams.queryMetadataForKey(RANKING_STORE_NAME, username, Serdes.String().serializer()); - HostInfo activeHost = metadata.activeHost(); - log.debug("Local store for {}: {}, {}:{}", username, metadata.partition(), activeHost.host(), activeHost.port()); - - if (activeHost.equals(this.hostInfo) || activeHost.equals(HostInfo.unavailable())) - { - return Optional.empty(); - } - - URI location = URI.create("http://" + activeHost.host() + ":" + activeHost.port() + "/" + username); - log.debug("Redirecting to {}", location); - return Optional.of(location); - } - - public Optional getUserRanking(String username) - { - return Optional.ofNullable(getStore().get(username)); - } - - @PostConstruct - public void start() - { - log.info("Starting Stream-Processor"); - streams.start(); - } - - @PreDestroy - public void stop() - { - log.info("Stopping Stream-Processor"); - streams.close(); - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/query/Ranking.java b/src/main/java/de/juplo/kafka/wordcount/query/Ranking.java deleted file mode 100644 index 8966be6..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/query/Ranking.java +++ /dev/null @@ -1,10 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import lombok.Data; - - -@Data -public class Ranking -{ - private Entry[] entries; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/query/User.java b/src/main/java/de/juplo/kafka/wordcount/query/User.java deleted file mode 100644 index f62b475..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/query/User.java +++ /dev/null @@ -1,18 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import lombok.Data; -import lombok.EqualsAndHashCode; - - -@Data -@EqualsAndHashCode(of = "username") -public class User -{ - public enum Sex { FEMALE, MALE, OTHER } - - private String username; - - private String firstName; - private String lastName; - private Sex sex; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java b/src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java deleted file mode 100644 index 9ca765a..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java +++ /dev/null @@ -1,14 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import lombok.*; - - -@Data -@AllArgsConstructor(staticName = "of") -@NoArgsConstructor -public class UserRanking -{ - private String firstName; - private String lastName; - private Entry[] top10; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/Entry.java b/src/main/java/de/juplo/kafka/wordcount/stats/Entry.java new file mode 100644 index 0000000..383b1a6 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/stats/Entry.java @@ -0,0 +1,11 @@ +package de.juplo.kafka.wordcount.query; + +import lombok.Data; + + +@Data +public class Entry +{ + private String key; + private Long counter; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/Key.java b/src/main/java/de/juplo/kafka/wordcount/stats/Key.java new file mode 100644 index 0000000..a2d85a1 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/stats/Key.java @@ -0,0 +1,11 @@ +package de.juplo.kafka.wordcount.query; + +import lombok.Data; + + +@Data +public class Key +{ + private String type; + private String channel; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/Ranking.java b/src/main/java/de/juplo/kafka/wordcount/stats/Ranking.java new file mode 100644 index 0000000..8966be6 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/stats/Ranking.java @@ -0,0 +1,10 @@ +package de.juplo.kafka.wordcount.query; + +import lombok.Data; + + +@Data +public class Ranking +{ + private Entry[] entries; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplication.java b/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplication.java new file mode 100644 index 0000000..eeee7eb --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplication.java @@ -0,0 +1,14 @@ +package de.juplo.kafka.wordcount.query; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + + +@SpringBootApplication +public class QueryApplication +{ + public static void main(String[] args) + { + SpringApplication.run(QueryApplication.class, args); + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationConfiguration.java new file mode 100644 index 0000000..0f9cad1 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationConfiguration.java @@ -0,0 +1,141 @@ +package de.juplo.kafka.wordcount.query; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.state.HostInfo; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.web.ServerProperties; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerde; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; + +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME; +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME; +import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; + + +@Configuration +@EnableConfigurationProperties(QueryApplicationProperties.class) +@Slf4j +public class QueryApplicationConfiguration +{ + @Bean + public HostInfo applicationServer( + ServerProperties serverProperties, + QueryApplicationProperties applicationProperties) throws IOException + { + String host; + if (serverProperties.getAddress() == null) + { + HostInfo bootstrapServer = HostInfo.buildFromEndpoint(applicationProperties.getBootstrapServer()); + Socket socket = new Socket(); + socket.connect(new InetSocketAddress(bootstrapServer.host(), bootstrapServer.port())); + host = socket.getLocalAddress().getHostAddress(); + } + else + { + host = serverProperties.getAddress().getHostAddress(); + } + + Integer port = serverProperties.getPort() == null ? 8080 : serverProperties.getPort(); + + return new HostInfo(host, port); + } + + @Bean + public Properties streamProcessorProperties( + QueryApplicationProperties applicationProperties, + HostInfo applicationServer) + { + Properties props = new Properties(); + + props.putAll(serializationConfig()); + + String applicationId = applicationProperties.getApplicationId(); + String bootstrapServer = applicationProperties.getBootstrapServer(); + + props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); + props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServer.host() + ":" + applicationServer.port()); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); + + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, applicationProperties.getBootstrapServer()); + if (applicationProperties.getCommitInterval() != null) + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, applicationProperties.getCommitInterval()); + if (applicationProperties.getCacheMaxBytes() != null) + props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, applicationProperties.getCacheMaxBytes()); + + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + return props; + } + + static Properties serializationConfig() + { + Properties props = new Properties(); + + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + props.put( + JsonDeserializer.TYPE_MAPPINGS, + "stats:" + Key.class.getName() + "," + + "ranking:" + Ranking.class.getName() + "," + + "userranking:" + UserRanking.class.getName()); + + return props; + } + + @Bean(initMethod = "start", destroyMethod = "stop") + public QueryStreamProcessor streamProcessor( + Properties streamProcessorProperties, + HostInfo applicationServer, + QueryApplicationProperties applicationProperties, + KeyValueBytesStoreSupplier userStoreSupplier, + KeyValueBytesStoreSupplier rankingStoreSupplier, + ConfigurableApplicationContext context) + { + QueryStreamProcessor streamProcessor = new QueryStreamProcessor( + streamProcessorProperties, + applicationServer, + applicationProperties.getUsersInputTopic(), + applicationProperties.getRankingInputTopic(), + userStoreSupplier, + rankingStoreSupplier); + + streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> + { + log.error("Unexpected error!", e); + CompletableFuture.runAsync(() -> + { + log.info("Stopping application..."); + SpringApplication.exit(context, () -> 1); + }); + return SHUTDOWN_CLIENT; + }); + + return streamProcessor; + } + + @Bean + public KeyValueBytesStoreSupplier userStoreSupplier() + { + return Stores.persistentKeyValueStore(USER_STORE_NAME); + } + + @Bean + public KeyValueBytesStoreSupplier rankingStoreSupplier() + { + return Stores.persistentKeyValueStore(RANKING_STORE_NAME); + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationProperties.java new file mode 100644 index 0000000..4a9eeca --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationProperties.java @@ -0,0 +1,22 @@ +package de.juplo.kafka.wordcount.query; + + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import org.springframework.boot.context.properties.ConfigurationProperties; + + +@ConfigurationProperties("juplo.wordcount.query") +@Getter +@Setter +@ToString +public class QueryApplicationProperties +{ + private String bootstrapServer = "localhost:9092"; + private String applicationId = "query"; + private String rankingInputTopic = "top10"; + private String usersInputTopic = "users"; + private Integer commitInterval; + private Integer cacheMaxBytes; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/StatsController.java b/src/main/java/de/juplo/kafka/wordcount/stats/StatsController.java new file mode 100644 index 0000000..a9b5b80 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/stats/StatsController.java @@ -0,0 +1,43 @@ +package de.juplo.kafka.wordcount.query; + +import lombok.RequiredArgsConstructor; +import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RestController; + +import java.net.URI; +import java.util.Optional; + + +@RestController +@RequiredArgsConstructor +public class QueryController +{ + private final QueryStreamProcessor processor; + + @GetMapping("{username}") + ResponseEntity queryFor(@PathVariable String username) + { + Optional redirect = processor.getRedirect(username); + if (redirect.isPresent()) + { + return + ResponseEntity + .status(HttpStatus.TEMPORARY_REDIRECT) + .location(redirect.get()) + .build(); + } + + try + { + return ResponseEntity.of(processor.getUserRanking(username)); + } + catch (InvalidStateStoreException e) + { + return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build(); + } + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessor.java new file mode 100644 index 0000000..5543a91 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessor.java @@ -0,0 +1,129 @@ +package de.juplo.kafka.wordcount.query; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.*; +import org.apache.kafka.streams.kstream.*; +import org.apache.kafka.streams.state.HostInfo; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.springframework.kafka.support.serializer.JsonSerde; + +import java.net.URI; +import java.util.Optional; +import java.util.Properties; + + +@Slf4j +public class QueryStreamProcessor +{ + public static final String STATS_TYPE = "COUNTER"; + public static final String USER_STORE_NAME = "users"; + public static final String RANKING_STORE_NAME = "rankings"; + + public final KafkaStreams streams; + public final HostInfo hostInfo; + public final StoreQueryParameters> storeParameters; + + + public QueryStreamProcessor( + Properties props, + HostInfo applicationServer, + String usersInputTopic, + String rankingInputTopic, + KeyValueBytesStoreSupplier userStoreSupplier, + KeyValueBytesStoreSupplier rankingStoreSupplier) + { + Topology topology = buildTopology( + usersInputTopic, + rankingInputTopic, + userStoreSupplier, + rankingStoreSupplier); + streams = new KafkaStreams(topology, props); + hostInfo = applicationServer; + storeParameters = StoreQueryParameters.fromNameAndType(RANKING_STORE_NAME, QueryableStoreTypes.keyValueStore());; + } + + static Topology buildTopology( + String usersInputTopic, + String rankingInputTopic, + KeyValueBytesStoreSupplier userStoreSupplier, + KeyValueBytesStoreSupplier rankingStoreSupplier) + { + StreamsBuilder builder = new StreamsBuilder(); + + KTable users = builder + .stream( + usersInputTopic, + Consumed.with(Serdes.String(), new JsonSerde().copyWithType(User.class))) + .toTable( + Materialized + .as(userStoreSupplier) + .withKeySerde(Serdes.String()) + .withValueSerde(new JsonSerde().copyWithType(User.class))); + KStream rankings = builder + .stream(rankingInputTopic) + .filter((key, value) -> STATS_TYPE.equals(key.getType())) + .map((key, value) -> new KeyValue<>(key.getChannel(), value)); + + rankings + .join(users, (ranking, user) -> UserRanking.of( + user.getFirstName(), + user.getLastName(), + ranking.getEntries()), + Joined.keySerde(Serdes.String())) + .toTable( + Materialized + .as(rankingStoreSupplier) + .withKeySerde(Serdes.String()) + .withValueSerde(new JsonSerde().copyWithType(UserRanking.class))); + + Topology topology = builder.build(); + log.info("\n\n{}", topology.describe()); + + return topology; + } + + ReadOnlyKeyValueStore getStore() + { + return streams.store(storeParameters); + } + + public Optional getRedirect(String username) + { + KeyQueryMetadata metadata = streams.queryMetadataForKey(RANKING_STORE_NAME, username, Serdes.String().serializer()); + HostInfo activeHost = metadata.activeHost(); + log.debug("Local store for {}: {}, {}:{}", username, metadata.partition(), activeHost.host(), activeHost.port()); + + if (activeHost.equals(this.hostInfo) || activeHost.equals(HostInfo.unavailable())) + { + return Optional.empty(); + } + + URI location = URI.create("http://" + activeHost.host() + ":" + activeHost.port() + "/" + username); + log.debug("Redirecting to {}", location); + return Optional.of(location); + } + + public Optional getUserRanking(String username) + { + return Optional.ofNullable(getStore().get(username)); + } + + @PostConstruct + public void start() + { + log.info("Starting Stream-Processor"); + streams.start(); + } + + @PreDestroy + public void stop() + { + log.info("Stopping Stream-Processor"); + streams.close(); + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/User.java b/src/main/java/de/juplo/kafka/wordcount/stats/User.java new file mode 100644 index 0000000..f62b475 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/stats/User.java @@ -0,0 +1,18 @@ +package de.juplo.kafka.wordcount.query; + +import lombok.Data; +import lombok.EqualsAndHashCode; + + +@Data +@EqualsAndHashCode(of = "username") +public class User +{ + public enum Sex { FEMALE, MALE, OTHER } + + private String username; + + private String firstName; + private String lastName; + private Sex sex; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/UserRanking.java b/src/main/java/de/juplo/kafka/wordcount/stats/UserRanking.java new file mode 100644 index 0000000..9ca765a --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/stats/UserRanking.java @@ -0,0 +1,14 @@ +package de.juplo.kafka.wordcount.query; + +import lombok.*; + + +@Data +@AllArgsConstructor(staticName = "of") +@NoArgsConstructor +public class UserRanking +{ + private String firstName; + private String lastName; + private Entry[] top10; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java deleted file mode 100644 index fb12aee..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java +++ /dev/null @@ -1,172 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import com.fasterxml.jackson.databind.ObjectMapper; -import de.juplo.kafka.wordcount.top10.TestRanking; -import de.juplo.kafka.wordcount.top10.TestUser; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; -import org.apache.kafka.streams.state.Stores; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.http.MediaType; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.core.ProducerFactory; -import org.springframework.kafka.support.SendResult; -import org.springframework.kafka.support.serializer.JsonSerializer; -import org.springframework.kafka.test.context.EmbeddedKafka; -import org.springframework.test.web.servlet.MockMvc; -import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; - -import java.nio.charset.StandardCharsets; -import java.time.Duration; -import java.util.Map; -import java.util.concurrent.CompletableFuture; - -import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME; -import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME; -import static org.awaitility.Awaitility.await; -import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; - - -@SpringBootTest( - properties = { - "spring.main.allow-bean-definition-overriding=true", - "logging.level.root=WARN", - "logging.level.de.juplo=DEBUG", - "logging.level.org.apache.kafka.clients=INFO", - "logging.level.org.apache.kafka.streams=INFO", - "juplo.wordcount.query.bootstrap-server=${spring.embedded.kafka.brokers}", - "juplo.wordcount.query.commit-interval=100", - "juplo.wordcount.query.cache-max-bytes=0", - "juplo.wordcount.query.users-input-topic=" + QueryApplicationIT.TOPIC_USERS, - "juplo.wordcount.query.ranking-input-topic=" + QueryApplicationIT.TOPIC_TOP10 }) -@AutoConfigureMockMvc -@EmbeddedKafka(topics = { QueryApplicationIT.TOPIC_TOP10, QueryApplicationIT.TOPIC_USERS}) -@Slf4j -public class QueryApplicationIT -{ - public static final String TOPIC_TOP10 = "top10"; - public static final String TOPIC_USERS = "users"; - - - @Autowired - MockMvc mockMvc; - @Autowired - ObjectMapper objectMapper; - @Autowired - QueryStreamProcessor streamProcessor; - - - @BeforeAll - public static void testSendMessage( - @Autowired KafkaTemplate usersKafkaTemplate, - @Autowired KafkaTemplate top10KafkaTemplate) - { - TestData - .getUsersMessages() - .forEach(kv -> flush(usersKafkaTemplate.send(TOPIC_USERS, kv.key, kv.value))); - TestData - .getTop10Messages() - .forEach(kv -> flush(top10KafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value))); - } - - private static void flush(CompletableFuture future) - { - try - { - SendResult result = future.get(); - log.info( - "Sent: {}={}, partition={}, offset={}", - result.getProducerRecord().key(), - result.getProducerRecord().value(), - result.getRecordMetadata().partition(), - result.getRecordMetadata().offset()); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - } - - @DisplayName("Await, that the expected state is visible in the state-store") - @Test - public void testAwaitExpectedStateInStore() - { - await("The expected state is visible in the state-store") - .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> TestData.assertExpectedState(user -> streamProcessor.getStore().get(user))); - } - - @DisplayName("Await, that the expected state is queryable") - @Test - public void testAwaitExpectedStateIsQueryable() - { - await("The expected state is queryable") - .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> TestData.assertExpectedState(user -> requestUserRankingFor(user))); - } - - private UserRanking requestUserRankingFor(String user) - { - try - { - return objectMapper.readValue( - mockMvc - .perform(MockMvcRequestBuilders.get("/{user}", user) - .contentType(MediaType.APPLICATION_JSON)) - .andExpect(status().isOk()) - .andReturn() - .getResponse() - .getContentAsString(StandardCharsets.UTF_8), - UserRanking.class); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - } - - @TestConfiguration - static class Configuration - { - @Bean - KafkaTemplate usersKafkaTemplate(ProducerFactory producerFactory) - { - Map properties = Map.of( - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), - JsonSerializer.ADD_TYPE_INFO_HEADERS, false); - return new KafkaTemplate(producerFactory, properties); - } - - @Bean - KafkaTemplate top10KafkaTemplate(ProducerFactory producerFactory) - { - Map properties = Map.of( - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), - JsonSerializer.TYPE_MAPPINGS, "stats:" + TestUser.class.getName() + ",ranking:" + TestRanking.class.getName()); - return new KafkaTemplate(producerFactory, properties); - } - - @Bean - KeyValueBytesStoreSupplier userStoreSupplier() - { - return Stores.inMemoryKeyValueStore(USER_STORE_NAME); - } - - @Bean - KeyValueBytesStoreSupplier rankingStoreSupplier() - { - return Stores.inMemoryKeyValueStore(RANKING_STORE_NAME); - } - } -} diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java deleted file mode 100644 index fbeb19b..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java +++ /dev/null @@ -1,91 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import de.juplo.kafka.wordcount.top10.TestRanking; -import de.juplo.kafka.wordcount.top10.TestUser; -import de.juplo.kafka.wordcount.users.TestUserData; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.TestInputTopic; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.TopologyTestDriver; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.Stores; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.springframework.kafka.support.serializer.JsonSerializer; - -import java.util.Map; - -import static de.juplo.kafka.wordcount.query.QueryApplicationConfiguration.serializationConfig; - - -@Slf4j -public class QueryStreamProcessorTopologyTest -{ - public static final String TOP10_IN = "TOP10-IN"; - public static final String USERS_IN = "USERS-IN"; - public static final String RANKING_STORE_NAME = "TOPOLOGY-TEST-RANKINGS"; - public static final String USERS_STORE_NAME = "TOPOLOGY-TEST-USERS"; - - - TopologyTestDriver testDriver; - TestInputTopic top10In; - TestInputTopic userIn; - - - @BeforeEach - public void setUp() - { - Topology topology = QueryStreamProcessor.buildTopology( - USERS_IN, - TOP10_IN, - Stores.inMemoryKeyValueStore(USERS_STORE_NAME), - Stores.inMemoryKeyValueStore(RANKING_STORE_NAME)); - - testDriver = new TopologyTestDriver(topology, serializationConfig()); - - top10In = testDriver.createInputTopic( - TOP10_IN, - jsonSerializer(TestUser.class, true), - jsonSerializer(TestRanking.class,false)); - - userIn = testDriver.createInputTopic( - USERS_IN, - new StringSerializer(), - jsonSerializer(TestUserData.class, false).noTypeInfo()); - } - - - @Test - public void test() - { - TestData - .getUsersMessages() - .forEach(kv -> userIn.pipeInput(kv.key, kv.value)); - TestData - .getTop10Messages() - .forEach(kv -> top10In.pipeInput(kv.key, kv.value)); - - KeyValueStore store = testDriver.getKeyValueStore(RANKING_STORE_NAME); - TestData.assertExpectedState(user -> store.get(user)); - } - - @AfterEach - public void tearDown() - { - testDriver.close(); - } - - private JsonSerializer jsonSerializer(Class type, boolean isKey) - { - JsonSerializer jsonSerializer = new JsonSerializer<>(); - jsonSerializer.configure( - Map.of( - JsonSerializer.TYPE_MAPPINGS, - "stats:" + TestUser.class.getName() + "," + - "ranking:" + TestRanking.class.getName()), - isKey); - return jsonSerializer; - } -} diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java deleted file mode 100644 index 44162a0..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java +++ /dev/null @@ -1,146 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import de.juplo.kafka.wordcount.top10.TestEntry; -import de.juplo.kafka.wordcount.top10.TestRanking; -import de.juplo.kafka.wordcount.top10.TestUser; -import de.juplo.kafka.wordcount.users.TestUserData; -import org.apache.kafka.streams.KeyValue; - -import java.util.Arrays; -import java.util.function.Function; -import java.util.stream.Stream; - -import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STATS_TYPE; -import static org.assertj.core.api.Assertions.assertThat; - - -class TestData -{ - static final TestUser PETER = TestUser.of(STATS_TYPE, "peter"); - static final TestUser KLAUS = TestUser.of(STATS_TYPE, "klaus"); - - static final Stream> getTop10Messages() - { - return Stream.of(TOP10_MESSAGES); - } - - static final Stream> getUsersMessages() - { - return Stream.of(USERS_MESSAGES); - } - - static void assertExpectedState(Function function) - { - assertRankingEqualsRankingFromLastMessage(PETER.getChannel(), function.apply(PETER.getChannel())); - assertRankingEqualsRankingFromLastMessage(KLAUS.getChannel(), function.apply(KLAUS.getChannel())); - } - - private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson) - { - assertThat(rankingJson).isEqualTo(getLastMessageFor(user)); - } - - private static UserRanking getLastMessageFor(String user) - { - return getTop10Messages() - .filter(kv -> kv.key.getChannel().equals(user)) - .map(kv -> kv.value) - .map(testRanking -> userRankingFor(user, testRanking)) - .reduce(null, (left, right) -> right); - } - - private static UserRanking userRankingFor(String user, TestRanking testRanking) - { - TestUserData testUserData = getUsersMessages() - .filter(kv -> kv.key.equals(user)) - .map(kv -> kv.value) - .reduce(null, (left, right) -> right); - - Entry[] entries = Arrays - .stream(testRanking.getEntries()) - .map(testEntry -> entryOf(testEntry)) - .toArray(size -> new Entry[size]); - - return UserRanking.of( - testUserData.getFirstName(), - testUserData.getLastName(), - entries); - } - - private static Entry entryOf(TestEntry testEntry) - { - Entry entry = new Entry(); - entry.setKey(testEntry.getKey()); - entry.setCounter(testEntry.getCounter()); - return entry; - } - private static KeyValue[] TOP10_MESSAGES = new KeyValue[] - { - KeyValue.pair( // 0 - PETER, - TestRanking.of( - TestEntry.of("Hallo", 1l))), - KeyValue.pair( // 1 - KLAUS, - TestRanking.of( - TestEntry.of("Müsch", 1l))), - KeyValue.pair( // 2 - PETER, - TestRanking.of( - TestEntry.of("Hallo", 1l), - TestEntry.of("Welt", 1l))), - KeyValue.pair( // 3 - KLAUS, - TestRanking.of( - TestEntry.of("Müsch", 2l))), - KeyValue.pair( // 4 - KLAUS, - TestRanking.of( - TestEntry.of("Müsch", 2l), - TestEntry.of("s", 1l))), - KeyValue.pair( // 5 - PETER, - TestRanking.of( - TestEntry.of("Hallo", 1l), - TestEntry.of("Welt", 1l), - TestEntry.of("Boäh", 1l))), - KeyValue.pair( // 6 - PETER, - TestRanking.of( - TestEntry.of("Welt", 2l), - TestEntry.of("Hallo", 1l), - TestEntry.of("Boäh", 1l))), - KeyValue.pair( // 7 - PETER, - TestRanking.of( - TestEntry.of("Welt", 2l), - TestEntry.of("Boäh", 2l), - TestEntry.of("Hallo", 1l))), - KeyValue.pair( // 8 - KLAUS, - TestRanking.of( - TestEntry.of("Müsch", 2l), - TestEntry.of("s", 2l))), - KeyValue.pair( // 9 - PETER, - TestRanking.of( - TestEntry.of("Boäh", 3l), - TestEntry.of("Welt", 2l), - TestEntry.of("Hallo", 1l))), - KeyValue.pair( // 10 - KLAUS, - TestRanking.of( - TestEntry.of("s", 3l), - TestEntry.of("Müsch", 2l))), - }; - - private static KeyValue[] USERS_MESSAGES = new KeyValue[] - { - KeyValue.pair( - PETER.getChannel(), - TestUserData.of(PETER.getChannel(), "Peter", "Pan", TestUserData.Sex.MALE)), - KeyValue.pair( - KLAUS.getChannel(), - TestUserData.of(KLAUS.getChannel(), "Klaus", "Klüse", TestUserData.Sex.OTHER)), - }; -} diff --git a/src/test/java/de/juplo/kafka/wordcount/stats/StatsApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/stats/StatsApplicationIT.java new file mode 100644 index 0000000..fb12aee --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/stats/StatsApplicationIT.java @@ -0,0 +1,172 @@ +package de.juplo.kafka.wordcount.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.wordcount.top10.TestRanking; +import de.juplo.kafka.wordcount.top10.TestUser; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.http.MediaType; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.SendResult; +import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.web.servlet.MockMvc; +import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME; +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME; +import static org.awaitility.Awaitility.await; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + + +@SpringBootTest( + properties = { + "spring.main.allow-bean-definition-overriding=true", + "logging.level.root=WARN", + "logging.level.de.juplo=DEBUG", + "logging.level.org.apache.kafka.clients=INFO", + "logging.level.org.apache.kafka.streams=INFO", + "juplo.wordcount.query.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.wordcount.query.commit-interval=100", + "juplo.wordcount.query.cache-max-bytes=0", + "juplo.wordcount.query.users-input-topic=" + QueryApplicationIT.TOPIC_USERS, + "juplo.wordcount.query.ranking-input-topic=" + QueryApplicationIT.TOPIC_TOP10 }) +@AutoConfigureMockMvc +@EmbeddedKafka(topics = { QueryApplicationIT.TOPIC_TOP10, QueryApplicationIT.TOPIC_USERS}) +@Slf4j +public class QueryApplicationIT +{ + public static final String TOPIC_TOP10 = "top10"; + public static final String TOPIC_USERS = "users"; + + + @Autowired + MockMvc mockMvc; + @Autowired + ObjectMapper objectMapper; + @Autowired + QueryStreamProcessor streamProcessor; + + + @BeforeAll + public static void testSendMessage( + @Autowired KafkaTemplate usersKafkaTemplate, + @Autowired KafkaTemplate top10KafkaTemplate) + { + TestData + .getUsersMessages() + .forEach(kv -> flush(usersKafkaTemplate.send(TOPIC_USERS, kv.key, kv.value))); + TestData + .getTop10Messages() + .forEach(kv -> flush(top10KafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value))); + } + + private static void flush(CompletableFuture future) + { + try + { + SendResult result = future.get(); + log.info( + "Sent: {}={}, partition={}, offset={}", + result.getProducerRecord().key(), + result.getProducerRecord().value(), + result.getRecordMetadata().partition(), + result.getRecordMetadata().offset()); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + @DisplayName("Await, that the expected state is visible in the state-store") + @Test + public void testAwaitExpectedStateInStore() + { + await("The expected state is visible in the state-store") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> TestData.assertExpectedState(user -> streamProcessor.getStore().get(user))); + } + + @DisplayName("Await, that the expected state is queryable") + @Test + public void testAwaitExpectedStateIsQueryable() + { + await("The expected state is queryable") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> TestData.assertExpectedState(user -> requestUserRankingFor(user))); + } + + private UserRanking requestUserRankingFor(String user) + { + try + { + return objectMapper.readValue( + mockMvc + .perform(MockMvcRequestBuilders.get("/{user}", user) + .contentType(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + .andReturn() + .getResponse() + .getContentAsString(StandardCharsets.UTF_8), + UserRanking.class); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + @TestConfiguration + static class Configuration + { + @Bean + KafkaTemplate usersKafkaTemplate(ProducerFactory producerFactory) + { + Map properties = Map.of( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), + JsonSerializer.ADD_TYPE_INFO_HEADERS, false); + return new KafkaTemplate(producerFactory, properties); + } + + @Bean + KafkaTemplate top10KafkaTemplate(ProducerFactory producerFactory) + { + Map properties = Map.of( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), + JsonSerializer.TYPE_MAPPINGS, "stats:" + TestUser.class.getName() + ",ranking:" + TestRanking.class.getName()); + return new KafkaTemplate(producerFactory, properties); + } + + @Bean + KeyValueBytesStoreSupplier userStoreSupplier() + { + return Stores.inMemoryKeyValueStore(USER_STORE_NAME); + } + + @Bean + KeyValueBytesStoreSupplier rankingStoreSupplier() + { + return Stores.inMemoryKeyValueStore(RANKING_STORE_NAME); + } + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessorTopologyTest.java new file mode 100644 index 0000000..fbeb19b --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessorTopologyTest.java @@ -0,0 +1,91 @@ +package de.juplo.kafka.wordcount.query; + +import de.juplo.kafka.wordcount.top10.TestRanking; +import de.juplo.kafka.wordcount.top10.TestUser; +import de.juplo.kafka.wordcount.users.TestUserData; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.kafka.support.serializer.JsonSerializer; + +import java.util.Map; + +import static de.juplo.kafka.wordcount.query.QueryApplicationConfiguration.serializationConfig; + + +@Slf4j +public class QueryStreamProcessorTopologyTest +{ + public static final String TOP10_IN = "TOP10-IN"; + public static final String USERS_IN = "USERS-IN"; + public static final String RANKING_STORE_NAME = "TOPOLOGY-TEST-RANKINGS"; + public static final String USERS_STORE_NAME = "TOPOLOGY-TEST-USERS"; + + + TopologyTestDriver testDriver; + TestInputTopic top10In; + TestInputTopic userIn; + + + @BeforeEach + public void setUp() + { + Topology topology = QueryStreamProcessor.buildTopology( + USERS_IN, + TOP10_IN, + Stores.inMemoryKeyValueStore(USERS_STORE_NAME), + Stores.inMemoryKeyValueStore(RANKING_STORE_NAME)); + + testDriver = new TopologyTestDriver(topology, serializationConfig()); + + top10In = testDriver.createInputTopic( + TOP10_IN, + jsonSerializer(TestUser.class, true), + jsonSerializer(TestRanking.class,false)); + + userIn = testDriver.createInputTopic( + USERS_IN, + new StringSerializer(), + jsonSerializer(TestUserData.class, false).noTypeInfo()); + } + + + @Test + public void test() + { + TestData + .getUsersMessages() + .forEach(kv -> userIn.pipeInput(kv.key, kv.value)); + TestData + .getTop10Messages() + .forEach(kv -> top10In.pipeInput(kv.key, kv.value)); + + KeyValueStore store = testDriver.getKeyValueStore(RANKING_STORE_NAME); + TestData.assertExpectedState(user -> store.get(user)); + } + + @AfterEach + public void tearDown() + { + testDriver.close(); + } + + private JsonSerializer jsonSerializer(Class type, boolean isKey) + { + JsonSerializer jsonSerializer = new JsonSerializer<>(); + jsonSerializer.configure( + Map.of( + JsonSerializer.TYPE_MAPPINGS, + "stats:" + TestUser.class.getName() + "," + + "ranking:" + TestRanking.class.getName()), + isKey); + return jsonSerializer; + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/stats/TestData.java b/src/test/java/de/juplo/kafka/wordcount/stats/TestData.java new file mode 100644 index 0000000..44162a0 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/stats/TestData.java @@ -0,0 +1,146 @@ +package de.juplo.kafka.wordcount.query; + +import de.juplo.kafka.wordcount.top10.TestEntry; +import de.juplo.kafka.wordcount.top10.TestRanking; +import de.juplo.kafka.wordcount.top10.TestUser; +import de.juplo.kafka.wordcount.users.TestUserData; +import org.apache.kafka.streams.KeyValue; + +import java.util.Arrays; +import java.util.function.Function; +import java.util.stream.Stream; + +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STATS_TYPE; +import static org.assertj.core.api.Assertions.assertThat; + + +class TestData +{ + static final TestUser PETER = TestUser.of(STATS_TYPE, "peter"); + static final TestUser KLAUS = TestUser.of(STATS_TYPE, "klaus"); + + static final Stream> getTop10Messages() + { + return Stream.of(TOP10_MESSAGES); + } + + static final Stream> getUsersMessages() + { + return Stream.of(USERS_MESSAGES); + } + + static void assertExpectedState(Function function) + { + assertRankingEqualsRankingFromLastMessage(PETER.getChannel(), function.apply(PETER.getChannel())); + assertRankingEqualsRankingFromLastMessage(KLAUS.getChannel(), function.apply(KLAUS.getChannel())); + } + + private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson) + { + assertThat(rankingJson).isEqualTo(getLastMessageFor(user)); + } + + private static UserRanking getLastMessageFor(String user) + { + return getTop10Messages() + .filter(kv -> kv.key.getChannel().equals(user)) + .map(kv -> kv.value) + .map(testRanking -> userRankingFor(user, testRanking)) + .reduce(null, (left, right) -> right); + } + + private static UserRanking userRankingFor(String user, TestRanking testRanking) + { + TestUserData testUserData = getUsersMessages() + .filter(kv -> kv.key.equals(user)) + .map(kv -> kv.value) + .reduce(null, (left, right) -> right); + + Entry[] entries = Arrays + .stream(testRanking.getEntries()) + .map(testEntry -> entryOf(testEntry)) + .toArray(size -> new Entry[size]); + + return UserRanking.of( + testUserData.getFirstName(), + testUserData.getLastName(), + entries); + } + + private static Entry entryOf(TestEntry testEntry) + { + Entry entry = new Entry(); + entry.setKey(testEntry.getKey()); + entry.setCounter(testEntry.getCounter()); + return entry; + } + private static KeyValue[] TOP10_MESSAGES = new KeyValue[] + { + KeyValue.pair( // 0 + PETER, + TestRanking.of( + TestEntry.of("Hallo", 1l))), + KeyValue.pair( // 1 + KLAUS, + TestRanking.of( + TestEntry.of("Müsch", 1l))), + KeyValue.pair( // 2 + PETER, + TestRanking.of( + TestEntry.of("Hallo", 1l), + TestEntry.of("Welt", 1l))), + KeyValue.pair( // 3 + KLAUS, + TestRanking.of( + TestEntry.of("Müsch", 2l))), + KeyValue.pair( // 4 + KLAUS, + TestRanking.of( + TestEntry.of("Müsch", 2l), + TestEntry.of("s", 1l))), + KeyValue.pair( // 5 + PETER, + TestRanking.of( + TestEntry.of("Hallo", 1l), + TestEntry.of("Welt", 1l), + TestEntry.of("Boäh", 1l))), + KeyValue.pair( // 6 + PETER, + TestRanking.of( + TestEntry.of("Welt", 2l), + TestEntry.of("Hallo", 1l), + TestEntry.of("Boäh", 1l))), + KeyValue.pair( // 7 + PETER, + TestRanking.of( + TestEntry.of("Welt", 2l), + TestEntry.of("Boäh", 2l), + TestEntry.of("Hallo", 1l))), + KeyValue.pair( // 8 + KLAUS, + TestRanking.of( + TestEntry.of("Müsch", 2l), + TestEntry.of("s", 2l))), + KeyValue.pair( // 9 + PETER, + TestRanking.of( + TestEntry.of("Boäh", 3l), + TestEntry.of("Welt", 2l), + TestEntry.of("Hallo", 1l))), + KeyValue.pair( // 10 + KLAUS, + TestRanking.of( + TestEntry.of("s", 3l), + TestEntry.of("Müsch", 2l))), + }; + + private static KeyValue[] USERS_MESSAGES = new KeyValue[] + { + KeyValue.pair( + PETER.getChannel(), + TestUserData.of(PETER.getChannel(), "Peter", "Pan", TestUserData.Sex.MALE)), + KeyValue.pair( + KLAUS.getChannel(), + TestUserData.of(KLAUS.getChannel(), "Klaus", "Klüse", TestUserData.Sex.OTHER)), + }; +}