From: Kai Moritz Date: Sat, 15 Jun 2024 12:42:52 +0000 (+0200) Subject: popular: 1.0.0 - Renamed packages and classes -- MOVE X-Git-Tag: popular-on-query~5 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=9a554f5f9bdc5750f1b47088eb3b3038527c89dc;p=demos%2Fkafka%2Fwordcount popular: 1.0.0 - Renamed packages and classes -- MOVE --- diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/QueryApplication.java b/src/main/java/de/juplo/kafka/wordcount/popular/QueryApplication.java new file mode 100644 index 0000000..eeee7eb --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/popular/QueryApplication.java @@ -0,0 +1,14 @@ +package de.juplo.kafka.wordcount.query; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + + +@SpringBootApplication +public class QueryApplication +{ + public static void main(String[] args) + { + SpringApplication.run(QueryApplication.class, args); + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/QueryApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/popular/QueryApplicationConfiguration.java new file mode 100644 index 0000000..440d5c4 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/popular/QueryApplicationConfiguration.java @@ -0,0 +1,141 @@ +package de.juplo.kafka.wordcount.query; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.state.HostInfo; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.web.ServerProperties; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerde; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; + +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME; +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME; +import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; + + +@Configuration +@EnableConfigurationProperties(QueryApplicationProperties.class) +@Slf4j +public class QueryApplicationConfiguration +{ + @Bean + public HostInfo applicationServer( + ServerProperties serverProperties, + QueryApplicationProperties applicationProperties) throws IOException + { + String host; + if (serverProperties.getAddress() == null) + { + HostInfo bootstrapServer = HostInfo.buildFromEndpoint(applicationProperties.getBootstrapServer()); + Socket socket = new Socket(); + socket.connect(new InetSocketAddress(bootstrapServer.host(), bootstrapServer.port())); + host = socket.getLocalAddress().getHostAddress(); + } + else + { + host = serverProperties.getAddress().getHostAddress(); + } + + Integer port = serverProperties.getPort() == null ? 8080 : serverProperties.getPort(); + + return new HostInfo(host, port); + } + + @Bean + public Properties streamProcessorProperties( + QueryApplicationProperties applicationProperties, + HostInfo applicationServer) + { + Properties props = new Properties(); + + props.putAll(serializationConfig()); + + String applicationId = applicationProperties.getApplicationId(); + String bootstrapServer = applicationProperties.getBootstrapServer(); + + props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); + props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServer.host() + ":" + applicationServer.port()); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); + + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, applicationProperties.getBootstrapServer()); + if (applicationProperties.getCommitInterval() != null) + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, applicationProperties.getCommitInterval()); + if (applicationProperties.getCacheMaxBytes() != null) + props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, applicationProperties.getCacheMaxBytes()); + + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + return props; + } + + static Properties serializationConfig() + { + Properties props = new Properties(); + + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + props.put( + JsonDeserializer.TYPE_MAPPINGS, + "user:" + Key.class.getName() + "," + + "ranking:" + Ranking.class.getName() + "," + + "userranking:" + UserRanking.class.getName()); + + return props; + } + + @Bean(initMethod = "start", destroyMethod = "stop") + public QueryStreamProcessor streamProcessor( + Properties streamProcessorProperties, + HostInfo applicationServer, + QueryApplicationProperties applicationProperties, + KeyValueBytesStoreSupplier userStoreSupplier, + KeyValueBytesStoreSupplier rankingStoreSupplier, + ConfigurableApplicationContext context) + { + QueryStreamProcessor streamProcessor = new QueryStreamProcessor( + streamProcessorProperties, + applicationServer, + applicationProperties.getUsersInputTopic(), + applicationProperties.getRankingInputTopic(), + userStoreSupplier, + rankingStoreSupplier); + + streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> + { + log.error("Unexpected error!", e); + CompletableFuture.runAsync(() -> + { + log.info("Stopping application..."); + SpringApplication.exit(context, () -> 1); + }); + return SHUTDOWN_CLIENT; + }); + + return streamProcessor; + } + + @Bean + public KeyValueBytesStoreSupplier userStoreSupplier() + { + return Stores.persistentKeyValueStore(USER_STORE_NAME); + } + + @Bean + public KeyValueBytesStoreSupplier rankingStoreSupplier() + { + return Stores.persistentKeyValueStore(RANKING_STORE_NAME); + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/QueryApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/popular/QueryApplicationProperties.java new file mode 100644 index 0000000..4a9eeca --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/popular/QueryApplicationProperties.java @@ -0,0 +1,22 @@ +package de.juplo.kafka.wordcount.query; + + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import org.springframework.boot.context.properties.ConfigurationProperties; + + +@ConfigurationProperties("juplo.wordcount.query") +@Getter +@Setter +@ToString +public class QueryApplicationProperties +{ + private String bootstrapServer = "localhost:9092"; + private String applicationId = "query"; + private String rankingInputTopic = "top10"; + private String usersInputTopic = "users"; + private Integer commitInterval; + private Integer cacheMaxBytes; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/QueryController.java b/src/main/java/de/juplo/kafka/wordcount/popular/QueryController.java new file mode 100644 index 0000000..a9b5b80 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/popular/QueryController.java @@ -0,0 +1,43 @@ +package de.juplo.kafka.wordcount.query; + +import lombok.RequiredArgsConstructor; +import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RestController; + +import java.net.URI; +import java.util.Optional; + + +@RestController +@RequiredArgsConstructor +public class QueryController +{ + private final QueryStreamProcessor processor; + + @GetMapping("{username}") + ResponseEntity queryFor(@PathVariable String username) + { + Optional redirect = processor.getRedirect(username); + if (redirect.isPresent()) + { + return + ResponseEntity + .status(HttpStatus.TEMPORARY_REDIRECT) + .location(redirect.get()) + .build(); + } + + try + { + return ResponseEntity.of(processor.getUserRanking(username)); + } + catch (InvalidStateStoreException e) + { + return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build(); + } + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/popular/QueryStreamProcessor.java new file mode 100644 index 0000000..e075eb7 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/popular/QueryStreamProcessor.java @@ -0,0 +1,127 @@ +package de.juplo.kafka.wordcount.query; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.*; +import org.apache.kafka.streams.kstream.*; +import org.apache.kafka.streams.state.HostInfo; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.springframework.kafka.support.serializer.JsonSerde; + +import java.net.URI; +import java.util.Optional; +import java.util.Properties; + + +@Slf4j +public class QueryStreamProcessor +{ + public static final String USER_STORE_NAME = "users"; + public static final String RANKING_STORE_NAME = "rankings"; + + public final KafkaStreams streams; + public final HostInfo hostInfo; + public final StoreQueryParameters> storeParameters; + + + public QueryStreamProcessor( + Properties props, + HostInfo applicationServer, + String usersInputTopic, + String rankingInputTopic, + KeyValueBytesStoreSupplier userStoreSupplier, + KeyValueBytesStoreSupplier rankingStoreSupplier) + { + Topology topology = buildTopology( + usersInputTopic, + rankingInputTopic, + userStoreSupplier, + rankingStoreSupplier); + streams = new KafkaStreams(topology, props); + hostInfo = applicationServer; + storeParameters = StoreQueryParameters.fromNameAndType(RANKING_STORE_NAME, QueryableStoreTypes.keyValueStore());; + } + + static Topology buildTopology( + String usersInputTopic, + String rankingInputTopic, + KeyValueBytesStoreSupplier userStoreSupplier, + KeyValueBytesStoreSupplier rankingStoreSupplier) + { + StreamsBuilder builder = new StreamsBuilder(); + + KTable users = builder + .stream( + usersInputTopic, + Consumed.with(Serdes.String(), new JsonSerde().copyWithType(User.class))) + .toTable( + Materialized + .as(userStoreSupplier) + .withKeySerde(Serdes.String()) + .withValueSerde(new JsonSerde().copyWithType(User.class))); + KStream rankings = builder + .stream(rankingInputTopic) + .map((key, value) -> new KeyValue<>(key.getUser(), value)); + + rankings + .join(users, (ranking, user) -> UserRanking.of( + user.getFirstName(), + user.getLastName(), + ranking.getEntries()), + Joined.keySerde(Serdes.String())) + .toTable( + Materialized + .as(rankingStoreSupplier) + .withKeySerde(Serdes.String()) + .withValueSerde(new JsonSerde().copyWithType(UserRanking.class))); + + Topology topology = builder.build(); + log.info("\n\n{}", topology.describe()); + + return topology; + } + + ReadOnlyKeyValueStore getStore() + { + return streams.store(storeParameters); + } + + public Optional getRedirect(String username) + { + KeyQueryMetadata metadata = streams.queryMetadataForKey(RANKING_STORE_NAME, username, Serdes.String().serializer()); + HostInfo activeHost = metadata.activeHost(); + log.debug("Local store for {}: {}, {}:{}", username, metadata.partition(), activeHost.host(), activeHost.port()); + + if (activeHost.equals(this.hostInfo) || activeHost.equals(HostInfo.unavailable())) + { + return Optional.empty(); + } + + URI location = URI.create("http://" + activeHost.host() + ":" + activeHost.port() + "/" + username); + log.debug("Redirecting to {}", location); + return Optional.of(location); + } + + public Optional getUserRanking(String username) + { + return Optional.ofNullable(getStore().get(username)); + } + + @PostConstruct + public void start() + { + log.info("Starting Stream-Processor"); + streams.start(); + } + + @PreDestroy + public void stop() + { + log.info("Stopping Stream-Processor"); + streams.close(); + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/Ranking.java b/src/main/java/de/juplo/kafka/wordcount/popular/Ranking.java new file mode 100644 index 0000000..8966be6 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/popular/Ranking.java @@ -0,0 +1,10 @@ +package de.juplo.kafka.wordcount.query; + +import lombok.Data; + + +@Data +public class Ranking +{ + private Entry[] entries; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/Word.java b/src/main/java/de/juplo/kafka/wordcount/popular/Word.java new file mode 100644 index 0000000..57d095a --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/popular/Word.java @@ -0,0 +1,10 @@ +package de.juplo.kafka.wordcount.query; + +import lombok.Data; + + +@Data +public class Key +{ + private String user; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/WordCounter.java b/src/main/java/de/juplo/kafka/wordcount/popular/WordCounter.java new file mode 100644 index 0000000..80b4daf --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/popular/WordCounter.java @@ -0,0 +1,11 @@ +package de.juplo.kafka.wordcount.query; + +import lombok.Data; + + +@Data +public class Entry +{ + private String word; + private Long count; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/query/Entry.java b/src/main/java/de/juplo/kafka/wordcount/query/Entry.java deleted file mode 100644 index 80b4daf..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/query/Entry.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import lombok.Data; - - -@Data -public class Entry -{ - private String word; - private Long count; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/query/Key.java b/src/main/java/de/juplo/kafka/wordcount/query/Key.java deleted file mode 100644 index 57d095a..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/query/Key.java +++ /dev/null @@ -1,10 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import lombok.Data; - - -@Data -public class Key -{ - private String user; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplication.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplication.java deleted file mode 100644 index eeee7eb..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplication.java +++ /dev/null @@ -1,14 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; - - -@SpringBootApplication -public class QueryApplication -{ - public static void main(String[] args) - { - SpringApplication.run(QueryApplication.class, args); - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java deleted file mode 100644 index 440d5c4..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java +++ /dev/null @@ -1,141 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.state.HostInfo; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; -import org.apache.kafka.streams.state.Stores; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.web.ServerProperties; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.support.serializer.JsonDeserializer; -import org.springframework.kafka.support.serializer.JsonSerde; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.util.Properties; -import java.util.concurrent.CompletableFuture; - -import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME; -import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME; -import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; - - -@Configuration -@EnableConfigurationProperties(QueryApplicationProperties.class) -@Slf4j -public class QueryApplicationConfiguration -{ - @Bean - public HostInfo applicationServer( - ServerProperties serverProperties, - QueryApplicationProperties applicationProperties) throws IOException - { - String host; - if (serverProperties.getAddress() == null) - { - HostInfo bootstrapServer = HostInfo.buildFromEndpoint(applicationProperties.getBootstrapServer()); - Socket socket = new Socket(); - socket.connect(new InetSocketAddress(bootstrapServer.host(), bootstrapServer.port())); - host = socket.getLocalAddress().getHostAddress(); - } - else - { - host = serverProperties.getAddress().getHostAddress(); - } - - Integer port = serverProperties.getPort() == null ? 8080 : serverProperties.getPort(); - - return new HostInfo(host, port); - } - - @Bean - public Properties streamProcessorProperties( - QueryApplicationProperties applicationProperties, - HostInfo applicationServer) - { - Properties props = new Properties(); - - props.putAll(serializationConfig()); - - String applicationId = applicationProperties.getApplicationId(); - String bootstrapServer = applicationProperties.getBootstrapServer(); - - props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); - props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServer.host() + ":" + applicationServer.port()); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); - - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, applicationProperties.getBootstrapServer()); - if (applicationProperties.getCommitInterval() != null) - props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, applicationProperties.getCommitInterval()); - if (applicationProperties.getCacheMaxBytes() != null) - props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, applicationProperties.getCacheMaxBytes()); - - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - - return props; - } - - static Properties serializationConfig() - { - Properties props = new Properties(); - - props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); - props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); - props.put( - JsonDeserializer.TYPE_MAPPINGS, - "user:" + Key.class.getName() + "," + - "ranking:" + Ranking.class.getName() + "," + - "userranking:" + UserRanking.class.getName()); - - return props; - } - - @Bean(initMethod = "start", destroyMethod = "stop") - public QueryStreamProcessor streamProcessor( - Properties streamProcessorProperties, - HostInfo applicationServer, - QueryApplicationProperties applicationProperties, - KeyValueBytesStoreSupplier userStoreSupplier, - KeyValueBytesStoreSupplier rankingStoreSupplier, - ConfigurableApplicationContext context) - { - QueryStreamProcessor streamProcessor = new QueryStreamProcessor( - streamProcessorProperties, - applicationServer, - applicationProperties.getUsersInputTopic(), - applicationProperties.getRankingInputTopic(), - userStoreSupplier, - rankingStoreSupplier); - - streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> - { - log.error("Unexpected error!", e); - CompletableFuture.runAsync(() -> - { - log.info("Stopping application..."); - SpringApplication.exit(context, () -> 1); - }); - return SHUTDOWN_CLIENT; - }); - - return streamProcessor; - } - - @Bean - public KeyValueBytesStoreSupplier userStoreSupplier() - { - return Stores.persistentKeyValueStore(USER_STORE_NAME); - } - - @Bean - public KeyValueBytesStoreSupplier rankingStoreSupplier() - { - return Stores.persistentKeyValueStore(RANKING_STORE_NAME); - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java deleted file mode 100644 index 4a9eeca..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java +++ /dev/null @@ -1,22 +0,0 @@ -package de.juplo.kafka.wordcount.query; - - -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; -import org.springframework.boot.context.properties.ConfigurationProperties; - - -@ConfigurationProperties("juplo.wordcount.query") -@Getter -@Setter -@ToString -public class QueryApplicationProperties -{ - private String bootstrapServer = "localhost:9092"; - private String applicationId = "query"; - private String rankingInputTopic = "top10"; - private String usersInputTopic = "users"; - private Integer commitInterval; - private Integer cacheMaxBytes; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryController.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryController.java deleted file mode 100644 index a9b5b80..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryController.java +++ /dev/null @@ -1,43 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import lombok.RequiredArgsConstructor; -import org.apache.kafka.streams.errors.InvalidStateStoreException; -import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.RestController; - -import java.net.URI; -import java.util.Optional; - - -@RestController -@RequiredArgsConstructor -public class QueryController -{ - private final QueryStreamProcessor processor; - - @GetMapping("{username}") - ResponseEntity queryFor(@PathVariable String username) - { - Optional redirect = processor.getRedirect(username); - if (redirect.isPresent()) - { - return - ResponseEntity - .status(HttpStatus.TEMPORARY_REDIRECT) - .location(redirect.get()) - .build(); - } - - try - { - return ResponseEntity.of(processor.getUserRanking(username)); - } - catch (InvalidStateStoreException e) - { - return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build(); - } - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java deleted file mode 100644 index e075eb7..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ /dev/null @@ -1,127 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.*; -import org.apache.kafka.streams.state.HostInfo; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; -import org.apache.kafka.streams.state.QueryableStoreTypes; -import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; -import org.springframework.kafka.support.serializer.JsonSerde; - -import java.net.URI; -import java.util.Optional; -import java.util.Properties; - - -@Slf4j -public class QueryStreamProcessor -{ - public static final String USER_STORE_NAME = "users"; - public static final String RANKING_STORE_NAME = "rankings"; - - public final KafkaStreams streams; - public final HostInfo hostInfo; - public final StoreQueryParameters> storeParameters; - - - public QueryStreamProcessor( - Properties props, - HostInfo applicationServer, - String usersInputTopic, - String rankingInputTopic, - KeyValueBytesStoreSupplier userStoreSupplier, - KeyValueBytesStoreSupplier rankingStoreSupplier) - { - Topology topology = buildTopology( - usersInputTopic, - rankingInputTopic, - userStoreSupplier, - rankingStoreSupplier); - streams = new KafkaStreams(topology, props); - hostInfo = applicationServer; - storeParameters = StoreQueryParameters.fromNameAndType(RANKING_STORE_NAME, QueryableStoreTypes.keyValueStore());; - } - - static Topology buildTopology( - String usersInputTopic, - String rankingInputTopic, - KeyValueBytesStoreSupplier userStoreSupplier, - KeyValueBytesStoreSupplier rankingStoreSupplier) - { - StreamsBuilder builder = new StreamsBuilder(); - - KTable users = builder - .stream( - usersInputTopic, - Consumed.with(Serdes.String(), new JsonSerde().copyWithType(User.class))) - .toTable( - Materialized - .as(userStoreSupplier) - .withKeySerde(Serdes.String()) - .withValueSerde(new JsonSerde().copyWithType(User.class))); - KStream rankings = builder - .stream(rankingInputTopic) - .map((key, value) -> new KeyValue<>(key.getUser(), value)); - - rankings - .join(users, (ranking, user) -> UserRanking.of( - user.getFirstName(), - user.getLastName(), - ranking.getEntries()), - Joined.keySerde(Serdes.String())) - .toTable( - Materialized - .as(rankingStoreSupplier) - .withKeySerde(Serdes.String()) - .withValueSerde(new JsonSerde().copyWithType(UserRanking.class))); - - Topology topology = builder.build(); - log.info("\n\n{}", topology.describe()); - - return topology; - } - - ReadOnlyKeyValueStore getStore() - { - return streams.store(storeParameters); - } - - public Optional getRedirect(String username) - { - KeyQueryMetadata metadata = streams.queryMetadataForKey(RANKING_STORE_NAME, username, Serdes.String().serializer()); - HostInfo activeHost = metadata.activeHost(); - log.debug("Local store for {}: {}, {}:{}", username, metadata.partition(), activeHost.host(), activeHost.port()); - - if (activeHost.equals(this.hostInfo) || activeHost.equals(HostInfo.unavailable())) - { - return Optional.empty(); - } - - URI location = URI.create("http://" + activeHost.host() + ":" + activeHost.port() + "/" + username); - log.debug("Redirecting to {}", location); - return Optional.of(location); - } - - public Optional getUserRanking(String username) - { - return Optional.ofNullable(getStore().get(username)); - } - - @PostConstruct - public void start() - { - log.info("Starting Stream-Processor"); - streams.start(); - } - - @PreDestroy - public void stop() - { - log.info("Stopping Stream-Processor"); - streams.close(); - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/query/Ranking.java b/src/main/java/de/juplo/kafka/wordcount/query/Ranking.java deleted file mode 100644 index 8966be6..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/query/Ranking.java +++ /dev/null @@ -1,10 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import lombok.Data; - - -@Data -public class Ranking -{ - private Entry[] entries; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/query/User.java b/src/main/java/de/juplo/kafka/wordcount/query/User.java deleted file mode 100644 index f62b475..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/query/User.java +++ /dev/null @@ -1,18 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import lombok.Data; -import lombok.EqualsAndHashCode; - - -@Data -@EqualsAndHashCode(of = "username") -public class User -{ - public enum Sex { FEMALE, MALE, OTHER } - - private String username; - - private String firstName; - private String lastName; - private Sex sex; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java b/src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java deleted file mode 100644 index 9ca765a..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java +++ /dev/null @@ -1,14 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import lombok.*; - - -@Data -@AllArgsConstructor(staticName = "of") -@NoArgsConstructor -public class UserRanking -{ - private String firstName; - private String lastName; - private Entry[] top10; -} diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestWordCounter.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestWordCounter.java new file mode 100644 index 0000000..215327f --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestWordCounter.java @@ -0,0 +1,15 @@ +package de.juplo.kafka.wordcount.top10; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@AllArgsConstructor(staticName = "of") +@NoArgsConstructor +@Data +public class TestEntry +{ + String word; + long count; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/popular/QueryApplicationIT.java new file mode 100644 index 0000000..1315eae --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/popular/QueryApplicationIT.java @@ -0,0 +1,172 @@ +package de.juplo.kafka.wordcount.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.wordcount.top10.TestRanking; +import de.juplo.kafka.wordcount.top10.TestUser; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.http.MediaType; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.SendResult; +import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.web.servlet.MockMvc; +import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME; +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME; +import static org.awaitility.Awaitility.await; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + + +@SpringBootTest( + properties = { + "spring.main.allow-bean-definition-overriding=true", + "logging.level.root=WARN", + "logging.level.de.juplo=DEBUG", + "logging.level.org.apache.kafka.clients=INFO", + "logging.level.org.apache.kafka.streams=INFO", + "juplo.wordcount.query.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.wordcount.query.commit-interval=100", + "juplo.wordcount.query.cache-max-bytes=0", + "juplo.wordcount.query.users-input-topic=" + QueryApplicationIT.TOPIC_USERS, + "juplo.wordcount.query.ranking-input-topic=" + QueryApplicationIT.TOPIC_TOP10 }) +@AutoConfigureMockMvc +@EmbeddedKafka(topics = { QueryApplicationIT.TOPIC_TOP10, QueryApplicationIT.TOPIC_USERS}) +@Slf4j +public class QueryApplicationIT +{ + public static final String TOPIC_TOP10 = "top10"; + public static final String TOPIC_USERS = "users"; + + + @Autowired + MockMvc mockMvc; + @Autowired + ObjectMapper objectMapper; + @Autowired + QueryStreamProcessor streamProcessor; + + + @BeforeAll + public static void testSendMessage( + @Autowired KafkaTemplate usersKafkaTemplate, + @Autowired KafkaTemplate top10KafkaTemplate) + { + TestData + .getUsersMessages() + .forEach(kv -> flush(usersKafkaTemplate.send(TOPIC_USERS, kv.key, kv.value))); + TestData + .getTop10Messages() + .forEach(kv -> flush(top10KafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value))); + } + + private static void flush(CompletableFuture future) + { + try + { + SendResult result = future.get(); + log.info( + "Sent: {}={}, partition={}, offset={}", + result.getProducerRecord().key(), + result.getProducerRecord().value(), + result.getRecordMetadata().partition(), + result.getRecordMetadata().offset()); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + @DisplayName("Await, that the expected state is visible in the state-store") + @Test + public void testAwaitExpectedStateInStore() + { + await("The expected state is visible in the state-store") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> TestData.assertExpectedState(user -> streamProcessor.getStore().get(user))); + } + + @DisplayName("Await, that the expected state is queryable") + @Test + public void testAwaitExpectedStateIsQueryable() + { + await("The expected state is queryable") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> TestData.assertExpectedState(user -> requestUserRankingFor(user))); + } + + private UserRanking requestUserRankingFor(String user) + { + try + { + return objectMapper.readValue( + mockMvc + .perform(MockMvcRequestBuilders.get("/{user}", user) + .contentType(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + .andReturn() + .getResponse() + .getContentAsString(StandardCharsets.UTF_8), + UserRanking.class); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + @TestConfiguration + static class Configuration + { + @Bean + KafkaTemplate usersKafkaTemplate(ProducerFactory producerFactory) + { + Map properties = Map.of( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), + JsonSerializer.ADD_TYPE_INFO_HEADERS, false); + return new KafkaTemplate(producerFactory, properties); + } + + @Bean + KafkaTemplate top10KafkaTemplate(ProducerFactory producerFactory) + { + Map properties = Map.of( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), + JsonSerializer.TYPE_MAPPINGS, "user:" + TestUser.class.getName() + ",ranking:" + TestRanking.class.getName()); + return new KafkaTemplate(producerFactory, properties); + } + + @Bean + KeyValueBytesStoreSupplier userStoreSupplier() + { + return Stores.inMemoryKeyValueStore(USER_STORE_NAME); + } + + @Bean + KeyValueBytesStoreSupplier rankingStoreSupplier() + { + return Stores.inMemoryKeyValueStore(RANKING_STORE_NAME); + } + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/QueryStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/popular/QueryStreamProcessorTopologyTest.java new file mode 100644 index 0000000..203c813 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/popular/QueryStreamProcessorTopologyTest.java @@ -0,0 +1,91 @@ +package de.juplo.kafka.wordcount.query; + +import de.juplo.kafka.wordcount.top10.TestRanking; +import de.juplo.kafka.wordcount.top10.TestUser; +import de.juplo.kafka.wordcount.users.TestUserData; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.kafka.support.serializer.JsonSerializer; + +import java.util.Map; + +import static de.juplo.kafka.wordcount.query.QueryApplicationConfiguration.serializationConfig; + + +@Slf4j +public class QueryStreamProcessorTopologyTest +{ + public static final String TOP10_IN = "TOP10-IN"; + public static final String USERS_IN = "USERS-IN"; + public static final String RANKING_STORE_NAME = "TOPOLOGY-TEST-RANKINGS"; + public static final String USERS_STORE_NAME = "TOPOLOGY-TEST-USERS"; + + + TopologyTestDriver testDriver; + TestInputTopic top10In; + TestInputTopic userIn; + + + @BeforeEach + public void setUp() + { + Topology topology = QueryStreamProcessor.buildTopology( + USERS_IN, + TOP10_IN, + Stores.inMemoryKeyValueStore(USERS_STORE_NAME), + Stores.inMemoryKeyValueStore(RANKING_STORE_NAME)); + + testDriver = new TopologyTestDriver(topology, serializationConfig()); + + top10In = testDriver.createInputTopic( + TOP10_IN, + jsonSerializer(TestUser.class, true), + jsonSerializer(TestRanking.class,false)); + + userIn = testDriver.createInputTopic( + USERS_IN, + new StringSerializer(), + jsonSerializer(TestUserData.class, false).noTypeInfo()); + } + + + @Test + public void test() + { + TestData + .getUsersMessages() + .forEach(kv -> userIn.pipeInput(kv.key, kv.value)); + TestData + .getTop10Messages() + .forEach(kv -> top10In.pipeInput(kv.key, kv.value)); + + KeyValueStore store = testDriver.getKeyValueStore(RANKING_STORE_NAME); + TestData.assertExpectedState(user -> store.get(user)); + } + + @AfterEach + public void tearDown() + { + testDriver.close(); + } + + private JsonSerializer jsonSerializer(Class type, boolean isKey) + { + JsonSerializer jsonSerializer = new JsonSerializer<>(); + jsonSerializer.configure( + Map.of( + JsonSerializer.TYPE_MAPPINGS, + "user:" + TestUser.class.getName() + "," + + "ranking:" + TestRanking.class.getName()), + isKey); + return jsonSerializer; + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java b/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java new file mode 100644 index 0000000..7c8b0b4 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java @@ -0,0 +1,145 @@ +package de.juplo.kafka.wordcount.query; + +import de.juplo.kafka.wordcount.top10.TestEntry; +import de.juplo.kafka.wordcount.top10.TestRanking; +import de.juplo.kafka.wordcount.top10.TestUser; +import de.juplo.kafka.wordcount.users.TestUserData; +import org.apache.kafka.streams.KeyValue; + +import java.util.Arrays; +import java.util.function.Function; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + + +class TestData +{ + static final TestUser PETER = TestUser.of("peter"); + static final TestUser KLAUS = TestUser.of("klaus"); + + static final Stream> getTop10Messages() + { + return Stream.of(TOP10_MESSAGES); + } + + static final Stream> getUsersMessages() + { + return Stream.of(USERS_MESSAGES); + } + + static void assertExpectedState(Function function) + { + assertRankingEqualsRankingFromLastMessage(PETER.getUser(), function.apply(PETER.getUser())); + assertRankingEqualsRankingFromLastMessage(KLAUS.getUser(), function.apply(KLAUS.getUser())); + } + + private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson) + { + assertThat(rankingJson).isEqualTo(getLastMessageFor(user)); + } + + private static UserRanking getLastMessageFor(String user) + { + return getTop10Messages() + .filter(kv -> kv.key.getUser().equals(user)) + .map(kv -> kv.value) + .map(testRanking -> userRankingFor(user, testRanking)) + .reduce(null, (left, right) -> right); + } + + private static UserRanking userRankingFor(String user, TestRanking testRanking) + { + TestUserData testUserData = getUsersMessages() + .filter(kv -> kv.key.equals(user)) + .map(kv -> kv.value) + .reduce(null, (left, right) -> right); + + Entry[] entries = Arrays + .stream(testRanking.getEntries()) + .map(testEntry -> entryOf(testEntry)) + .toArray(size -> new Entry[size]); + + return UserRanking.of( + testUserData.getFirstName(), + testUserData.getLastName(), + entries); + } + + private static Entry entryOf(TestEntry testEntry) + { + Entry entry = new Entry(); + entry.setWord(testEntry.getWord()); + entry.setCount(testEntry.getCount()); + return entry; + } + private static KeyValue[] TOP10_MESSAGES = new KeyValue[] + { + KeyValue.pair( // 0 + PETER, + TestRanking.of( + TestEntry.of("Hallo", 1l))), + KeyValue.pair( // 1 + KLAUS, + TestRanking.of( + TestEntry.of("Müsch", 1l))), + KeyValue.pair( // 2 + PETER, + TestRanking.of( + TestEntry.of("Hallo", 1l), + TestEntry.of("Welt", 1l))), + KeyValue.pair( // 3 + KLAUS, + TestRanking.of( + TestEntry.of("Müsch", 2l))), + KeyValue.pair( // 4 + KLAUS, + TestRanking.of( + TestEntry.of("Müsch", 2l), + TestEntry.of("s", 1l))), + KeyValue.pair( // 5 + PETER, + TestRanking.of( + TestEntry.of("Hallo", 1l), + TestEntry.of("Welt", 1l), + TestEntry.of("Boäh", 1l))), + KeyValue.pair( // 6 + PETER, + TestRanking.of( + TestEntry.of("Welt", 2l), + TestEntry.of("Hallo", 1l), + TestEntry.of("Boäh", 1l))), + KeyValue.pair( // 7 + PETER, + TestRanking.of( + TestEntry.of("Welt", 2l), + TestEntry.of("Boäh", 2l), + TestEntry.of("Hallo", 1l))), + KeyValue.pair( // 8 + KLAUS, + TestRanking.of( + TestEntry.of("Müsch", 2l), + TestEntry.of("s", 2l))), + KeyValue.pair( // 9 + PETER, + TestRanking.of( + TestEntry.of("Boäh", 3l), + TestEntry.of("Welt", 2l), + TestEntry.of("Hallo", 1l))), + KeyValue.pair( // 10 + KLAUS, + TestRanking.of( + TestEntry.of("s", 3l), + TestEntry.of("Müsch", 2l))), + }; + + private static KeyValue[] USERS_MESSAGES = new KeyValue[] + { + KeyValue.pair( + PETER.getUser(), + TestUserData.of(PETER.getUser(), "Peter", "Pan", TestUserData.Sex.MALE)), + KeyValue.pair( + KLAUS.getUser(), + TestUserData.of(KLAUS.getUser(), "Klaus", "Klüse", TestUserData.Sex.OTHER)), + }; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java deleted file mode 100644 index 1315eae..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java +++ /dev/null @@ -1,172 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import com.fasterxml.jackson.databind.ObjectMapper; -import de.juplo.kafka.wordcount.top10.TestRanking; -import de.juplo.kafka.wordcount.top10.TestUser; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; -import org.apache.kafka.streams.state.Stores; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.http.MediaType; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.core.ProducerFactory; -import org.springframework.kafka.support.SendResult; -import org.springframework.kafka.support.serializer.JsonSerializer; -import org.springframework.kafka.test.context.EmbeddedKafka; -import org.springframework.test.web.servlet.MockMvc; -import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; - -import java.nio.charset.StandardCharsets; -import java.time.Duration; -import java.util.Map; -import java.util.concurrent.CompletableFuture; - -import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME; -import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME; -import static org.awaitility.Awaitility.await; -import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; - - -@SpringBootTest( - properties = { - "spring.main.allow-bean-definition-overriding=true", - "logging.level.root=WARN", - "logging.level.de.juplo=DEBUG", - "logging.level.org.apache.kafka.clients=INFO", - "logging.level.org.apache.kafka.streams=INFO", - "juplo.wordcount.query.bootstrap-server=${spring.embedded.kafka.brokers}", - "juplo.wordcount.query.commit-interval=100", - "juplo.wordcount.query.cache-max-bytes=0", - "juplo.wordcount.query.users-input-topic=" + QueryApplicationIT.TOPIC_USERS, - "juplo.wordcount.query.ranking-input-topic=" + QueryApplicationIT.TOPIC_TOP10 }) -@AutoConfigureMockMvc -@EmbeddedKafka(topics = { QueryApplicationIT.TOPIC_TOP10, QueryApplicationIT.TOPIC_USERS}) -@Slf4j -public class QueryApplicationIT -{ - public static final String TOPIC_TOP10 = "top10"; - public static final String TOPIC_USERS = "users"; - - - @Autowired - MockMvc mockMvc; - @Autowired - ObjectMapper objectMapper; - @Autowired - QueryStreamProcessor streamProcessor; - - - @BeforeAll - public static void testSendMessage( - @Autowired KafkaTemplate usersKafkaTemplate, - @Autowired KafkaTemplate top10KafkaTemplate) - { - TestData - .getUsersMessages() - .forEach(kv -> flush(usersKafkaTemplate.send(TOPIC_USERS, kv.key, kv.value))); - TestData - .getTop10Messages() - .forEach(kv -> flush(top10KafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value))); - } - - private static void flush(CompletableFuture future) - { - try - { - SendResult result = future.get(); - log.info( - "Sent: {}={}, partition={}, offset={}", - result.getProducerRecord().key(), - result.getProducerRecord().value(), - result.getRecordMetadata().partition(), - result.getRecordMetadata().offset()); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - } - - @DisplayName("Await, that the expected state is visible in the state-store") - @Test - public void testAwaitExpectedStateInStore() - { - await("The expected state is visible in the state-store") - .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> TestData.assertExpectedState(user -> streamProcessor.getStore().get(user))); - } - - @DisplayName("Await, that the expected state is queryable") - @Test - public void testAwaitExpectedStateIsQueryable() - { - await("The expected state is queryable") - .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> TestData.assertExpectedState(user -> requestUserRankingFor(user))); - } - - private UserRanking requestUserRankingFor(String user) - { - try - { - return objectMapper.readValue( - mockMvc - .perform(MockMvcRequestBuilders.get("/{user}", user) - .contentType(MediaType.APPLICATION_JSON)) - .andExpect(status().isOk()) - .andReturn() - .getResponse() - .getContentAsString(StandardCharsets.UTF_8), - UserRanking.class); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - } - - @TestConfiguration - static class Configuration - { - @Bean - KafkaTemplate usersKafkaTemplate(ProducerFactory producerFactory) - { - Map properties = Map.of( - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), - JsonSerializer.ADD_TYPE_INFO_HEADERS, false); - return new KafkaTemplate(producerFactory, properties); - } - - @Bean - KafkaTemplate top10KafkaTemplate(ProducerFactory producerFactory) - { - Map properties = Map.of( - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), - JsonSerializer.TYPE_MAPPINGS, "user:" + TestUser.class.getName() + ",ranking:" + TestRanking.class.getName()); - return new KafkaTemplate(producerFactory, properties); - } - - @Bean - KeyValueBytesStoreSupplier userStoreSupplier() - { - return Stores.inMemoryKeyValueStore(USER_STORE_NAME); - } - - @Bean - KeyValueBytesStoreSupplier rankingStoreSupplier() - { - return Stores.inMemoryKeyValueStore(RANKING_STORE_NAME); - } - } -} diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java deleted file mode 100644 index 203c813..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java +++ /dev/null @@ -1,91 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import de.juplo.kafka.wordcount.top10.TestRanking; -import de.juplo.kafka.wordcount.top10.TestUser; -import de.juplo.kafka.wordcount.users.TestUserData; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.TestInputTopic; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.TopologyTestDriver; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.Stores; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.springframework.kafka.support.serializer.JsonSerializer; - -import java.util.Map; - -import static de.juplo.kafka.wordcount.query.QueryApplicationConfiguration.serializationConfig; - - -@Slf4j -public class QueryStreamProcessorTopologyTest -{ - public static final String TOP10_IN = "TOP10-IN"; - public static final String USERS_IN = "USERS-IN"; - public static final String RANKING_STORE_NAME = "TOPOLOGY-TEST-RANKINGS"; - public static final String USERS_STORE_NAME = "TOPOLOGY-TEST-USERS"; - - - TopologyTestDriver testDriver; - TestInputTopic top10In; - TestInputTopic userIn; - - - @BeforeEach - public void setUp() - { - Topology topology = QueryStreamProcessor.buildTopology( - USERS_IN, - TOP10_IN, - Stores.inMemoryKeyValueStore(USERS_STORE_NAME), - Stores.inMemoryKeyValueStore(RANKING_STORE_NAME)); - - testDriver = new TopologyTestDriver(topology, serializationConfig()); - - top10In = testDriver.createInputTopic( - TOP10_IN, - jsonSerializer(TestUser.class, true), - jsonSerializer(TestRanking.class,false)); - - userIn = testDriver.createInputTopic( - USERS_IN, - new StringSerializer(), - jsonSerializer(TestUserData.class, false).noTypeInfo()); - } - - - @Test - public void test() - { - TestData - .getUsersMessages() - .forEach(kv -> userIn.pipeInput(kv.key, kv.value)); - TestData - .getTop10Messages() - .forEach(kv -> top10In.pipeInput(kv.key, kv.value)); - - KeyValueStore store = testDriver.getKeyValueStore(RANKING_STORE_NAME); - TestData.assertExpectedState(user -> store.get(user)); - } - - @AfterEach - public void tearDown() - { - testDriver.close(); - } - - private JsonSerializer jsonSerializer(Class type, boolean isKey) - { - JsonSerializer jsonSerializer = new JsonSerializer<>(); - jsonSerializer.configure( - Map.of( - JsonSerializer.TYPE_MAPPINGS, - "user:" + TestUser.class.getName() + "," + - "ranking:" + TestRanking.class.getName()), - isKey); - return jsonSerializer; - } -} diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java deleted file mode 100644 index 7c8b0b4..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java +++ /dev/null @@ -1,145 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import de.juplo.kafka.wordcount.top10.TestEntry; -import de.juplo.kafka.wordcount.top10.TestRanking; -import de.juplo.kafka.wordcount.top10.TestUser; -import de.juplo.kafka.wordcount.users.TestUserData; -import org.apache.kafka.streams.KeyValue; - -import java.util.Arrays; -import java.util.function.Function; -import java.util.stream.Stream; - -import static org.assertj.core.api.Assertions.assertThat; - - -class TestData -{ - static final TestUser PETER = TestUser.of("peter"); - static final TestUser KLAUS = TestUser.of("klaus"); - - static final Stream> getTop10Messages() - { - return Stream.of(TOP10_MESSAGES); - } - - static final Stream> getUsersMessages() - { - return Stream.of(USERS_MESSAGES); - } - - static void assertExpectedState(Function function) - { - assertRankingEqualsRankingFromLastMessage(PETER.getUser(), function.apply(PETER.getUser())); - assertRankingEqualsRankingFromLastMessage(KLAUS.getUser(), function.apply(KLAUS.getUser())); - } - - private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson) - { - assertThat(rankingJson).isEqualTo(getLastMessageFor(user)); - } - - private static UserRanking getLastMessageFor(String user) - { - return getTop10Messages() - .filter(kv -> kv.key.getUser().equals(user)) - .map(kv -> kv.value) - .map(testRanking -> userRankingFor(user, testRanking)) - .reduce(null, (left, right) -> right); - } - - private static UserRanking userRankingFor(String user, TestRanking testRanking) - { - TestUserData testUserData = getUsersMessages() - .filter(kv -> kv.key.equals(user)) - .map(kv -> kv.value) - .reduce(null, (left, right) -> right); - - Entry[] entries = Arrays - .stream(testRanking.getEntries()) - .map(testEntry -> entryOf(testEntry)) - .toArray(size -> new Entry[size]); - - return UserRanking.of( - testUserData.getFirstName(), - testUserData.getLastName(), - entries); - } - - private static Entry entryOf(TestEntry testEntry) - { - Entry entry = new Entry(); - entry.setWord(testEntry.getWord()); - entry.setCount(testEntry.getCount()); - return entry; - } - private static KeyValue[] TOP10_MESSAGES = new KeyValue[] - { - KeyValue.pair( // 0 - PETER, - TestRanking.of( - TestEntry.of("Hallo", 1l))), - KeyValue.pair( // 1 - KLAUS, - TestRanking.of( - TestEntry.of("Müsch", 1l))), - KeyValue.pair( // 2 - PETER, - TestRanking.of( - TestEntry.of("Hallo", 1l), - TestEntry.of("Welt", 1l))), - KeyValue.pair( // 3 - KLAUS, - TestRanking.of( - TestEntry.of("Müsch", 2l))), - KeyValue.pair( // 4 - KLAUS, - TestRanking.of( - TestEntry.of("Müsch", 2l), - TestEntry.of("s", 1l))), - KeyValue.pair( // 5 - PETER, - TestRanking.of( - TestEntry.of("Hallo", 1l), - TestEntry.of("Welt", 1l), - TestEntry.of("Boäh", 1l))), - KeyValue.pair( // 6 - PETER, - TestRanking.of( - TestEntry.of("Welt", 2l), - TestEntry.of("Hallo", 1l), - TestEntry.of("Boäh", 1l))), - KeyValue.pair( // 7 - PETER, - TestRanking.of( - TestEntry.of("Welt", 2l), - TestEntry.of("Boäh", 2l), - TestEntry.of("Hallo", 1l))), - KeyValue.pair( // 8 - KLAUS, - TestRanking.of( - TestEntry.of("Müsch", 2l), - TestEntry.of("s", 2l))), - KeyValue.pair( // 9 - PETER, - TestRanking.of( - TestEntry.of("Boäh", 3l), - TestEntry.of("Welt", 2l), - TestEntry.of("Hallo", 1l))), - KeyValue.pair( // 10 - KLAUS, - TestRanking.of( - TestEntry.of("s", 3l), - TestEntry.of("Müsch", 2l))), - }; - - private static KeyValue[] USERS_MESSAGES = new KeyValue[] - { - KeyValue.pair( - PETER.getUser(), - TestUserData.of(PETER.getUser(), "Peter", "Pan", TestUserData.Sex.MALE)), - KeyValue.pair( - KLAUS.getUser(), - TestUserData.of(KLAUS.getUser(), "Klaus", "Klüse", TestUserData.Sex.OTHER)), - }; -} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestEntry.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestEntry.java deleted file mode 100644 index 215327f..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/top10/TestEntry.java +++ /dev/null @@ -1,15 +0,0 @@ -package de.juplo.kafka.wordcount.top10; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - - -@AllArgsConstructor(staticName = "of") -@NoArgsConstructor -@Data -public class TestEntry -{ - String word; - long count; -}