]> juplo.de Git - demos/kafka/wordcount/commitdiff
stats: 1.0.0 - Renamed the project into `stats` -- ALIGN
authorKai Moritz <kai@juplo.de>
Tue, 25 Jun 2024 04:02:34 +0000 (06:02 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 25 Jun 2024 16:34:06 +0000 (18:34 +0200)
14 files changed:
pom.xml
src/main/java/de/juplo/kafka/wordcount/stats/Entry.java
src/main/java/de/juplo/kafka/wordcount/stats/Key.java
src/main/java/de/juplo/kafka/wordcount/stats/Ranking.java
src/main/java/de/juplo/kafka/wordcount/stats/StatsApplication.java
src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationConfiguration.java
src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationProperties.java
src/main/java/de/juplo/kafka/wordcount/stats/StatsController.java
src/main/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessor.java
src/main/java/de/juplo/kafka/wordcount/stats/User.java
src/main/java/de/juplo/kafka/wordcount/stats/UserRanking.java
src/test/java/de/juplo/kafka/wordcount/stats/StatsApplicationIT.java
src/test/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessorTopologyTest.java
src/test/java/de/juplo/kafka/wordcount/stats/TestData.java

diff --git a/pom.xml b/pom.xml
index 035e095e1f7c77d50ee63d65760b225dc587d1cc..eef1c955a865ecf29b021f68da6cd8c44769ac0f 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -9,10 +9,10 @@
                <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>de.juplo.kafka.wordcount</groupId>
-       <artifactId>query</artifactId>
-       <version>2.1.2</version>
-       <name>Wordcount-Query</name>
-       <description>Query stream-processor of the multi-user wordcount-example</description>
+       <artifactId>stats</artifactId>
+       <version>1.0.0</version>
+       <name>Wordcount-Statistics</name>
+       <description>Statistics stream-processor of the multi-user wordcount-example</description>
        <properties>
                <java.version>21</java.version>
                <docker-maven-plugin.version>0.44.0</docker-maven-plugin.version>
index 383b1a6d73ef1b0294083546d4bad0aefeba11e8..a1aebf2b882ad014beeb45bf240df38ed820229c 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.stats;
 
 import lombok.Data;
 
index a2d85a104cd6d1f8449aa8ce322fa5530e71b201..ab88ee66249d859b3ee8c4efc60274b6a9e07bdf 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.stats;
 
 import lombok.Data;
 
index 8966be6ca8f2a7fe2063085b8c72de86a979f6ba..f9382d96e49af53d2c9e87c7328524c3c787c7cc 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.stats;
 
 import lombok.Data;
 
index eeee7eba22b5e0a14aefe21af931570aa659105b..54ebae4c626f2cc85b2e5eb06a876e0b47477844 100644 (file)
@@ -1,14 +1,14 @@
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.stats;
 
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 
 
 @SpringBootApplication
-public class QueryApplication
+public class StatsApplication
 {
        public static void main(String[] args)
        {
-               SpringApplication.run(QueryApplication.class, args);
+               SpringApplication.run(StatsApplication.class, args);
        }
 }
index 0f9cad1ef6327feab9e84e889b7aacaf372214ae..9f0699a3644dd40564b06780c6127281f5985aa1 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.stats;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -21,20 +21,20 @@ import java.net.Socket;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME;
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME;
+import static de.juplo.kafka.wordcount.stats.StatsStreamProcessor.RANKING_STORE_NAME;
+import static de.juplo.kafka.wordcount.stats.StatsStreamProcessor.USER_STORE_NAME;
 import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
 
 
 @Configuration
-@EnableConfigurationProperties(QueryApplicationProperties.class)
+@EnableConfigurationProperties(StatsApplicationProperties.class)
 @Slf4j
-public class QueryApplicationConfiguration
+public class StatsApplicationConfiguration
 {
        @Bean
        public HostInfo applicationServer(
                        ServerProperties serverProperties,
-                       QueryApplicationProperties applicationProperties) throws IOException
+                       StatsApplicationProperties applicationProperties) throws IOException
        {
                String host;
                if (serverProperties.getAddress() == null)
@@ -56,7 +56,7 @@ public class QueryApplicationConfiguration
 
        @Bean
        public Properties streamProcessorProperties(
-                       QueryApplicationProperties applicationProperties,
+                       StatsApplicationProperties applicationProperties,
                        HostInfo applicationServer)
        {
                Properties props = new Properties();
@@ -97,15 +97,15 @@ public class QueryApplicationConfiguration
        }
 
        @Bean(initMethod = "start", destroyMethod = "stop")
-       public QueryStreamProcessor streamProcessor(
+       public StatsStreamProcessor streamProcessor(
                        Properties streamProcessorProperties,
                        HostInfo applicationServer,
-                       QueryApplicationProperties applicationProperties,
+                       StatsApplicationProperties applicationProperties,
                        KeyValueBytesStoreSupplier userStoreSupplier,
                        KeyValueBytesStoreSupplier rankingStoreSupplier,
                        ConfigurableApplicationContext context)
        {
-               QueryStreamProcessor streamProcessor = new QueryStreamProcessor(
+               StatsStreamProcessor streamProcessor = new StatsStreamProcessor(
                                streamProcessorProperties,
                                applicationServer,
                                applicationProperties.getUsersInputTopic(),
index 4a9eecaec3e578b58abe32372290e656d3b14b4d..73eddc7413e79758f96695cfb3b0feb9323a57e9 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.stats;
 
 
 import lombok.Getter;
@@ -7,14 +7,14 @@ import lombok.ToString;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 
 
-@ConfigurationProperties("juplo.wordcount.query")
+@ConfigurationProperties("juplo.wordcount.stats")
 @Getter
 @Setter
 @ToString
-public class QueryApplicationProperties
+public class StatsApplicationProperties
 {
   private String bootstrapServer = "localhost:9092";
-  private String applicationId = "query";
+  private String applicationId = "stats";
   private String rankingInputTopic = "top10";
   private String usersInputTopic = "users";
   private Integer commitInterval;
index a9b5b80a6be7c1b55cdfa576d1d03d7cfe6e8343..698e7b407d75ea406532ddd1ae14366c2e801da0 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.stats;
 
 import lombok.RequiredArgsConstructor;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
@@ -14,9 +14,9 @@ import java.util.Optional;
 
 @RestController
 @RequiredArgsConstructor
-public class QueryController
+public class StatsController
 {
-  private final QueryStreamProcessor processor;
+  private final StatsStreamProcessor processor;
 
   @GetMapping("{username}")
   ResponseEntity<UserRanking> queryFor(@PathVariable String username)
index 5543a9103e1b6f102fbbf2b2c813a21c48cabe56..c4557ea492d65584135580f8ab734d1cac37649e 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.stats;
 
 import jakarta.annotation.PostConstruct;
 import jakarta.annotation.PreDestroy;
@@ -18,9 +18,9 @@ import java.util.Properties;
 
 
 @Slf4j
-public class QueryStreamProcessor
+public class StatsStreamProcessor
 {
-       public static final String STATS_TYPE = "COUNTER";
+       public static final String STATS_TYPE = "POPULAR";
        public static final String USER_STORE_NAME = "users";
        public static final String RANKING_STORE_NAME = "rankings";
 
@@ -29,7 +29,7 @@ public class QueryStreamProcessor
        public final StoreQueryParameters<ReadOnlyKeyValueStore<String, UserRanking>> storeParameters;
 
 
-       public QueryStreamProcessor(
+       public StatsStreamProcessor(
                        Properties props,
                        HostInfo applicationServer,
                        String usersInputTopic,
index f62b475dba6e626dff64638998fe1161649dd8bd..a4a418051e6ab65895a0fed016d7c80931f0e8a7 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.stats;
 
 import lombok.Data;
 import lombok.EqualsAndHashCode;
index 9ca765ae09c5a530f42d92d46aca5e16e0af9c83..48babb71b7266340ce4c521d0733fe524755c524 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.stats;
 
 import lombok.*;
 
index fb12aee409935bfb5c7b16f58e4ce95b36a1e51b..b96874322d09a1ebf0cd5dba2da5c4215f609018 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.stats;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import de.juplo.kafka.wordcount.top10.TestRanking;
@@ -30,8 +30,8 @@ import java.time.Duration;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME;
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME;
+import static de.juplo.kafka.wordcount.stats.StatsStreamProcessor.RANKING_STORE_NAME;
+import static de.juplo.kafka.wordcount.stats.StatsStreamProcessor.USER_STORE_NAME;
 import static org.awaitility.Awaitility.await;
 import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
 
@@ -43,15 +43,15 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
                                "logging.level.de.juplo=DEBUG",
                                "logging.level.org.apache.kafka.clients=INFO",
                                "logging.level.org.apache.kafka.streams=INFO",
-                               "juplo.wordcount.query.bootstrap-server=${spring.embedded.kafka.brokers}",
-                               "juplo.wordcount.query.commit-interval=100",
-                               "juplo.wordcount.query.cache-max-bytes=0",
-                               "juplo.wordcount.query.users-input-topic=" + QueryApplicationIT.TOPIC_USERS,
-                               "juplo.wordcount.query.ranking-input-topic=" + QueryApplicationIT.TOPIC_TOP10 })
+                               "juplo.wordcount.stats.bootstrap-server=${spring.embedded.kafka.brokers}",
+                               "juplo.wordcount.stats.commit-interval=100",
+                               "juplo.wordcount.stats.cache-max-bytes=0",
+                               "juplo.wordcount.stats.users-input-topic=" + StatsApplicationIT.TOPIC_USERS,
+                               "juplo.wordcount.stats.ranking-input-topic=" + StatsApplicationIT.TOPIC_TOP10 })
 @AutoConfigureMockMvc
-@EmbeddedKafka(topics = { QueryApplicationIT.TOPIC_TOP10, QueryApplicationIT.TOPIC_USERS})
+@EmbeddedKafka(topics = { StatsApplicationIT.TOPIC_TOP10, StatsApplicationIT.TOPIC_USERS})
 @Slf4j
-public class QueryApplicationIT
+public class StatsApplicationIT
 {
        public static final String TOPIC_TOP10 = "top10";
        public static final String TOPIC_USERS = "users";
@@ -62,7 +62,7 @@ public class QueryApplicationIT
        @Autowired
        ObjectMapper objectMapper;
        @Autowired
-       QueryStreamProcessor streamProcessor;
+       StatsStreamProcessor streamProcessor;
 
 
        @BeforeAll
index fbeb19b5bd36de954bb9f46fcae2e005b025215c..54b42f049a744a0eb287809e0c65c9b9eb4b3044 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.stats;
 
 import de.juplo.kafka.wordcount.top10.TestRanking;
 import de.juplo.kafka.wordcount.top10.TestUser;
@@ -17,11 +17,11 @@ import org.springframework.kafka.support.serializer.JsonSerializer;
 
 import java.util.Map;
 
-import static de.juplo.kafka.wordcount.query.QueryApplicationConfiguration.serializationConfig;
+import static de.juplo.kafka.wordcount.stats.StatsApplicationConfiguration.serializationConfig;
 
 
 @Slf4j
-public class QueryStreamProcessorTopologyTest
+public class StatsStreamProcessorTopologyTest
 {
   public static final String TOP10_IN = "TOP10-IN";
   public static final String USERS_IN = "USERS-IN";
@@ -37,7 +37,7 @@ public class QueryStreamProcessorTopologyTest
   @BeforeEach
   public void setUp()
   {
-    Topology topology = QueryStreamProcessor.buildTopology(
+    Topology topology = StatsStreamProcessor.buildTopology(
         USERS_IN,
         TOP10_IN,
         Stores.inMemoryKeyValueStore(USERS_STORE_NAME),
index 44162a0d9ab760ca2e9c9a607661f91e99511842..160361943ba2e891045fabdab8ccfa1aa32cbfab 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.stats;
 
 import de.juplo.kafka.wordcount.top10.TestEntry;
 import de.juplo.kafka.wordcount.top10.TestRanking;
@@ -10,7 +10,7 @@ import java.util.Arrays;
 import java.util.function.Function;
 import java.util.stream.Stream;
 
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STATS_TYPE;
+import static de.juplo.kafka.wordcount.stats.StatsStreamProcessor.STATS_TYPE;
 import static org.assertj.core.api.Assertions.assertThat;