From: Kai Moritz Date: Sat, 15 Jun 2024 12:48:55 +0000 (+0200) Subject: WIP:popular: 1.0.0 - Renamed packages and classes -- MOVE X-Git-Tag: popular-on-query~3 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=599eaff785341e56da363b7706a0d6106fa13dc9;p=demos%2Fkafka%2Fwordcount WIP:popular: 1.0.0 - Renamed packages and classes -- MOVE --- diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplication.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplication.java new file mode 100644 index 0000000..eeee7eb --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplication.java @@ -0,0 +1,14 @@ +package de.juplo.kafka.wordcount.query; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + + +@SpringBootApplication +public class QueryApplication +{ + public static void main(String[] args) + { + SpringApplication.run(QueryApplication.class, args); + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguration.java new file mode 100644 index 0000000..440d5c4 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguration.java @@ -0,0 +1,141 @@ +package de.juplo.kafka.wordcount.query; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.state.HostInfo; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.web.ServerProperties; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerde; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; + +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME; +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME; +import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; + + +@Configuration +@EnableConfigurationProperties(QueryApplicationProperties.class) +@Slf4j +public class QueryApplicationConfiguration +{ + @Bean + public HostInfo applicationServer( + ServerProperties serverProperties, + QueryApplicationProperties applicationProperties) throws IOException + { + String host; + if (serverProperties.getAddress() == null) + { + HostInfo bootstrapServer = HostInfo.buildFromEndpoint(applicationProperties.getBootstrapServer()); + Socket socket = new Socket(); + socket.connect(new InetSocketAddress(bootstrapServer.host(), bootstrapServer.port())); + host = socket.getLocalAddress().getHostAddress(); + } + else + { + host = serverProperties.getAddress().getHostAddress(); + } + + Integer port = serverProperties.getPort() == null ? 8080 : serverProperties.getPort(); + + return new HostInfo(host, port); + } + + @Bean + public Properties streamProcessorProperties( + QueryApplicationProperties applicationProperties, + HostInfo applicationServer) + { + Properties props = new Properties(); + + props.putAll(serializationConfig()); + + String applicationId = applicationProperties.getApplicationId(); + String bootstrapServer = applicationProperties.getBootstrapServer(); + + props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); + props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServer.host() + ":" + applicationServer.port()); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); + + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, applicationProperties.getBootstrapServer()); + if (applicationProperties.getCommitInterval() != null) + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, applicationProperties.getCommitInterval()); + if (applicationProperties.getCacheMaxBytes() != null) + props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, applicationProperties.getCacheMaxBytes()); + + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + return props; + } + + static Properties serializationConfig() + { + Properties props = new Properties(); + + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + props.put( + JsonDeserializer.TYPE_MAPPINGS, + "user:" + Key.class.getName() + "," + + "ranking:" + Ranking.class.getName() + "," + + "userranking:" + UserRanking.class.getName()); + + return props; + } + + @Bean(initMethod = "start", destroyMethod = "stop") + public QueryStreamProcessor streamProcessor( + Properties streamProcessorProperties, + HostInfo applicationServer, + QueryApplicationProperties applicationProperties, + KeyValueBytesStoreSupplier userStoreSupplier, + KeyValueBytesStoreSupplier rankingStoreSupplier, + ConfigurableApplicationContext context) + { + QueryStreamProcessor streamProcessor = new QueryStreamProcessor( + streamProcessorProperties, + applicationServer, + applicationProperties.getUsersInputTopic(), + applicationProperties.getRankingInputTopic(), + userStoreSupplier, + rankingStoreSupplier); + + streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> + { + log.error("Unexpected error!", e); + CompletableFuture.runAsync(() -> + { + log.info("Stopping application..."); + SpringApplication.exit(context, () -> 1); + }); + return SHUTDOWN_CLIENT; + }); + + return streamProcessor; + } + + @Bean + public KeyValueBytesStoreSupplier userStoreSupplier() + { + return Stores.persistentKeyValueStore(USER_STORE_NAME); + } + + @Bean + public KeyValueBytesStoreSupplier rankingStoreSupplier() + { + return Stores.persistentKeyValueStore(RANKING_STORE_NAME); + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationProperties.java new file mode 100644 index 0000000..4a9eeca --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationProperties.java @@ -0,0 +1,22 @@ +package de.juplo.kafka.wordcount.query; + + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import org.springframework.boot.context.properties.ConfigurationProperties; + + +@ConfigurationProperties("juplo.wordcount.query") +@Getter +@Setter +@ToString +public class QueryApplicationProperties +{ + private String bootstrapServer = "localhost:9092"; + private String applicationId = "query"; + private String rankingInputTopic = "top10"; + private String usersInputTopic = "users"; + private Integer commitInterval; + private Integer cacheMaxBytes; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularController.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularController.java new file mode 100644 index 0000000..a9b5b80 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularController.java @@ -0,0 +1,43 @@ +package de.juplo.kafka.wordcount.query; + +import lombok.RequiredArgsConstructor; +import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RestController; + +import java.net.URI; +import java.util.Optional; + + +@RestController +@RequiredArgsConstructor +public class QueryController +{ + private final QueryStreamProcessor processor; + + @GetMapping("{username}") + ResponseEntity queryFor(@PathVariable String username) + { + Optional redirect = processor.getRedirect(username); + if (redirect.isPresent()) + { + return + ResponseEntity + .status(HttpStatus.TEMPORARY_REDIRECT) + .location(redirect.get()) + .build(); + } + + try + { + return ResponseEntity.of(processor.getUserRanking(username)); + } + catch (InvalidStateStoreException e) + { + return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build(); + } + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java new file mode 100644 index 0000000..e075eb7 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java @@ -0,0 +1,127 @@ +package de.juplo.kafka.wordcount.query; + +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.*; +import org.apache.kafka.streams.kstream.*; +import org.apache.kafka.streams.state.HostInfo; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.springframework.kafka.support.serializer.JsonSerde; + +import java.net.URI; +import java.util.Optional; +import java.util.Properties; + + +@Slf4j +public class QueryStreamProcessor +{ + public static final String USER_STORE_NAME = "users"; + public static final String RANKING_STORE_NAME = "rankings"; + + public final KafkaStreams streams; + public final HostInfo hostInfo; + public final StoreQueryParameters> storeParameters; + + + public QueryStreamProcessor( + Properties props, + HostInfo applicationServer, + String usersInputTopic, + String rankingInputTopic, + KeyValueBytesStoreSupplier userStoreSupplier, + KeyValueBytesStoreSupplier rankingStoreSupplier) + { + Topology topology = buildTopology( + usersInputTopic, + rankingInputTopic, + userStoreSupplier, + rankingStoreSupplier); + streams = new KafkaStreams(topology, props); + hostInfo = applicationServer; + storeParameters = StoreQueryParameters.fromNameAndType(RANKING_STORE_NAME, QueryableStoreTypes.keyValueStore());; + } + + static Topology buildTopology( + String usersInputTopic, + String rankingInputTopic, + KeyValueBytesStoreSupplier userStoreSupplier, + KeyValueBytesStoreSupplier rankingStoreSupplier) + { + StreamsBuilder builder = new StreamsBuilder(); + + KTable users = builder + .stream( + usersInputTopic, + Consumed.with(Serdes.String(), new JsonSerde().copyWithType(User.class))) + .toTable( + Materialized + .as(userStoreSupplier) + .withKeySerde(Serdes.String()) + .withValueSerde(new JsonSerde().copyWithType(User.class))); + KStream rankings = builder + .stream(rankingInputTopic) + .map((key, value) -> new KeyValue<>(key.getUser(), value)); + + rankings + .join(users, (ranking, user) -> UserRanking.of( + user.getFirstName(), + user.getLastName(), + ranking.getEntries()), + Joined.keySerde(Serdes.String())) + .toTable( + Materialized + .as(rankingStoreSupplier) + .withKeySerde(Serdes.String()) + .withValueSerde(new JsonSerde().copyWithType(UserRanking.class))); + + Topology topology = builder.build(); + log.info("\n\n{}", topology.describe()); + + return topology; + } + + ReadOnlyKeyValueStore getStore() + { + return streams.store(storeParameters); + } + + public Optional getRedirect(String username) + { + KeyQueryMetadata metadata = streams.queryMetadataForKey(RANKING_STORE_NAME, username, Serdes.String().serializer()); + HostInfo activeHost = metadata.activeHost(); + log.debug("Local store for {}: {}, {}:{}", username, metadata.partition(), activeHost.host(), activeHost.port()); + + if (activeHost.equals(this.hostInfo) || activeHost.equals(HostInfo.unavailable())) + { + return Optional.empty(); + } + + URI location = URI.create("http://" + activeHost.host() + ":" + activeHost.port() + "/" + username); + log.debug("Redirecting to {}", location); + return Optional.of(location); + } + + public Optional getUserRanking(String username) + { + return Optional.ofNullable(getStore().get(username)); + } + + @PostConstruct + public void start() + { + log.info("Starting Stream-Processor"); + streams.start(); + } + + @PreDestroy + public void stop() + { + log.info("Stopping Stream-Processor"); + streams.close(); + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/QueryApplication.java b/src/main/java/de/juplo/kafka/wordcount/popular/QueryApplication.java deleted file mode 100644 index eeee7eb..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/popular/QueryApplication.java +++ /dev/null @@ -1,14 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; - - -@SpringBootApplication -public class QueryApplication -{ - public static void main(String[] args) - { - SpringApplication.run(QueryApplication.class, args); - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/QueryApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/popular/QueryApplicationConfiguration.java deleted file mode 100644 index 440d5c4..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/popular/QueryApplicationConfiguration.java +++ /dev/null @@ -1,141 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.state.HostInfo; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; -import org.apache.kafka.streams.state.Stores; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.web.ServerProperties; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.support.serializer.JsonDeserializer; -import org.springframework.kafka.support.serializer.JsonSerde; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.util.Properties; -import java.util.concurrent.CompletableFuture; - -import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME; -import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME; -import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; - - -@Configuration -@EnableConfigurationProperties(QueryApplicationProperties.class) -@Slf4j -public class QueryApplicationConfiguration -{ - @Bean - public HostInfo applicationServer( - ServerProperties serverProperties, - QueryApplicationProperties applicationProperties) throws IOException - { - String host; - if (serverProperties.getAddress() == null) - { - HostInfo bootstrapServer = HostInfo.buildFromEndpoint(applicationProperties.getBootstrapServer()); - Socket socket = new Socket(); - socket.connect(new InetSocketAddress(bootstrapServer.host(), bootstrapServer.port())); - host = socket.getLocalAddress().getHostAddress(); - } - else - { - host = serverProperties.getAddress().getHostAddress(); - } - - Integer port = serverProperties.getPort() == null ? 8080 : serverProperties.getPort(); - - return new HostInfo(host, port); - } - - @Bean - public Properties streamProcessorProperties( - QueryApplicationProperties applicationProperties, - HostInfo applicationServer) - { - Properties props = new Properties(); - - props.putAll(serializationConfig()); - - String applicationId = applicationProperties.getApplicationId(); - String bootstrapServer = applicationProperties.getBootstrapServer(); - - props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); - props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServer.host() + ":" + applicationServer.port()); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); - - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, applicationProperties.getBootstrapServer()); - if (applicationProperties.getCommitInterval() != null) - props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, applicationProperties.getCommitInterval()); - if (applicationProperties.getCacheMaxBytes() != null) - props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, applicationProperties.getCacheMaxBytes()); - - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - - return props; - } - - static Properties serializationConfig() - { - Properties props = new Properties(); - - props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); - props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); - props.put( - JsonDeserializer.TYPE_MAPPINGS, - "user:" + Key.class.getName() + "," + - "ranking:" + Ranking.class.getName() + "," + - "userranking:" + UserRanking.class.getName()); - - return props; - } - - @Bean(initMethod = "start", destroyMethod = "stop") - public QueryStreamProcessor streamProcessor( - Properties streamProcessorProperties, - HostInfo applicationServer, - QueryApplicationProperties applicationProperties, - KeyValueBytesStoreSupplier userStoreSupplier, - KeyValueBytesStoreSupplier rankingStoreSupplier, - ConfigurableApplicationContext context) - { - QueryStreamProcessor streamProcessor = new QueryStreamProcessor( - streamProcessorProperties, - applicationServer, - applicationProperties.getUsersInputTopic(), - applicationProperties.getRankingInputTopic(), - userStoreSupplier, - rankingStoreSupplier); - - streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> - { - log.error("Unexpected error!", e); - CompletableFuture.runAsync(() -> - { - log.info("Stopping application..."); - SpringApplication.exit(context, () -> 1); - }); - return SHUTDOWN_CLIENT; - }); - - return streamProcessor; - } - - @Bean - public KeyValueBytesStoreSupplier userStoreSupplier() - { - return Stores.persistentKeyValueStore(USER_STORE_NAME); - } - - @Bean - public KeyValueBytesStoreSupplier rankingStoreSupplier() - { - return Stores.persistentKeyValueStore(RANKING_STORE_NAME); - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/QueryApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/popular/QueryApplicationProperties.java deleted file mode 100644 index 4a9eeca..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/popular/QueryApplicationProperties.java +++ /dev/null @@ -1,22 +0,0 @@ -package de.juplo.kafka.wordcount.query; - - -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; -import org.springframework.boot.context.properties.ConfigurationProperties; - - -@ConfigurationProperties("juplo.wordcount.query") -@Getter -@Setter -@ToString -public class QueryApplicationProperties -{ - private String bootstrapServer = "localhost:9092"; - private String applicationId = "query"; - private String rankingInputTopic = "top10"; - private String usersInputTopic = "users"; - private Integer commitInterval; - private Integer cacheMaxBytes; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/QueryController.java b/src/main/java/de/juplo/kafka/wordcount/popular/QueryController.java deleted file mode 100644 index a9b5b80..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/popular/QueryController.java +++ /dev/null @@ -1,43 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import lombok.RequiredArgsConstructor; -import org.apache.kafka.streams.errors.InvalidStateStoreException; -import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.RestController; - -import java.net.URI; -import java.util.Optional; - - -@RestController -@RequiredArgsConstructor -public class QueryController -{ - private final QueryStreamProcessor processor; - - @GetMapping("{username}") - ResponseEntity queryFor(@PathVariable String username) - { - Optional redirect = processor.getRedirect(username); - if (redirect.isPresent()) - { - return - ResponseEntity - .status(HttpStatus.TEMPORARY_REDIRECT) - .location(redirect.get()) - .build(); - } - - try - { - return ResponseEntity.of(processor.getUserRanking(username)); - } - catch (InvalidStateStoreException e) - { - return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build(); - } - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/popular/QueryStreamProcessor.java deleted file mode 100644 index e075eb7..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/popular/QueryStreamProcessor.java +++ /dev/null @@ -1,127 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.*; -import org.apache.kafka.streams.state.HostInfo; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; -import org.apache.kafka.streams.state.QueryableStoreTypes; -import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; -import org.springframework.kafka.support.serializer.JsonSerde; - -import java.net.URI; -import java.util.Optional; -import java.util.Properties; - - -@Slf4j -public class QueryStreamProcessor -{ - public static final String USER_STORE_NAME = "users"; - public static final String RANKING_STORE_NAME = "rankings"; - - public final KafkaStreams streams; - public final HostInfo hostInfo; - public final StoreQueryParameters> storeParameters; - - - public QueryStreamProcessor( - Properties props, - HostInfo applicationServer, - String usersInputTopic, - String rankingInputTopic, - KeyValueBytesStoreSupplier userStoreSupplier, - KeyValueBytesStoreSupplier rankingStoreSupplier) - { - Topology topology = buildTopology( - usersInputTopic, - rankingInputTopic, - userStoreSupplier, - rankingStoreSupplier); - streams = new KafkaStreams(topology, props); - hostInfo = applicationServer; - storeParameters = StoreQueryParameters.fromNameAndType(RANKING_STORE_NAME, QueryableStoreTypes.keyValueStore());; - } - - static Topology buildTopology( - String usersInputTopic, - String rankingInputTopic, - KeyValueBytesStoreSupplier userStoreSupplier, - KeyValueBytesStoreSupplier rankingStoreSupplier) - { - StreamsBuilder builder = new StreamsBuilder(); - - KTable users = builder - .stream( - usersInputTopic, - Consumed.with(Serdes.String(), new JsonSerde().copyWithType(User.class))) - .toTable( - Materialized - .as(userStoreSupplier) - .withKeySerde(Serdes.String()) - .withValueSerde(new JsonSerde().copyWithType(User.class))); - KStream rankings = builder - .stream(rankingInputTopic) - .map((key, value) -> new KeyValue<>(key.getUser(), value)); - - rankings - .join(users, (ranking, user) -> UserRanking.of( - user.getFirstName(), - user.getLastName(), - ranking.getEntries()), - Joined.keySerde(Serdes.String())) - .toTable( - Materialized - .as(rankingStoreSupplier) - .withKeySerde(Serdes.String()) - .withValueSerde(new JsonSerde().copyWithType(UserRanking.class))); - - Topology topology = builder.build(); - log.info("\n\n{}", topology.describe()); - - return topology; - } - - ReadOnlyKeyValueStore getStore() - { - return streams.store(storeParameters); - } - - public Optional getRedirect(String username) - { - KeyQueryMetadata metadata = streams.queryMetadataForKey(RANKING_STORE_NAME, username, Serdes.String().serializer()); - HostInfo activeHost = metadata.activeHost(); - log.debug("Local store for {}: {}, {}:{}", username, metadata.partition(), activeHost.host(), activeHost.port()); - - if (activeHost.equals(this.hostInfo) || activeHost.equals(HostInfo.unavailable())) - { - return Optional.empty(); - } - - URI location = URI.create("http://" + activeHost.host() + ":" + activeHost.port() + "/" + username); - log.debug("Redirecting to {}", location); - return Optional.of(location); - } - - public Optional getUserRanking(String username) - { - return Optional.ofNullable(getStore().get(username)); - } - - @PostConstruct - public void start() - { - log.info("Starting Stream-Processor"); - streams.start(); - } - - @PreDestroy - public void stop() - { - log.info("Stopping Stream-Processor"); - streams.close(); - } -} diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java new file mode 100644 index 0000000..1315eae --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java @@ -0,0 +1,172 @@ +package de.juplo.kafka.wordcount.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.wordcount.top10.TestRanking; +import de.juplo.kafka.wordcount.top10.TestUser; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.http.MediaType; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.SendResult; +import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.web.servlet.MockMvc; +import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; + +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME; +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME; +import static org.awaitility.Awaitility.await; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + + +@SpringBootTest( + properties = { + "spring.main.allow-bean-definition-overriding=true", + "logging.level.root=WARN", + "logging.level.de.juplo=DEBUG", + "logging.level.org.apache.kafka.clients=INFO", + "logging.level.org.apache.kafka.streams=INFO", + "juplo.wordcount.query.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.wordcount.query.commit-interval=100", + "juplo.wordcount.query.cache-max-bytes=0", + "juplo.wordcount.query.users-input-topic=" + QueryApplicationIT.TOPIC_USERS, + "juplo.wordcount.query.ranking-input-topic=" + QueryApplicationIT.TOPIC_TOP10 }) +@AutoConfigureMockMvc +@EmbeddedKafka(topics = { QueryApplicationIT.TOPIC_TOP10, QueryApplicationIT.TOPIC_USERS}) +@Slf4j +public class QueryApplicationIT +{ + public static final String TOPIC_TOP10 = "top10"; + public static final String TOPIC_USERS = "users"; + + + @Autowired + MockMvc mockMvc; + @Autowired + ObjectMapper objectMapper; + @Autowired + QueryStreamProcessor streamProcessor; + + + @BeforeAll + public static void testSendMessage( + @Autowired KafkaTemplate usersKafkaTemplate, + @Autowired KafkaTemplate top10KafkaTemplate) + { + TestData + .getUsersMessages() + .forEach(kv -> flush(usersKafkaTemplate.send(TOPIC_USERS, kv.key, kv.value))); + TestData + .getTop10Messages() + .forEach(kv -> flush(top10KafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value))); + } + + private static void flush(CompletableFuture future) + { + try + { + SendResult result = future.get(); + log.info( + "Sent: {}={}, partition={}, offset={}", + result.getProducerRecord().key(), + result.getProducerRecord().value(), + result.getRecordMetadata().partition(), + result.getRecordMetadata().offset()); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + @DisplayName("Await, that the expected state is visible in the state-store") + @Test + public void testAwaitExpectedStateInStore() + { + await("The expected state is visible in the state-store") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> TestData.assertExpectedState(user -> streamProcessor.getStore().get(user))); + } + + @DisplayName("Await, that the expected state is queryable") + @Test + public void testAwaitExpectedStateIsQueryable() + { + await("The expected state is queryable") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> TestData.assertExpectedState(user -> requestUserRankingFor(user))); + } + + private UserRanking requestUserRankingFor(String user) + { + try + { + return objectMapper.readValue( + mockMvc + .perform(MockMvcRequestBuilders.get("/{user}", user) + .contentType(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + .andReturn() + .getResponse() + .getContentAsString(StandardCharsets.UTF_8), + UserRanking.class); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + @TestConfiguration + static class Configuration + { + @Bean + KafkaTemplate usersKafkaTemplate(ProducerFactory producerFactory) + { + Map properties = Map.of( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), + JsonSerializer.ADD_TYPE_INFO_HEADERS, false); + return new KafkaTemplate(producerFactory, properties); + } + + @Bean + KafkaTemplate top10KafkaTemplate(ProducerFactory producerFactory) + { + Map properties = Map.of( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), + JsonSerializer.TYPE_MAPPINGS, "user:" + TestUser.class.getName() + ",ranking:" + TestRanking.class.getName()); + return new KafkaTemplate(producerFactory, properties); + } + + @Bean + KeyValueBytesStoreSupplier userStoreSupplier() + { + return Stores.inMemoryKeyValueStore(USER_STORE_NAME); + } + + @Bean + KeyValueBytesStoreSupplier rankingStoreSupplier() + { + return Stores.inMemoryKeyValueStore(RANKING_STORE_NAME); + } + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java new file mode 100644 index 0000000..203c813 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java @@ -0,0 +1,91 @@ +package de.juplo.kafka.wordcount.query; + +import de.juplo.kafka.wordcount.top10.TestRanking; +import de.juplo.kafka.wordcount.top10.TestUser; +import de.juplo.kafka.wordcount.users.TestUserData; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.kafka.support.serializer.JsonSerializer; + +import java.util.Map; + +import static de.juplo.kafka.wordcount.query.QueryApplicationConfiguration.serializationConfig; + + +@Slf4j +public class QueryStreamProcessorTopologyTest +{ + public static final String TOP10_IN = "TOP10-IN"; + public static final String USERS_IN = "USERS-IN"; + public static final String RANKING_STORE_NAME = "TOPOLOGY-TEST-RANKINGS"; + public static final String USERS_STORE_NAME = "TOPOLOGY-TEST-USERS"; + + + TopologyTestDriver testDriver; + TestInputTopic top10In; + TestInputTopic userIn; + + + @BeforeEach + public void setUp() + { + Topology topology = QueryStreamProcessor.buildTopology( + USERS_IN, + TOP10_IN, + Stores.inMemoryKeyValueStore(USERS_STORE_NAME), + Stores.inMemoryKeyValueStore(RANKING_STORE_NAME)); + + testDriver = new TopologyTestDriver(topology, serializationConfig()); + + top10In = testDriver.createInputTopic( + TOP10_IN, + jsonSerializer(TestUser.class, true), + jsonSerializer(TestRanking.class,false)); + + userIn = testDriver.createInputTopic( + USERS_IN, + new StringSerializer(), + jsonSerializer(TestUserData.class, false).noTypeInfo()); + } + + + @Test + public void test() + { + TestData + .getUsersMessages() + .forEach(kv -> userIn.pipeInput(kv.key, kv.value)); + TestData + .getTop10Messages() + .forEach(kv -> top10In.pipeInput(kv.key, kv.value)); + + KeyValueStore store = testDriver.getKeyValueStore(RANKING_STORE_NAME); + TestData.assertExpectedState(user -> store.get(user)); + } + + @AfterEach + public void tearDown() + { + testDriver.close(); + } + + private JsonSerializer jsonSerializer(Class type, boolean isKey) + { + JsonSerializer jsonSerializer = new JsonSerializer<>(); + jsonSerializer.configure( + Map.of( + JsonSerializer.TYPE_MAPPINGS, + "user:" + TestUser.class.getName() + "," + + "ranking:" + TestRanking.class.getName()), + isKey); + return jsonSerializer; + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/popular/QueryApplicationIT.java deleted file mode 100644 index 1315eae..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/popular/QueryApplicationIT.java +++ /dev/null @@ -1,172 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import com.fasterxml.jackson.databind.ObjectMapper; -import de.juplo.kafka.wordcount.top10.TestRanking; -import de.juplo.kafka.wordcount.top10.TestUser; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; -import org.apache.kafka.streams.state.Stores; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.http.MediaType; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.core.ProducerFactory; -import org.springframework.kafka.support.SendResult; -import org.springframework.kafka.support.serializer.JsonSerializer; -import org.springframework.kafka.test.context.EmbeddedKafka; -import org.springframework.test.web.servlet.MockMvc; -import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; - -import java.nio.charset.StandardCharsets; -import java.time.Duration; -import java.util.Map; -import java.util.concurrent.CompletableFuture; - -import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME; -import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME; -import static org.awaitility.Awaitility.await; -import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; - - -@SpringBootTest( - properties = { - "spring.main.allow-bean-definition-overriding=true", - "logging.level.root=WARN", - "logging.level.de.juplo=DEBUG", - "logging.level.org.apache.kafka.clients=INFO", - "logging.level.org.apache.kafka.streams=INFO", - "juplo.wordcount.query.bootstrap-server=${spring.embedded.kafka.brokers}", - "juplo.wordcount.query.commit-interval=100", - "juplo.wordcount.query.cache-max-bytes=0", - "juplo.wordcount.query.users-input-topic=" + QueryApplicationIT.TOPIC_USERS, - "juplo.wordcount.query.ranking-input-topic=" + QueryApplicationIT.TOPIC_TOP10 }) -@AutoConfigureMockMvc -@EmbeddedKafka(topics = { QueryApplicationIT.TOPIC_TOP10, QueryApplicationIT.TOPIC_USERS}) -@Slf4j -public class QueryApplicationIT -{ - public static final String TOPIC_TOP10 = "top10"; - public static final String TOPIC_USERS = "users"; - - - @Autowired - MockMvc mockMvc; - @Autowired - ObjectMapper objectMapper; - @Autowired - QueryStreamProcessor streamProcessor; - - - @BeforeAll - public static void testSendMessage( - @Autowired KafkaTemplate usersKafkaTemplate, - @Autowired KafkaTemplate top10KafkaTemplate) - { - TestData - .getUsersMessages() - .forEach(kv -> flush(usersKafkaTemplate.send(TOPIC_USERS, kv.key, kv.value))); - TestData - .getTop10Messages() - .forEach(kv -> flush(top10KafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value))); - } - - private static void flush(CompletableFuture future) - { - try - { - SendResult result = future.get(); - log.info( - "Sent: {}={}, partition={}, offset={}", - result.getProducerRecord().key(), - result.getProducerRecord().value(), - result.getRecordMetadata().partition(), - result.getRecordMetadata().offset()); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - } - - @DisplayName("Await, that the expected state is visible in the state-store") - @Test - public void testAwaitExpectedStateInStore() - { - await("The expected state is visible in the state-store") - .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> TestData.assertExpectedState(user -> streamProcessor.getStore().get(user))); - } - - @DisplayName("Await, that the expected state is queryable") - @Test - public void testAwaitExpectedStateIsQueryable() - { - await("The expected state is queryable") - .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> TestData.assertExpectedState(user -> requestUserRankingFor(user))); - } - - private UserRanking requestUserRankingFor(String user) - { - try - { - return objectMapper.readValue( - mockMvc - .perform(MockMvcRequestBuilders.get("/{user}", user) - .contentType(MediaType.APPLICATION_JSON)) - .andExpect(status().isOk()) - .andReturn() - .getResponse() - .getContentAsString(StandardCharsets.UTF_8), - UserRanking.class); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - } - - @TestConfiguration - static class Configuration - { - @Bean - KafkaTemplate usersKafkaTemplate(ProducerFactory producerFactory) - { - Map properties = Map.of( - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), - JsonSerializer.ADD_TYPE_INFO_HEADERS, false); - return new KafkaTemplate(producerFactory, properties); - } - - @Bean - KafkaTemplate top10KafkaTemplate(ProducerFactory producerFactory) - { - Map properties = Map.of( - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), - JsonSerializer.TYPE_MAPPINGS, "user:" + TestUser.class.getName() + ",ranking:" + TestRanking.class.getName()); - return new KafkaTemplate(producerFactory, properties); - } - - @Bean - KeyValueBytesStoreSupplier userStoreSupplier() - { - return Stores.inMemoryKeyValueStore(USER_STORE_NAME); - } - - @Bean - KeyValueBytesStoreSupplier rankingStoreSupplier() - { - return Stores.inMemoryKeyValueStore(RANKING_STORE_NAME); - } - } -} diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/QueryStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/popular/QueryStreamProcessorTopologyTest.java deleted file mode 100644 index 203c813..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/popular/QueryStreamProcessorTopologyTest.java +++ /dev/null @@ -1,91 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import de.juplo.kafka.wordcount.top10.TestRanking; -import de.juplo.kafka.wordcount.top10.TestUser; -import de.juplo.kafka.wordcount.users.TestUserData; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.TestInputTopic; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.TopologyTestDriver; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.Stores; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.springframework.kafka.support.serializer.JsonSerializer; - -import java.util.Map; - -import static de.juplo.kafka.wordcount.query.QueryApplicationConfiguration.serializationConfig; - - -@Slf4j -public class QueryStreamProcessorTopologyTest -{ - public static final String TOP10_IN = "TOP10-IN"; - public static final String USERS_IN = "USERS-IN"; - public static final String RANKING_STORE_NAME = "TOPOLOGY-TEST-RANKINGS"; - public static final String USERS_STORE_NAME = "TOPOLOGY-TEST-USERS"; - - - TopologyTestDriver testDriver; - TestInputTopic top10In; - TestInputTopic userIn; - - - @BeforeEach - public void setUp() - { - Topology topology = QueryStreamProcessor.buildTopology( - USERS_IN, - TOP10_IN, - Stores.inMemoryKeyValueStore(USERS_STORE_NAME), - Stores.inMemoryKeyValueStore(RANKING_STORE_NAME)); - - testDriver = new TopologyTestDriver(topology, serializationConfig()); - - top10In = testDriver.createInputTopic( - TOP10_IN, - jsonSerializer(TestUser.class, true), - jsonSerializer(TestRanking.class,false)); - - userIn = testDriver.createInputTopic( - USERS_IN, - new StringSerializer(), - jsonSerializer(TestUserData.class, false).noTypeInfo()); - } - - - @Test - public void test() - { - TestData - .getUsersMessages() - .forEach(kv -> userIn.pipeInput(kv.key, kv.value)); - TestData - .getTop10Messages() - .forEach(kv -> top10In.pipeInput(kv.key, kv.value)); - - KeyValueStore store = testDriver.getKeyValueStore(RANKING_STORE_NAME); - TestData.assertExpectedState(user -> store.get(user)); - } - - @AfterEach - public void tearDown() - { - testDriver.close(); - } - - private JsonSerializer jsonSerializer(Class type, boolean isKey) - { - JsonSerializer jsonSerializer = new JsonSerializer<>(); - jsonSerializer.configure( - Map.of( - JsonSerializer.TYPE_MAPPINGS, - "user:" + TestUser.class.getName() + "," + - "ranking:" + TestRanking.class.getName()), - isKey); - return jsonSerializer; - } -}