stats: 1.0.0 - Renamed the project into `stats` -- MOVE
authorKai Moritz <kai@juplo.de>
Tue, 25 Jun 2024 04:03:10 +0000 (06:03 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 25 Jun 2024 04:17:56 +0000 (06:17 +0200)
26 files changed:
src/main/java/de/juplo/kafka/wordcount/query/Entry.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/query/Key.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/query/QueryApplication.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/query/QueryController.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/query/Ranking.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/query/User.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/stats/Entry.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/stats/Key.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/stats/Ranking.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/stats/StatsApplication.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationConfiguration.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationProperties.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/stats/StatsController.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessor.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/stats/User.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/stats/UserRanking.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java [deleted file]
src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java [deleted file]
src/test/java/de/juplo/kafka/wordcount/query/TestData.java [deleted file]
src/test/java/de/juplo/kafka/wordcount/stats/StatsApplicationIT.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessorTopologyTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/stats/TestData.java [new file with mode: 0644]

diff --git a/src/main/java/de/juplo/kafka/wordcount/query/Entry.java b/src/main/java/de/juplo/kafka/wordcount/query/Entry.java
deleted file mode 100644 (file)
index 383b1a6..0000000
+++ /dev/null
@@ -1,11 +0,0 @@
-package de.juplo.kafka.wordcount.query;
-
-import lombok.Data;
-
-
-@Data
-public class Entry
-{
-  private String key;
-  private Long counter;
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/query/Key.java b/src/main/java/de/juplo/kafka/wordcount/query/Key.java
deleted file mode 100644 (file)
index a2d85a1..0000000
+++ /dev/null
@@ -1,11 +0,0 @@
-package de.juplo.kafka.wordcount.query;
-
-import lombok.Data;
-
-
-@Data
-public class Key
-{
-  private String type;
-  private String channel;
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplication.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplication.java
deleted file mode 100644 (file)
index eeee7eb..0000000
+++ /dev/null
@@ -1,14 +0,0 @@
-package de.juplo.kafka.wordcount.query;
-
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-
-
-@SpringBootApplication
-public class QueryApplication
-{
-       public static void main(String[] args)
-       {
-               SpringApplication.run(QueryApplication.class, args);
-       }
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java
deleted file mode 100644 (file)
index 0f9cad1..0000000
+++ /dev/null
@@ -1,141 +0,0 @@
-package de.juplo.kafka.wordcount.query;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.state.HostInfo;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.Stores;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.web.ServerProperties;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.support.serializer.JsonDeserializer;
-import org.springframework.kafka.support.serializer.JsonSerde;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
-
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME;
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME;
-import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
-
-
-@Configuration
-@EnableConfigurationProperties(QueryApplicationProperties.class)
-@Slf4j
-public class QueryApplicationConfiguration
-{
-       @Bean
-       public HostInfo applicationServer(
-                       ServerProperties serverProperties,
-                       QueryApplicationProperties applicationProperties) throws IOException
-       {
-               String host;
-               if (serverProperties.getAddress() == null)
-               {
-                       HostInfo bootstrapServer = HostInfo.buildFromEndpoint(applicationProperties.getBootstrapServer());
-                       Socket socket = new Socket();
-                       socket.connect(new InetSocketAddress(bootstrapServer.host(), bootstrapServer.port()));
-                       host = socket.getLocalAddress().getHostAddress();
-               }
-               else
-               {
-                       host = serverProperties.getAddress().getHostAddress();
-               }
-
-               Integer port = serverProperties.getPort() == null ? 8080 : serverProperties.getPort();
-
-               return new HostInfo(host, port);
-       }
-
-       @Bean
-       public Properties streamProcessorProperties(
-                       QueryApplicationProperties applicationProperties,
-                       HostInfo applicationServer)
-       {
-               Properties props = new Properties();
-
-               props.putAll(serializationConfig());
-
-               String applicationId = applicationProperties.getApplicationId();
-               String bootstrapServer = applicationProperties.getBootstrapServer();
-
-               props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
-               props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServer.host() + ":" + applicationServer.port());
-               props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
-
-               props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, applicationProperties.getBootstrapServer());
-               if (applicationProperties.getCommitInterval() != null)
-                       props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, applicationProperties.getCommitInterval());
-               if (applicationProperties.getCacheMaxBytes() != null)
-                       props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, applicationProperties.getCacheMaxBytes());
-
-               props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-               return props;
-       }
-
-       static Properties serializationConfig()
-       {
-               Properties props = new Properties();
-
-               props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
-               props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
-               props.put(
-                               JsonDeserializer.TYPE_MAPPINGS,
-                               "stats:" + Key.class.getName() + "," +
-                               "ranking:" + Ranking.class.getName() + "," +
-                               "userranking:" + UserRanking.class.getName());
-
-               return props;
-       }
-
-       @Bean(initMethod = "start", destroyMethod = "stop")
-       public QueryStreamProcessor streamProcessor(
-                       Properties streamProcessorProperties,
-                       HostInfo applicationServer,
-                       QueryApplicationProperties applicationProperties,
-                       KeyValueBytesStoreSupplier userStoreSupplier,
-                       KeyValueBytesStoreSupplier rankingStoreSupplier,
-                       ConfigurableApplicationContext context)
-       {
-               QueryStreamProcessor streamProcessor = new QueryStreamProcessor(
-                               streamProcessorProperties,
-                               applicationServer,
-                               applicationProperties.getUsersInputTopic(),
-                               applicationProperties.getRankingInputTopic(),
-                               userStoreSupplier,
-                               rankingStoreSupplier);
-
-               streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
-               {
-                       log.error("Unexpected error!", e);
-                       CompletableFuture.runAsync(() ->
-                       {
-                               log.info("Stopping application...");
-                               SpringApplication.exit(context, () -> 1);
-                       });
-                       return SHUTDOWN_CLIENT;
-               });
-
-               return streamProcessor;
-       }
-
-       @Bean
-       public KeyValueBytesStoreSupplier userStoreSupplier()
-       {
-               return Stores.persistentKeyValueStore(USER_STORE_NAME);
-       }
-
-       @Bean
-       public KeyValueBytesStoreSupplier rankingStoreSupplier()
-       {
-               return Stores.persistentKeyValueStore(RANKING_STORE_NAME);
-       }
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java
deleted file mode 100644 (file)
index 4a9eeca..0000000
+++ /dev/null
@@ -1,22 +0,0 @@
-package de.juplo.kafka.wordcount.query;
-
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-
-
-@ConfigurationProperties("juplo.wordcount.query")
-@Getter
-@Setter
-@ToString
-public class QueryApplicationProperties
-{
-  private String bootstrapServer = "localhost:9092";
-  private String applicationId = "query";
-  private String rankingInputTopic = "top10";
-  private String usersInputTopic = "users";
-  private Integer commitInterval;
-  private Integer cacheMaxBytes;
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryController.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryController.java
deleted file mode 100644 (file)
index a9b5b80..0000000
+++ /dev/null
@@ -1,43 +0,0 @@
-package de.juplo.kafka.wordcount.query;
-
-import lombok.RequiredArgsConstructor;
-import org.apache.kafka.streams.errors.InvalidStateStoreException;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.PathVariable;
-import org.springframework.web.bind.annotation.RestController;
-
-import java.net.URI;
-import java.util.Optional;
-
-
-@RestController
-@RequiredArgsConstructor
-public class QueryController
-{
-  private final QueryStreamProcessor processor;
-
-  @GetMapping("{username}")
-  ResponseEntity<UserRanking> queryFor(@PathVariable String username)
-  {
-    Optional<URI> redirect = processor.getRedirect(username);
-    if (redirect.isPresent())
-    {
-      return
-          ResponseEntity
-              .status(HttpStatus.TEMPORARY_REDIRECT)
-              .location(redirect.get())
-              .build();
-    }
-
-    try
-    {
-      return ResponseEntity.of(processor.getUserRanking(username));
-    }
-    catch (InvalidStateStoreException e)
-    {
-      return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build();
-    }
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java
deleted file mode 100644 (file)
index 5543a91..0000000
+++ /dev/null
@@ -1,129 +0,0 @@
-package de.juplo.kafka.wordcount.query;
-
-import jakarta.annotation.PostConstruct;
-import jakarta.annotation.PreDestroy;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.*;
-import org.apache.kafka.streams.state.HostInfo;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.QueryableStoreTypes;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
-import org.springframework.kafka.support.serializer.JsonSerde;
-
-import java.net.URI;
-import java.util.Optional;
-import java.util.Properties;
-
-
-@Slf4j
-public class QueryStreamProcessor
-{
-       public static final String STATS_TYPE = "COUNTER";
-       public static final String USER_STORE_NAME = "users";
-       public static final String RANKING_STORE_NAME = "rankings";
-
-       public final KafkaStreams streams;
-       public final HostInfo hostInfo;
-       public final StoreQueryParameters<ReadOnlyKeyValueStore<String, UserRanking>> storeParameters;
-
-
-       public QueryStreamProcessor(
-                       Properties props,
-                       HostInfo applicationServer,
-                       String usersInputTopic,
-                       String rankingInputTopic,
-                       KeyValueBytesStoreSupplier userStoreSupplier,
-                       KeyValueBytesStoreSupplier rankingStoreSupplier)
-       {
-               Topology topology = buildTopology(
-                               usersInputTopic,
-                               rankingInputTopic,
-                               userStoreSupplier,
-                               rankingStoreSupplier);
-               streams = new KafkaStreams(topology, props);
-               hostInfo = applicationServer;
-               storeParameters = StoreQueryParameters.fromNameAndType(RANKING_STORE_NAME, QueryableStoreTypes.keyValueStore());;
-       }
-
-       static Topology buildTopology(
-                       String usersInputTopic,
-                       String rankingInputTopic,
-                       KeyValueBytesStoreSupplier userStoreSupplier,
-                       KeyValueBytesStoreSupplier rankingStoreSupplier)
-       {
-               StreamsBuilder builder = new StreamsBuilder();
-
-               KTable<String, User> users = builder
-                               .stream(
-                                               usersInputTopic,
-                                               Consumed.with(Serdes.String(), new JsonSerde().copyWithType(User.class)))
-                               .toTable(
-                                               Materialized
-                                                               .<String, User>as(userStoreSupplier)
-                                                               .withKeySerde(Serdes.String())
-                                                               .withValueSerde(new JsonSerde().copyWithType(User.class)));
-               KStream<String, Ranking> rankings = builder
-                               .<Key, Ranking>stream(rankingInputTopic)
-                               .filter((key, value) -> STATS_TYPE.equals(key.getType()))
-                               .map((key, value) -> new KeyValue<>(key.getChannel(), value));
-
-               rankings
-                               .join(users, (ranking, user) -> UserRanking.of(
-                                               user.getFirstName(),
-                                               user.getLastName(),
-                                               ranking.getEntries()),
-                                               Joined.keySerde(Serdes.String()))
-                               .toTable(
-                                               Materialized
-                                                               .<String, UserRanking>as(rankingStoreSupplier)
-                                                               .withKeySerde(Serdes.String())
-                                                               .withValueSerde(new JsonSerde().copyWithType(UserRanking.class)));
-
-               Topology topology = builder.build();
-               log.info("\n\n{}", topology.describe());
-
-               return topology;
-       }
-
-       ReadOnlyKeyValueStore<String, UserRanking> getStore()
-       {
-               return streams.store(storeParameters);
-       }
-
-       public Optional<URI> getRedirect(String username)
-       {
-               KeyQueryMetadata metadata = streams.queryMetadataForKey(RANKING_STORE_NAME, username, Serdes.String().serializer());
-               HostInfo activeHost = metadata.activeHost();
-               log.debug("Local store for {}: {}, {}:{}", username, metadata.partition(), activeHost.host(), activeHost.port());
-
-               if (activeHost.equals(this.hostInfo) || activeHost.equals(HostInfo.unavailable()))
-               {
-                       return Optional.empty();
-               }
-
-               URI location = URI.create("http://" + activeHost.host() + ":" + activeHost.port() + "/" + username);
-               log.debug("Redirecting to {}", location);
-               return Optional.of(location);
-       }
-
-       public Optional<UserRanking> getUserRanking(String username)
-       {
-               return Optional.ofNullable(getStore().get(username));
-       }
-
-       @PostConstruct
-       public void start()
-       {
-               log.info("Starting Stream-Processor");
-               streams.start();
-       }
-
-       @PreDestroy
-       public void stop()
-       {
-               log.info("Stopping Stream-Processor");
-               streams.close();
-       }
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/query/Ranking.java b/src/main/java/de/juplo/kafka/wordcount/query/Ranking.java
deleted file mode 100644 (file)
index 8966be6..0000000
+++ /dev/null
@@ -1,10 +0,0 @@
-package de.juplo.kafka.wordcount.query;
-
-import lombok.Data;
-
-
-@Data
-public class Ranking
-{
-  private Entry[] entries;
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/query/User.java b/src/main/java/de/juplo/kafka/wordcount/query/User.java
deleted file mode 100644 (file)
index f62b475..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-package de.juplo.kafka.wordcount.query;
-
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-
-
-@Data
-@EqualsAndHashCode(of = "username")
-public class User
-{
-  public enum Sex { FEMALE, MALE, OTHER }
-
-  private String username;
-
-  private String firstName;
-  private String lastName;
-  private Sex sex;
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java b/src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java
deleted file mode 100644 (file)
index 9ca765a..0000000
+++ /dev/null
@@ -1,14 +0,0 @@
-package de.juplo.kafka.wordcount.query;
-
-import lombok.*;
-
-
-@Data
-@AllArgsConstructor(staticName = "of")
-@NoArgsConstructor
-public class UserRanking
-{
-  private String firstName;
-  private String lastName;
-  private Entry[] top10;
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/Entry.java b/src/main/java/de/juplo/kafka/wordcount/stats/Entry.java
new file mode 100644 (file)
index 0000000..383b1a6
--- /dev/null
@@ -0,0 +1,11 @@
+package de.juplo.kafka.wordcount.query;
+
+import lombok.Data;
+
+
+@Data
+public class Entry
+{
+  private String key;
+  private Long counter;
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/Key.java b/src/main/java/de/juplo/kafka/wordcount/stats/Key.java
new file mode 100644 (file)
index 0000000..a2d85a1
--- /dev/null
@@ -0,0 +1,11 @@
+package de.juplo.kafka.wordcount.query;
+
+import lombok.Data;
+
+
+@Data
+public class Key
+{
+  private String type;
+  private String channel;
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/Ranking.java b/src/main/java/de/juplo/kafka/wordcount/stats/Ranking.java
new file mode 100644 (file)
index 0000000..8966be6
--- /dev/null
@@ -0,0 +1,10 @@
+package de.juplo.kafka.wordcount.query;
+
+import lombok.Data;
+
+
+@Data
+public class Ranking
+{
+  private Entry[] entries;
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplication.java b/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplication.java
new file mode 100644 (file)
index 0000000..eeee7eb
--- /dev/null
@@ -0,0 +1,14 @@
+package de.juplo.kafka.wordcount.query;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+
+@SpringBootApplication
+public class QueryApplication
+{
+       public static void main(String[] args)
+       {
+               SpringApplication.run(QueryApplication.class, args);
+       }
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationConfiguration.java
new file mode 100644 (file)
index 0000000..0f9cad1
--- /dev/null
@@ -0,0 +1,141 @@
+package de.juplo.kafka.wordcount.query;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.state.HostInfo;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.web.ServerProperties;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerde;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+
+import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME;
+import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME;
+import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+
+
+@Configuration
+@EnableConfigurationProperties(QueryApplicationProperties.class)
+@Slf4j
+public class QueryApplicationConfiguration
+{
+       @Bean
+       public HostInfo applicationServer(
+                       ServerProperties serverProperties,
+                       QueryApplicationProperties applicationProperties) throws IOException
+       {
+               String host;
+               if (serverProperties.getAddress() == null)
+               {
+                       HostInfo bootstrapServer = HostInfo.buildFromEndpoint(applicationProperties.getBootstrapServer());
+                       Socket socket = new Socket();
+                       socket.connect(new InetSocketAddress(bootstrapServer.host(), bootstrapServer.port()));
+                       host = socket.getLocalAddress().getHostAddress();
+               }
+               else
+               {
+                       host = serverProperties.getAddress().getHostAddress();
+               }
+
+               Integer port = serverProperties.getPort() == null ? 8080 : serverProperties.getPort();
+
+               return new HostInfo(host, port);
+       }
+
+       @Bean
+       public Properties streamProcessorProperties(
+                       QueryApplicationProperties applicationProperties,
+                       HostInfo applicationServer)
+       {
+               Properties props = new Properties();
+
+               props.putAll(serializationConfig());
+
+               String applicationId = applicationProperties.getApplicationId();
+               String bootstrapServer = applicationProperties.getBootstrapServer();
+
+               props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+               props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServer.host() + ":" + applicationServer.port());
+               props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
+
+               props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, applicationProperties.getBootstrapServer());
+               if (applicationProperties.getCommitInterval() != null)
+                       props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, applicationProperties.getCommitInterval());
+               if (applicationProperties.getCacheMaxBytes() != null)
+                       props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, applicationProperties.getCacheMaxBytes());
+
+               props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+               return props;
+       }
+
+       static Properties serializationConfig()
+       {
+               Properties props = new Properties();
+
+               props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
+               props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
+               props.put(
+                               JsonDeserializer.TYPE_MAPPINGS,
+                               "stats:" + Key.class.getName() + "," +
+                               "ranking:" + Ranking.class.getName() + "," +
+                               "userranking:" + UserRanking.class.getName());
+
+               return props;
+       }
+
+       @Bean(initMethod = "start", destroyMethod = "stop")
+       public QueryStreamProcessor streamProcessor(
+                       Properties streamProcessorProperties,
+                       HostInfo applicationServer,
+                       QueryApplicationProperties applicationProperties,
+                       KeyValueBytesStoreSupplier userStoreSupplier,
+                       KeyValueBytesStoreSupplier rankingStoreSupplier,
+                       ConfigurableApplicationContext context)
+       {
+               QueryStreamProcessor streamProcessor = new QueryStreamProcessor(
+                               streamProcessorProperties,
+                               applicationServer,
+                               applicationProperties.getUsersInputTopic(),
+                               applicationProperties.getRankingInputTopic(),
+                               userStoreSupplier,
+                               rankingStoreSupplier);
+
+               streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
+               {
+                       log.error("Unexpected error!", e);
+                       CompletableFuture.runAsync(() ->
+                       {
+                               log.info("Stopping application...");
+                               SpringApplication.exit(context, () -> 1);
+                       });
+                       return SHUTDOWN_CLIENT;
+               });
+
+               return streamProcessor;
+       }
+
+       @Bean
+       public KeyValueBytesStoreSupplier userStoreSupplier()
+       {
+               return Stores.persistentKeyValueStore(USER_STORE_NAME);
+       }
+
+       @Bean
+       public KeyValueBytesStoreSupplier rankingStoreSupplier()
+       {
+               return Stores.persistentKeyValueStore(RANKING_STORE_NAME);
+       }
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationProperties.java
new file mode 100644 (file)
index 0000000..4a9eeca
--- /dev/null
@@ -0,0 +1,22 @@
+package de.juplo.kafka.wordcount.query;
+
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+
+@ConfigurationProperties("juplo.wordcount.query")
+@Getter
+@Setter
+@ToString
+public class QueryApplicationProperties
+{
+  private String bootstrapServer = "localhost:9092";
+  private String applicationId = "query";
+  private String rankingInputTopic = "top10";
+  private String usersInputTopic = "users";
+  private Integer commitInterval;
+  private Integer cacheMaxBytes;
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/StatsController.java b/src/main/java/de/juplo/kafka/wordcount/stats/StatsController.java
new file mode 100644 (file)
index 0000000..a9b5b80
--- /dev/null
@@ -0,0 +1,43 @@
+package de.juplo.kafka.wordcount.query;
+
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.net.URI;
+import java.util.Optional;
+
+
+@RestController
+@RequiredArgsConstructor
+public class QueryController
+{
+  private final QueryStreamProcessor processor;
+
+  @GetMapping("{username}")
+  ResponseEntity<UserRanking> queryFor(@PathVariable String username)
+  {
+    Optional<URI> redirect = processor.getRedirect(username);
+    if (redirect.isPresent())
+    {
+      return
+          ResponseEntity
+              .status(HttpStatus.TEMPORARY_REDIRECT)
+              .location(redirect.get())
+              .build();
+    }
+
+    try
+    {
+      return ResponseEntity.of(processor.getUserRanking(username));
+    }
+    catch (InvalidStateStoreException e)
+    {
+      return ResponseEntity.status(HttpStatus.SERVICE_UNAVAILABLE).build();
+    }
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessor.java
new file mode 100644 (file)
index 0000000..5543a91
--- /dev/null
@@ -0,0 +1,129 @@
+package de.juplo.kafka.wordcount.query;
+
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.kstream.*;
+import org.apache.kafka.streams.state.HostInfo;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.springframework.kafka.support.serializer.JsonSerde;
+
+import java.net.URI;
+import java.util.Optional;
+import java.util.Properties;
+
+
+@Slf4j
+public class QueryStreamProcessor
+{
+       public static final String STATS_TYPE = "COUNTER";
+       public static final String USER_STORE_NAME = "users";
+       public static final String RANKING_STORE_NAME = "rankings";
+
+       public final KafkaStreams streams;
+       public final HostInfo hostInfo;
+       public final StoreQueryParameters<ReadOnlyKeyValueStore<String, UserRanking>> storeParameters;
+
+
+       public QueryStreamProcessor(
+                       Properties props,
+                       HostInfo applicationServer,
+                       String usersInputTopic,
+                       String rankingInputTopic,
+                       KeyValueBytesStoreSupplier userStoreSupplier,
+                       KeyValueBytesStoreSupplier rankingStoreSupplier)
+       {
+               Topology topology = buildTopology(
+                               usersInputTopic,
+                               rankingInputTopic,
+                               userStoreSupplier,
+                               rankingStoreSupplier);
+               streams = new KafkaStreams(topology, props);
+               hostInfo = applicationServer;
+               storeParameters = StoreQueryParameters.fromNameAndType(RANKING_STORE_NAME, QueryableStoreTypes.keyValueStore());;
+       }
+
+       static Topology buildTopology(
+                       String usersInputTopic,
+                       String rankingInputTopic,
+                       KeyValueBytesStoreSupplier userStoreSupplier,
+                       KeyValueBytesStoreSupplier rankingStoreSupplier)
+       {
+               StreamsBuilder builder = new StreamsBuilder();
+
+               KTable<String, User> users = builder
+                               .stream(
+                                               usersInputTopic,
+                                               Consumed.with(Serdes.String(), new JsonSerde().copyWithType(User.class)))
+                               .toTable(
+                                               Materialized
+                                                               .<String, User>as(userStoreSupplier)
+                                                               .withKeySerde(Serdes.String())
+                                                               .withValueSerde(new JsonSerde().copyWithType(User.class)));
+               KStream<String, Ranking> rankings = builder
+                               .<Key, Ranking>stream(rankingInputTopic)
+                               .filter((key, value) -> STATS_TYPE.equals(key.getType()))
+                               .map((key, value) -> new KeyValue<>(key.getChannel(), value));
+
+               rankings
+                               .join(users, (ranking, user) -> UserRanking.of(
+                                               user.getFirstName(),
+                                               user.getLastName(),
+                                               ranking.getEntries()),
+                                               Joined.keySerde(Serdes.String()))
+                               .toTable(
+                                               Materialized
+                                                               .<String, UserRanking>as(rankingStoreSupplier)
+                                                               .withKeySerde(Serdes.String())
+                                                               .withValueSerde(new JsonSerde().copyWithType(UserRanking.class)));
+
+               Topology topology = builder.build();
+               log.info("\n\n{}", topology.describe());
+
+               return topology;
+       }
+
+       ReadOnlyKeyValueStore<String, UserRanking> getStore()
+       {
+               return streams.store(storeParameters);
+       }
+
+       public Optional<URI> getRedirect(String username)
+       {
+               KeyQueryMetadata metadata = streams.queryMetadataForKey(RANKING_STORE_NAME, username, Serdes.String().serializer());
+               HostInfo activeHost = metadata.activeHost();
+               log.debug("Local store for {}: {}, {}:{}", username, metadata.partition(), activeHost.host(), activeHost.port());
+
+               if (activeHost.equals(this.hostInfo) || activeHost.equals(HostInfo.unavailable()))
+               {
+                       return Optional.empty();
+               }
+
+               URI location = URI.create("http://" + activeHost.host() + ":" + activeHost.port() + "/" + username);
+               log.debug("Redirecting to {}", location);
+               return Optional.of(location);
+       }
+
+       public Optional<UserRanking> getUserRanking(String username)
+       {
+               return Optional.ofNullable(getStore().get(username));
+       }
+
+       @PostConstruct
+       public void start()
+       {
+               log.info("Starting Stream-Processor");
+               streams.start();
+       }
+
+       @PreDestroy
+       public void stop()
+       {
+               log.info("Stopping Stream-Processor");
+               streams.close();
+       }
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/User.java b/src/main/java/de/juplo/kafka/wordcount/stats/User.java
new file mode 100644 (file)
index 0000000..f62b475
--- /dev/null
@@ -0,0 +1,18 @@
+package de.juplo.kafka.wordcount.query;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+
+@Data
+@EqualsAndHashCode(of = "username")
+public class User
+{
+  public enum Sex { FEMALE, MALE, OTHER }
+
+  private String username;
+
+  private String firstName;
+  private String lastName;
+  private Sex sex;
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/UserRanking.java b/src/main/java/de/juplo/kafka/wordcount/stats/UserRanking.java
new file mode 100644 (file)
index 0000000..9ca765a
--- /dev/null
@@ -0,0 +1,14 @@
+package de.juplo.kafka.wordcount.query;
+
+import lombok.*;
+
+
+@Data
+@AllArgsConstructor(staticName = "of")
+@NoArgsConstructor
+public class UserRanking
+{
+  private String firstName;
+  private String lastName;
+  private Entry[] top10;
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java
deleted file mode 100644 (file)
index fb12aee..0000000
+++ /dev/null
@@ -1,172 +0,0 @@
-package de.juplo.kafka.wordcount.query;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.wordcount.top10.TestRanking;
-import de.juplo.kafka.wordcount.top10.TestUser;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.Stores;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.http.MediaType;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.core.ProducerFactory;
-import org.springframework.kafka.support.SendResult;
-import org.springframework.kafka.support.serializer.JsonSerializer;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.test.web.servlet.MockMvc;
-import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
-
-import java.nio.charset.StandardCharsets;
-import java.time.Duration;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME;
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME;
-import static org.awaitility.Awaitility.await;
-import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
-
-
-@SpringBootTest(
-               properties = {
-                               "spring.main.allow-bean-definition-overriding=true",
-                               "logging.level.root=WARN",
-                               "logging.level.de.juplo=DEBUG",
-                               "logging.level.org.apache.kafka.clients=INFO",
-                               "logging.level.org.apache.kafka.streams=INFO",
-                               "juplo.wordcount.query.bootstrap-server=${spring.embedded.kafka.brokers}",
-                               "juplo.wordcount.query.commit-interval=100",
-                               "juplo.wordcount.query.cache-max-bytes=0",
-                               "juplo.wordcount.query.users-input-topic=" + QueryApplicationIT.TOPIC_USERS,
-                               "juplo.wordcount.query.ranking-input-topic=" + QueryApplicationIT.TOPIC_TOP10 })
-@AutoConfigureMockMvc
-@EmbeddedKafka(topics = { QueryApplicationIT.TOPIC_TOP10, QueryApplicationIT.TOPIC_USERS})
-@Slf4j
-public class QueryApplicationIT
-{
-       public static final String TOPIC_TOP10 = "top10";
-       public static final String TOPIC_USERS = "users";
-
-
-       @Autowired
-       MockMvc mockMvc;
-       @Autowired
-       ObjectMapper objectMapper;
-       @Autowired
-       QueryStreamProcessor streamProcessor;
-
-
-       @BeforeAll
-       public static void testSendMessage(
-                       @Autowired KafkaTemplate usersKafkaTemplate,
-                       @Autowired KafkaTemplate top10KafkaTemplate)
-       {
-               TestData
-                               .getUsersMessages()
-                               .forEach(kv -> flush(usersKafkaTemplate.send(TOPIC_USERS, kv.key, kv.value)));
-               TestData
-                               .getTop10Messages()
-                               .forEach(kv -> flush(top10KafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value)));
-       }
-
-       private static void flush(CompletableFuture<SendResult> future)
-       {
-               try
-               {
-                       SendResult result = future.get();
-                       log.info(
-                                       "Sent: {}={}, partition={}, offset={}",
-                                       result.getProducerRecord().key(),
-                                       result.getProducerRecord().value(),
-                                       result.getRecordMetadata().partition(),
-                                       result.getRecordMetadata().offset());
-               }
-               catch (Exception e)
-               {
-                       throw new RuntimeException(e);
-               }
-       }
-
-       @DisplayName("Await, that the expected state is visible in the state-store")
-       @Test
-       public void testAwaitExpectedStateInStore()
-       {
-               await("The expected state is visible in the state-store")
-                               .atMost(Duration.ofSeconds(5))
-                               .untilAsserted(() -> TestData.assertExpectedState(user -> streamProcessor.getStore().get(user)));
-       }
-
-       @DisplayName("Await, that the expected state is queryable")
-       @Test
-       public void testAwaitExpectedStateIsQueryable()
-       {
-               await("The expected state is queryable")
-                               .atMost(Duration.ofSeconds(5))
-                               .untilAsserted(() -> TestData.assertExpectedState(user -> requestUserRankingFor(user)));
-       }
-
-       private UserRanking requestUserRankingFor(String user)
-       {
-               try
-               {
-                       return objectMapper.readValue(
-                                       mockMvc
-                                                       .perform(MockMvcRequestBuilders.get("/{user}", user)
-                                                                       .contentType(MediaType.APPLICATION_JSON))
-                                                       .andExpect(status().isOk())
-                                                       .andReturn()
-                                                       .getResponse()
-                                                       .getContentAsString(StandardCharsets.UTF_8),
-                                       UserRanking.class);
-               }
-               catch (Exception e)
-               {
-                       throw new RuntimeException(e);
-               }
-       }
-
-       @TestConfiguration
-       static class Configuration
-       {
-               @Bean
-               KafkaTemplate usersKafkaTemplate(ProducerFactory producerFactory)
-               {
-                       Map<String, Object> properties = Map.of(
-                                       ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
-                                       ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
-                                       JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
-                       return new KafkaTemplate(producerFactory, properties);
-               }
-
-               @Bean
-               KafkaTemplate top10KafkaTemplate(ProducerFactory producerFactory)
-               {
-                       Map<String, Object> properties = Map.of(
-                                       ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
-                                       ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
-                                       JsonSerializer.TYPE_MAPPINGS, "stats:" + TestUser.class.getName() + ",ranking:" + TestRanking.class.getName());
-                       return new KafkaTemplate(producerFactory, properties);
-               }
-
-               @Bean
-               KeyValueBytesStoreSupplier userStoreSupplier()
-               {
-                       return Stores.inMemoryKeyValueStore(USER_STORE_NAME);
-               }
-
-               @Bean
-               KeyValueBytesStoreSupplier rankingStoreSupplier()
-               {
-                       return Stores.inMemoryKeyValueStore(RANKING_STORE_NAME);
-               }
-       }
-}
diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java
deleted file mode 100644 (file)
index fbeb19b..0000000
+++ /dev/null
@@ -1,91 +0,0 @@
-package de.juplo.kafka.wordcount.query;
-
-import de.juplo.kafka.wordcount.top10.TestRanking;
-import de.juplo.kafka.wordcount.top10.TestUser;
-import de.juplo.kafka.wordcount.users.TestUserData;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.Stores;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.springframework.kafka.support.serializer.JsonSerializer;
-
-import java.util.Map;
-
-import static de.juplo.kafka.wordcount.query.QueryApplicationConfiguration.serializationConfig;
-
-
-@Slf4j
-public class QueryStreamProcessorTopologyTest
-{
-  public static final String TOP10_IN = "TOP10-IN";
-  public static final String USERS_IN = "USERS-IN";
-  public static final String RANKING_STORE_NAME = "TOPOLOGY-TEST-RANKINGS";
-  public static final String USERS_STORE_NAME = "TOPOLOGY-TEST-USERS";
-
-
-  TopologyTestDriver testDriver;
-  TestInputTopic<TestUser, TestRanking> top10In;
-  TestInputTopic<String, TestUserData> userIn;
-
-
-  @BeforeEach
-  public void setUp()
-  {
-    Topology topology = QueryStreamProcessor.buildTopology(
-        USERS_IN,
-        TOP10_IN,
-        Stores.inMemoryKeyValueStore(USERS_STORE_NAME),
-        Stores.inMemoryKeyValueStore(RANKING_STORE_NAME));
-
-    testDriver = new TopologyTestDriver(topology, serializationConfig());
-
-    top10In = testDriver.createInputTopic(
-        TOP10_IN,
-        jsonSerializer(TestUser.class, true),
-        jsonSerializer(TestRanking.class,false));
-
-    userIn = testDriver.createInputTopic(
-        USERS_IN,
-        new StringSerializer(),
-        jsonSerializer(TestUserData.class, false).noTypeInfo());
-  }
-
-
-  @Test
-  public void test()
-  {
-    TestData
-        .getUsersMessages()
-        .forEach(kv -> userIn.pipeInput(kv.key, kv.value));
-    TestData
-        .getTop10Messages()
-        .forEach(kv -> top10In.pipeInput(kv.key, kv.value));
-
-    KeyValueStore<String, UserRanking> store = testDriver.getKeyValueStore(RANKING_STORE_NAME);
-    TestData.assertExpectedState(user -> store.get(user));
-  }
-
-  @AfterEach
-  public void tearDown()
-  {
-    testDriver.close();
-  }
-
-  private <T> JsonSerializer<T> jsonSerializer(Class<T> type, boolean isKey)
-  {
-    JsonSerializer<T> jsonSerializer = new JsonSerializer<>();
-    jsonSerializer.configure(
-        Map.of(
-            JsonSerializer.TYPE_MAPPINGS,
-            "stats:" + TestUser.class.getName() + "," +
-            "ranking:" + TestRanking.class.getName()),
-        isKey);
-    return jsonSerializer;
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java
deleted file mode 100644 (file)
index 44162a0..0000000
+++ /dev/null
@@ -1,146 +0,0 @@
-package de.juplo.kafka.wordcount.query;
-
-import de.juplo.kafka.wordcount.top10.TestEntry;
-import de.juplo.kafka.wordcount.top10.TestRanking;
-import de.juplo.kafka.wordcount.top10.TestUser;
-import de.juplo.kafka.wordcount.users.TestUserData;
-import org.apache.kafka.streams.KeyValue;
-
-import java.util.Arrays;
-import java.util.function.Function;
-import java.util.stream.Stream;
-
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STATS_TYPE;
-import static org.assertj.core.api.Assertions.assertThat;
-
-
-class TestData
-{
-       static final TestUser PETER = TestUser.of(STATS_TYPE, "peter");
-       static final TestUser KLAUS = TestUser.of(STATS_TYPE, "klaus");
-
-       static final Stream<KeyValue<TestUser, TestRanking>> getTop10Messages()
-       {
-               return Stream.of(TOP10_MESSAGES);
-       }
-
-       static final Stream<KeyValue<String, TestUserData>> getUsersMessages()
-       {
-               return Stream.of(USERS_MESSAGES);
-       }
-
-       static void assertExpectedState(Function<String, UserRanking> function)
-       {
-               assertRankingEqualsRankingFromLastMessage(PETER.getChannel(), function.apply(PETER.getChannel()));
-               assertRankingEqualsRankingFromLastMessage(KLAUS.getChannel(), function.apply(KLAUS.getChannel()));
-       }
-
-       private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson)
-       {
-               assertThat(rankingJson).isEqualTo(getLastMessageFor(user));
-       }
-
-       private static UserRanking getLastMessageFor(String user)
-       {
-               return getTop10Messages()
-                               .filter(kv -> kv.key.getChannel().equals(user))
-                               .map(kv -> kv.value)
-                               .map(testRanking -> userRankingFor(user, testRanking))
-                               .reduce(null, (left, right) -> right);
-       }
-
-       private static UserRanking userRankingFor(String user, TestRanking testRanking)
-       {
-               TestUserData testUserData = getUsersMessages()
-                               .filter(kv -> kv.key.equals(user))
-                               .map(kv -> kv.value)
-                               .reduce(null, (left, right) -> right);
-
-               Entry[] entries = Arrays
-                               .stream(testRanking.getEntries())
-                               .map(testEntry -> entryOf(testEntry))
-                               .toArray(size -> new Entry[size]);
-
-               return UserRanking.of(
-                               testUserData.getFirstName(),
-                               testUserData.getLastName(),
-                               entries);
-       }
-
-       private static Entry entryOf(TestEntry testEntry)
-       {
-               Entry entry = new Entry();
-               entry.setKey(testEntry.getKey());
-               entry.setCounter(testEntry.getCounter());
-               return entry;
-       }
-       private static KeyValue<TestUser, TestRanking>[] TOP10_MESSAGES = new KeyValue[]
-       {
-                       KeyValue.pair( // 0
-                                       PETER,
-                                       TestRanking.of(
-                                                       TestEntry.of("Hallo", 1l))),
-                       KeyValue.pair( // 1
-                                       KLAUS,
-                                       TestRanking.of(
-                                                       TestEntry.of("Müsch", 1l))),
-                       KeyValue.pair( // 2
-                                       PETER,
-                                       TestRanking.of(
-                                                       TestEntry.of("Hallo", 1l),
-                                                       TestEntry.of("Welt", 1l))),
-                       KeyValue.pair( // 3
-                                       KLAUS,
-                                       TestRanking.of(
-                                                       TestEntry.of("Müsch", 2l))),
-                       KeyValue.pair( // 4
-                                       KLAUS,
-                                       TestRanking.of(
-                                                       TestEntry.of("Müsch", 2l),
-                                                       TestEntry.of("s", 1l))),
-                       KeyValue.pair( // 5
-                                       PETER,
-                                       TestRanking.of(
-                                                       TestEntry.of("Hallo", 1l),
-                                                       TestEntry.of("Welt", 1l),
-                                                       TestEntry.of("Boäh", 1l))),
-                       KeyValue.pair( // 6
-                                       PETER,
-                                       TestRanking.of(
-                                                       TestEntry.of("Welt", 2l),
-                                                       TestEntry.of("Hallo", 1l),
-                                                       TestEntry.of("Boäh", 1l))),
-                       KeyValue.pair( // 7
-                                       PETER,
-                                       TestRanking.of(
-                                                       TestEntry.of("Welt", 2l),
-                                                       TestEntry.of("Boäh", 2l),
-                                                       TestEntry.of("Hallo", 1l))),
-                       KeyValue.pair( // 8
-                                       KLAUS,
-                                       TestRanking.of(
-                                                       TestEntry.of("Müsch", 2l),
-                                                       TestEntry.of("s", 2l))),
-                       KeyValue.pair( // 9
-                                       PETER,
-                                       TestRanking.of(
-                                                       TestEntry.of("Boäh", 3l),
-                                                       TestEntry.of("Welt", 2l),
-                                                       TestEntry.of("Hallo", 1l))),
-                       KeyValue.pair( // 10
-                                       KLAUS,
-                                       TestRanking.of(
-                                                       TestEntry.of("s", 3l),
-                                                       TestEntry.of("Müsch", 2l))),
-       };
-
-       private static KeyValue<String, TestUserData>[] USERS_MESSAGES = new KeyValue[]
-       {
-                       KeyValue.pair(
-                                       PETER.getChannel(),
-                                       TestUserData.of(PETER.getChannel(), "Peter", "Pan", TestUserData.Sex.MALE)),
-                       KeyValue.pair(
-                                       KLAUS.getChannel(),
-                                       TestUserData.of(KLAUS.getChannel(), "Klaus", "Klüse", TestUserData.Sex.OTHER)),
-       };
-}
diff --git a/src/test/java/de/juplo/kafka/wordcount/stats/StatsApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/stats/StatsApplicationIT.java
new file mode 100644 (file)
index 0000000..fb12aee
--- /dev/null
@@ -0,0 +1,172 @@
+package de.juplo.kafka.wordcount.query;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.wordcount.top10.TestRanking;
+import de.juplo.kafka.wordcount.top10.TestUser;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.http.MediaType;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.test.web.servlet.MockMvc;
+import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME;
+import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME;
+import static org.awaitility.Awaitility.await;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
+
+
+@SpringBootTest(
+               properties = {
+                               "spring.main.allow-bean-definition-overriding=true",
+                               "logging.level.root=WARN",
+                               "logging.level.de.juplo=DEBUG",
+                               "logging.level.org.apache.kafka.clients=INFO",
+                               "logging.level.org.apache.kafka.streams=INFO",
+                               "juplo.wordcount.query.bootstrap-server=${spring.embedded.kafka.brokers}",
+                               "juplo.wordcount.query.commit-interval=100",
+                               "juplo.wordcount.query.cache-max-bytes=0",
+                               "juplo.wordcount.query.users-input-topic=" + QueryApplicationIT.TOPIC_USERS,
+                               "juplo.wordcount.query.ranking-input-topic=" + QueryApplicationIT.TOPIC_TOP10 })
+@AutoConfigureMockMvc
+@EmbeddedKafka(topics = { QueryApplicationIT.TOPIC_TOP10, QueryApplicationIT.TOPIC_USERS})
+@Slf4j
+public class QueryApplicationIT
+{
+       public static final String TOPIC_TOP10 = "top10";
+       public static final String TOPIC_USERS = "users";
+
+
+       @Autowired
+       MockMvc mockMvc;
+       @Autowired
+       ObjectMapper objectMapper;
+       @Autowired
+       QueryStreamProcessor streamProcessor;
+
+
+       @BeforeAll
+       public static void testSendMessage(
+                       @Autowired KafkaTemplate usersKafkaTemplate,
+                       @Autowired KafkaTemplate top10KafkaTemplate)
+       {
+               TestData
+                               .getUsersMessages()
+                               .forEach(kv -> flush(usersKafkaTemplate.send(TOPIC_USERS, kv.key, kv.value)));
+               TestData
+                               .getTop10Messages()
+                               .forEach(kv -> flush(top10KafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value)));
+       }
+
+       private static void flush(CompletableFuture<SendResult> future)
+       {
+               try
+               {
+                       SendResult result = future.get();
+                       log.info(
+                                       "Sent: {}={}, partition={}, offset={}",
+                                       result.getProducerRecord().key(),
+                                       result.getProducerRecord().value(),
+                                       result.getRecordMetadata().partition(),
+                                       result.getRecordMetadata().offset());
+               }
+               catch (Exception e)
+               {
+                       throw new RuntimeException(e);
+               }
+       }
+
+       @DisplayName("Await, that the expected state is visible in the state-store")
+       @Test
+       public void testAwaitExpectedStateInStore()
+       {
+               await("The expected state is visible in the state-store")
+                               .atMost(Duration.ofSeconds(5))
+                               .untilAsserted(() -> TestData.assertExpectedState(user -> streamProcessor.getStore().get(user)));
+       }
+
+       @DisplayName("Await, that the expected state is queryable")
+       @Test
+       public void testAwaitExpectedStateIsQueryable()
+       {
+               await("The expected state is queryable")
+                               .atMost(Duration.ofSeconds(5))
+                               .untilAsserted(() -> TestData.assertExpectedState(user -> requestUserRankingFor(user)));
+       }
+
+       private UserRanking requestUserRankingFor(String user)
+       {
+               try
+               {
+                       return objectMapper.readValue(
+                                       mockMvc
+                                                       .perform(MockMvcRequestBuilders.get("/{user}", user)
+                                                                       .contentType(MediaType.APPLICATION_JSON))
+                                                       .andExpect(status().isOk())
+                                                       .andReturn()
+                                                       .getResponse()
+                                                       .getContentAsString(StandardCharsets.UTF_8),
+                                       UserRanking.class);
+               }
+               catch (Exception e)
+               {
+                       throw new RuntimeException(e);
+               }
+       }
+
+       @TestConfiguration
+       static class Configuration
+       {
+               @Bean
+               KafkaTemplate usersKafkaTemplate(ProducerFactory producerFactory)
+               {
+                       Map<String, Object> properties = Map.of(
+                                       ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
+                                       ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
+                                       JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
+                       return new KafkaTemplate(producerFactory, properties);
+               }
+
+               @Bean
+               KafkaTemplate top10KafkaTemplate(ProducerFactory producerFactory)
+               {
+                       Map<String, Object> properties = Map.of(
+                                       ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
+                                       ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
+                                       JsonSerializer.TYPE_MAPPINGS, "stats:" + TestUser.class.getName() + ",ranking:" + TestRanking.class.getName());
+                       return new KafkaTemplate(producerFactory, properties);
+               }
+
+               @Bean
+               KeyValueBytesStoreSupplier userStoreSupplier()
+               {
+                       return Stores.inMemoryKeyValueStore(USER_STORE_NAME);
+               }
+
+               @Bean
+               KeyValueBytesStoreSupplier rankingStoreSupplier()
+               {
+                       return Stores.inMemoryKeyValueStore(RANKING_STORE_NAME);
+               }
+       }
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessorTopologyTest.java
new file mode 100644 (file)
index 0000000..fbeb19b
--- /dev/null
@@ -0,0 +1,91 @@
+package de.juplo.kafka.wordcount.query;
+
+import de.juplo.kafka.wordcount.top10.TestRanking;
+import de.juplo.kafka.wordcount.top10.TestUser;
+import de.juplo.kafka.wordcount.users.TestUserData;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+
+import java.util.Map;
+
+import static de.juplo.kafka.wordcount.query.QueryApplicationConfiguration.serializationConfig;
+
+
+@Slf4j
+public class QueryStreamProcessorTopologyTest
+{
+  public static final String TOP10_IN = "TOP10-IN";
+  public static final String USERS_IN = "USERS-IN";
+  public static final String RANKING_STORE_NAME = "TOPOLOGY-TEST-RANKINGS";
+  public static final String USERS_STORE_NAME = "TOPOLOGY-TEST-USERS";
+
+
+  TopologyTestDriver testDriver;
+  TestInputTopic<TestUser, TestRanking> top10In;
+  TestInputTopic<String, TestUserData> userIn;
+
+
+  @BeforeEach
+  public void setUp()
+  {
+    Topology topology = QueryStreamProcessor.buildTopology(
+        USERS_IN,
+        TOP10_IN,
+        Stores.inMemoryKeyValueStore(USERS_STORE_NAME),
+        Stores.inMemoryKeyValueStore(RANKING_STORE_NAME));
+
+    testDriver = new TopologyTestDriver(topology, serializationConfig());
+
+    top10In = testDriver.createInputTopic(
+        TOP10_IN,
+        jsonSerializer(TestUser.class, true),
+        jsonSerializer(TestRanking.class,false));
+
+    userIn = testDriver.createInputTopic(
+        USERS_IN,
+        new StringSerializer(),
+        jsonSerializer(TestUserData.class, false).noTypeInfo());
+  }
+
+
+  @Test
+  public void test()
+  {
+    TestData
+        .getUsersMessages()
+        .forEach(kv -> userIn.pipeInput(kv.key, kv.value));
+    TestData
+        .getTop10Messages()
+        .forEach(kv -> top10In.pipeInput(kv.key, kv.value));
+
+    KeyValueStore<String, UserRanking> store = testDriver.getKeyValueStore(RANKING_STORE_NAME);
+    TestData.assertExpectedState(user -> store.get(user));
+  }
+
+  @AfterEach
+  public void tearDown()
+  {
+    testDriver.close();
+  }
+
+  private <T> JsonSerializer<T> jsonSerializer(Class<T> type, boolean isKey)
+  {
+    JsonSerializer<T> jsonSerializer = new JsonSerializer<>();
+    jsonSerializer.configure(
+        Map.of(
+            JsonSerializer.TYPE_MAPPINGS,
+            "stats:" + TestUser.class.getName() + "," +
+            "ranking:" + TestRanking.class.getName()),
+        isKey);
+    return jsonSerializer;
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/stats/TestData.java b/src/test/java/de/juplo/kafka/wordcount/stats/TestData.java
new file mode 100644 (file)
index 0000000..44162a0
--- /dev/null
@@ -0,0 +1,146 @@
+package de.juplo.kafka.wordcount.query;
+
+import de.juplo.kafka.wordcount.top10.TestEntry;
+import de.juplo.kafka.wordcount.top10.TestRanking;
+import de.juplo.kafka.wordcount.top10.TestUser;
+import de.juplo.kafka.wordcount.users.TestUserData;
+import org.apache.kafka.streams.KeyValue;
+
+import java.util.Arrays;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STATS_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+class TestData
+{
+       static final TestUser PETER = TestUser.of(STATS_TYPE, "peter");
+       static final TestUser KLAUS = TestUser.of(STATS_TYPE, "klaus");
+
+       static final Stream<KeyValue<TestUser, TestRanking>> getTop10Messages()
+       {
+               return Stream.of(TOP10_MESSAGES);
+       }
+
+       static final Stream<KeyValue<String, TestUserData>> getUsersMessages()
+       {
+               return Stream.of(USERS_MESSAGES);
+       }
+
+       static void assertExpectedState(Function<String, UserRanking> function)
+       {
+               assertRankingEqualsRankingFromLastMessage(PETER.getChannel(), function.apply(PETER.getChannel()));
+               assertRankingEqualsRankingFromLastMessage(KLAUS.getChannel(), function.apply(KLAUS.getChannel()));
+       }
+
+       private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson)
+       {
+               assertThat(rankingJson).isEqualTo(getLastMessageFor(user));
+       }
+
+       private static UserRanking getLastMessageFor(String user)
+       {
+               return getTop10Messages()
+                               .filter(kv -> kv.key.getChannel().equals(user))
+                               .map(kv -> kv.value)
+                               .map(testRanking -> userRankingFor(user, testRanking))
+                               .reduce(null, (left, right) -> right);
+       }
+
+       private static UserRanking userRankingFor(String user, TestRanking testRanking)
+       {
+               TestUserData testUserData = getUsersMessages()
+                               .filter(kv -> kv.key.equals(user))
+                               .map(kv -> kv.value)
+                               .reduce(null, (left, right) -> right);
+
+               Entry[] entries = Arrays
+                               .stream(testRanking.getEntries())
+                               .map(testEntry -> entryOf(testEntry))
+                               .toArray(size -> new Entry[size]);
+
+               return UserRanking.of(
+                               testUserData.getFirstName(),
+                               testUserData.getLastName(),
+                               entries);
+       }
+
+       private static Entry entryOf(TestEntry testEntry)
+       {
+               Entry entry = new Entry();
+               entry.setKey(testEntry.getKey());
+               entry.setCounter(testEntry.getCounter());
+               return entry;
+       }
+       private static KeyValue<TestUser, TestRanking>[] TOP10_MESSAGES = new KeyValue[]
+       {
+                       KeyValue.pair( // 0
+                                       PETER,
+                                       TestRanking.of(
+                                                       TestEntry.of("Hallo", 1l))),
+                       KeyValue.pair( // 1
+                                       KLAUS,
+                                       TestRanking.of(
+                                                       TestEntry.of("Müsch", 1l))),
+                       KeyValue.pair( // 2
+                                       PETER,
+                                       TestRanking.of(
+                                                       TestEntry.of("Hallo", 1l),
+                                                       TestEntry.of("Welt", 1l))),
+                       KeyValue.pair( // 3
+                                       KLAUS,
+                                       TestRanking.of(
+                                                       TestEntry.of("Müsch", 2l))),
+                       KeyValue.pair( // 4
+                                       KLAUS,
+                                       TestRanking.of(
+                                                       TestEntry.of("Müsch", 2l),
+                                                       TestEntry.of("s", 1l))),
+                       KeyValue.pair( // 5
+                                       PETER,
+                                       TestRanking.of(
+                                                       TestEntry.of("Hallo", 1l),
+                                                       TestEntry.of("Welt", 1l),
+                                                       TestEntry.of("Boäh", 1l))),
+                       KeyValue.pair( // 6
+                                       PETER,
+                                       TestRanking.of(
+                                                       TestEntry.of("Welt", 2l),
+                                                       TestEntry.of("Hallo", 1l),
+                                                       TestEntry.of("Boäh", 1l))),
+                       KeyValue.pair( // 7
+                                       PETER,
+                                       TestRanking.of(
+                                                       TestEntry.of("Welt", 2l),
+                                                       TestEntry.of("Boäh", 2l),
+                                                       TestEntry.of("Hallo", 1l))),
+                       KeyValue.pair( // 8
+                                       KLAUS,
+                                       TestRanking.of(
+                                                       TestEntry.of("Müsch", 2l),
+                                                       TestEntry.of("s", 2l))),
+                       KeyValue.pair( // 9
+                                       PETER,
+                                       TestRanking.of(
+                                                       TestEntry.of("Boäh", 3l),
+                                                       TestEntry.of("Welt", 2l),
+                                                       TestEntry.of("Hallo", 1l))),
+                       KeyValue.pair( // 10
+                                       KLAUS,
+                                       TestRanking.of(
+                                                       TestEntry.of("s", 3l),
+                                                       TestEntry.of("Müsch", 2l))),
+       };
+
+       private static KeyValue<String, TestUserData>[] USERS_MESSAGES = new KeyValue[]
+       {
+                       KeyValue.pair(
+                                       PETER.getChannel(),
+                                       TestUserData.of(PETER.getChannel(), "Peter", "Pan", TestUserData.Sex.MALE)),
+                       KeyValue.pair(
+                                       KLAUS.getChannel(),
+                                       TestUserData.of(KLAUS.getChannel(), "Klaus", "Klüse", TestUserData.Sex.OTHER)),
+       };
+}