]> juplo.de Git - demos/kafka/wordcount/commitdiff
stats: 1.0.0 - Renamed packages and classes -- ALIGN
authorKai Moritz <kai@juplo.de>
Thu, 20 Jun 2024 20:19:17 +0000 (22:19 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 22 Jun 2024 06:54:37 +0000 (08:54 +0200)
18 files changed:
pom.xml
src/main/java/de/juplo/kafka/wordcount/stats/Entry.java
src/main/java/de/juplo/kafka/wordcount/stats/Key.java
src/main/java/de/juplo/kafka/wordcount/stats/Ranking.java
src/main/java/de/juplo/kafka/wordcount/stats/StatsApplication.java
src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationConfiguration.java
src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationProperties.java
src/main/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessor.java
src/main/java/de/juplo/kafka/wordcount/stats/WindowedKey.java
src/test/java/de/juplo/kafka/wordcount/in/InputCounter.java
src/test/java/de/juplo/kafka/wordcount/in/InputWindowedKey.java
src/test/java/de/juplo/kafka/wordcount/out/TestEntry.java
src/test/java/de/juplo/kafka/wordcount/out/TestRanking.java
src/test/java/de/juplo/kafka/wordcount/out/TestUser.java
src/test/java/de/juplo/kafka/wordcount/stats/RankingTest.java
src/test/java/de/juplo/kafka/wordcount/stats/StatsApplicationIT.java
src/test/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessorTopologyTest.java
src/test/java/de/juplo/kafka/wordcount/stats/TestData.java

diff --git a/pom.xml b/pom.xml
index b30c4ea7dd8b22e0d2a43d0a56c6fe8f58c37636..f9d0df38ebb38082eeeee7d11cc3c5cb0bc2eee2 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -9,10 +9,10 @@
                <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>de.juplo.kafka.wordcount</groupId>
-       <artifactId>top10</artifactId>
-       <version>1.2.1</version>
-       <name>Wordcount-Top-10</name>
-       <description>Top-10 stream-processor of the multi-user wordcount-example</description>
+       <artifactId>stats</artifactId>
+       <version>1.0.0</version>
+       <name>Wordcount-Statistics</name>
+       <description>Statistics stream-processor of the multi-word wordcount-example</description>
        <properties>
                <docker-maven-plugin.version>0.33.0</docker-maven-plugin.version>
        </properties>
index b25fc079213405c926b9a1a9a5d091d6e4513b21..18fea0c8e56583b67c87151de7f40ebbf63f3cae 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import lombok.AccessLevel;
@@ -15,6 +15,6 @@ import lombok.NoArgsConstructor;
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class Entry
 {
-  private String word;
+  private String key;
   private Long counter;
 }
index ffac8ea1fc10f97c0108ba60322e9f0ba539567e..d79ca51a7454a128828c410fa2e89f38b68652fe 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import lombok.*;
index 4f56c18888622028f7c88d8b3c9b12c41b692f44..25319d1fe1b17e0d957325115892a15781e879b2 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
 
 import lombok.*;
 
@@ -49,7 +49,7 @@ public class Ranking
         for (int j = i+1; j < list.size(); j++)
         {
           entry = list.get(j);
-          if(entry.getWord().equals(newEntry.getWord()))
+          if(entry.getKey().equals(newEntry.getKey()))
           {
             list.remove(j);
             break;
@@ -63,7 +63,7 @@ public class Ranking
         return this;
       }
 
-      if (entry.getWord().equals(newEntry.getWord()))
+      if (entry.getKey().equals(newEntry.getKey()))
         oldPosition = i;
     }
 
@@ -93,12 +93,12 @@ public class Ranking
     {
       Entry entry = this.entries[i];
 
-      if (seenWords.contains(entry.getWord()))
-        throw new IllegalArgumentException("Invalid Ranking: Multiple occurrences of word -> " + entry.getWord());
+      if (seenWords.contains(entry.getKey()))
+        throw new IllegalArgumentException("Invalid Ranking: Multiple occurrences of word -> " + entry.getKey());
       if (entry.getCounter() > lowesCounting)
         throw new IllegalArgumentException("Invalid Ranking: Entries are not sorted correctly");
 
-      seenWords.add(entry.getWord());
+      seenWords.add(entry.getKey());
       lowesCounting = entry.getCounter();
     }
 
@@ -128,13 +128,13 @@ public class Ranking
     Set<String> otherWordsWithCurrentCount = new HashSet<>();
     Entry myEntry = entries[i];
     long currentCount = myEntry.getCounter();
-    myWordsWithCurrentCount.add(myEntry.getWord());
+    myWordsWithCurrentCount.add(myEntry.getKey());
     while (true)
     {
       Entry otherEntry = other.entries[i];
       if (otherEntry.getCounter() != currentCount)
         return false;
-      otherWordsWithCurrentCount.add(otherEntry.getWord());
+      otherWordsWithCurrentCount.add(otherEntry.getKey());
       if (++i >= entries.length)
         return myWordsWithCurrentCount.equals(otherWordsWithCurrentCount);
       myEntry = entries[i];
@@ -146,7 +146,7 @@ public class Ranking
         myWordsWithCurrentCount.clear();
         otherWordsWithCurrentCount.clear();
       }
-      myWordsWithCurrentCount.add(myEntry.getWord());
+      myWordsWithCurrentCount.add(myEntry.getKey());
     }
   }
 
index 5c14ae7c69d9dd51018f172f6a65d92f867f6272..54ebae4c626f2cc85b2e5eb06a876e0b47477844 100644 (file)
@@ -1,14 +1,14 @@
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
 
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 
 
 @SpringBootApplication
-public class Top10Application
+public class StatsApplication
 {
        public static void main(String[] args)
        {
-               SpringApplication.run(Top10Application.class, args);
+               SpringApplication.run(StatsApplication.class, args);
        }
 }
index 255f0e4b559d14a16f60d8b4e269fd64d7fa0594..53ea1c559732516acdab9f621846db0d32b852a8 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -16,17 +16,17 @@ import org.springframework.kafka.support.serializer.JsonSerde;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 
-import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.stats.StatsStreamProcessor.STORE_NAME;
 import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
 
 
 @Configuration
-@EnableConfigurationProperties(Top10ApplicationProperties.class)
+@EnableConfigurationProperties(StatsApplicationProperties.class)
 @Slf4j
-public class Top10ApplicationConfiguration
+public class StatsApplicationConfiguration
 {
        @Bean
-       public Properties streamProcessorProperties(Top10ApplicationProperties properties)
+       public Properties streamProcessorProperties(StatsApplicationProperties properties)
        {
                Properties props = new Properties();
 
@@ -51,26 +51,26 @@ public class Top10ApplicationConfiguration
 
                props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
-               props.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName());
+               props.put(JsonDeserializer.KEY_DEFAULT_TYPE, WindowedKey.class.getName());
                props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName());
                props.put(
                                JsonDeserializer.TYPE_MAPPINGS,
                                "word:" + Key.class.getName() + "," +
                                "counter:" + Entry.class.getName() + "," +
-                               "user:" + User.class.getName() + "," +
+                               "user:" + WindowedKey.class.getName() + "," +
                                "ranking:" + Ranking.class.getName());
 
                return props;
        }
 
        @Bean(initMethod = "start", destroyMethod = "stop")
-       public Top10StreamProcessor streamProcessor(
-                       Top10ApplicationProperties applicationProperties,
+       public StatsStreamProcessor streamProcessor(
+                       StatsApplicationProperties applicationProperties,
                        Properties streamProcessorProperties,
                        KeyValueBytesStoreSupplier storeSupplier,
                        ConfigurableApplicationContext context)
        {
-               Top10StreamProcessor streamProcessor = new Top10StreamProcessor(
+               StatsStreamProcessor streamProcessor = new StatsStreamProcessor(
                                applicationProperties.getInputTopic(),
                                applicationProperties.getOutputTopic(),
                                streamProcessorProperties,
index d3bb23639e33340632b5bae100372746d794c2ff..841b290ba34e0bac54e0bd7b8822e304ad5c025d 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
 
 
 import lombok.Getter;
@@ -7,16 +7,16 @@ import lombok.ToString;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 
 
-@ConfigurationProperties("juplo.wordcount.top10")
+@ConfigurationProperties("juplo.wordcount.stats")
 @Getter
 @Setter
 @ToString
-public class Top10ApplicationProperties
+public class StatsApplicationProperties
 {
   private String bootstrapServer = "localhost:9092";
-  private String applicationId = "top10";
-  private String inputTopic = "countings";
-  private String outputTopic = "top10";
+  private String applicationId = "stats";
+  private String inputTopic = "stats";
+  private String outputTopic = "results";
   private Integer commitInterval;
   private Integer cacheMaxBytes;
 }
index 70ead8796c49617037fa39a176aa44390417a272..15c08ee28200435276e0a1c1e46c295d867f8a7f 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.*;
@@ -11,20 +11,20 @@ import java.util.Properties;
 
 
 @Slf4j
-public class Top10StreamProcessor
+public class StatsStreamProcessor
 {
-       public static final String STORE_NAME= "top10";
+       public static final String STORE_NAME= "stats";
 
        public final KafkaStreams streams;
 
 
-       public Top10StreamProcessor(
+       public StatsStreamProcessor(
                        String inputTopic,
                        String outputTopic,
                        Properties props,
                        KeyValueBytesStoreSupplier storeSupplier)
        {
-               Topology topology = Top10StreamProcessor.buildTopology(
+               Topology topology = StatsStreamProcessor.buildTopology(
                                inputTopic,
                                outputTopic,
                                storeSupplier);
@@ -41,11 +41,11 @@ public class Top10StreamProcessor
 
                builder
                                .<Key, Entry>stream(inputTopic)
-                               .map((key, entry) -> new KeyValue<>(User.of(key.getUser()), entry))
+                               .map((key, entry) -> new KeyValue<>(WindowedKey.of(key.getUser()), entry))
                                .groupByKey()
                                .aggregate(
                                                () -> new Ranking(),
-                                               (user, entry, ranking) -> ranking.add(entry),
+                                               (windowedKey, entry, ranking) -> ranking.add(entry),
                                                Materialized.as(storeSupplier))
                                .toStream()
                                .to(outputTopic);
@@ -56,7 +56,7 @@ public class Top10StreamProcessor
                return topology;
        }
 
-       ReadOnlyKeyValueStore<User, Ranking> getStore()
+       ReadOnlyKeyValueStore<WindowedKey, Ranking> getStore()
        {
                return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
        }
index 53c258d9322ab937116dc5bfe0ad300b31cd2b76..9b77cacf430ca36e57acc1394ecb5a34296fc1c2 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
@@ -8,7 +8,7 @@ import lombok.NoArgsConstructor;
 @AllArgsConstructor(staticName = "of")
 @NoArgsConstructor
 @Data
-public class User
+public class WindowedKey
 {
   String user;
 }
index d98ae649ba6beb34321a70dcc344f499f1959afd..ada66d3c62e311f5a3fab66a11c5676cccb21a37 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.in;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
@@ -8,14 +8,14 @@ import lombok.NoArgsConstructor;
 @Data
 @NoArgsConstructor
 @AllArgsConstructor(staticName = "of")
-public class TestCounter
+public class InputCounter
 {
   String user;
-  String word;
+  String key;
   long counter;
 
-  public static TestCounter of(TestWord word, long counter)
+  public static InputCounter of(InputWindowedKey word, long counter)
   {
-    return new TestCounter(word.getUser(), word.getWord(), counter);
+    return new InputCounter(word.getUser(), word.getKey(), counter);
   }
 }
index 8008e1226df7e86a41f825158923c1bdba73b47f..255d206d616bc67929e85631482797ad8fed36f3 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.in;
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import lombok.AllArgsConstructor;
@@ -10,8 +10,8 @@ import lombok.NoArgsConstructor;
 @NoArgsConstructor
 @Data
 @JsonIgnoreProperties(ignoreUnknown = true)
-public class TestWord
+public class InputWindowedKey
 {
   private String user;
-  private String word;
+  private String key;
 }
index a5152e67476d0df31f6857fccab5d8ce942a84c6..f7f5679ce3050c3120a08ad8067937362004c134 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.out;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
@@ -10,6 +10,6 @@ import lombok.NoArgsConstructor;
 @Data
 public class TestEntry
 {
-  String word;
+  String key;
   long counter;
 }
index efad48b4700337535367b0f139761726d954f565..62ed323a2b3443c48fe46a2cde5becb69193a4bb 100644 (file)
@@ -1,6 +1,5 @@
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.out;
 
-import de.juplo.kafka.wordcount.top10.Entry;
 import lombok.AccessLevel;
 import lombok.AllArgsConstructor;
 import lombok.Data;
index 53a5992091ac9ded81b0137b46944df38c3fffc3..3affeabf5da6b3fc5719433e3ad506cbc8d6d620 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.out;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
index 26749e9440783822e03181b9709fdd0517cc4a27..5599403079a5aa8f38dac93e2a0c1958e050348c 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
 
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
@@ -108,7 +108,7 @@ public class RankingTest
             Stream.of(highestEntry),
             VALID_RANKINGS[0]
                 .stream()
-                .filter(entry -> !entry.getWord().equals(word)))
+                .filter(entry -> !entry.getKey().equals(word)))
         .toList();
     assertThat(ranking.getEntries()).containsExactlyElementsOf(expectedEntries);
   }
@@ -134,7 +134,7 @@ public class RankingTest
     Ranking ranking = Ranking.of(toArray(entryList));
     entryList.forEach(entry ->
       assertThatExceptionOfType(IllegalArgumentException.class)
-          .isThrownBy(() -> ranking.add(Entry.of(entry.getWord(), entry.getCounter() - 1))));
+          .isThrownBy(() -> ranking.add(Entry.of(entry.getKey(), entry.getCounter() - 1))));
   }
 
   @DisplayName("Identical rankings are considered equal")
index f5ef236c19a26095b1a118262fb860314229b28d..08d0eb420069bac209b19d72e264e6d8ebb6e2df 100644 (file)
@@ -1,9 +1,9 @@
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
 
-import de.juplo.kafka.wordcount.counter.TestCounter;
-import de.juplo.kafka.wordcount.counter.TestWord;
-import de.juplo.kafka.wordcount.query.TestRanking;
-import de.juplo.kafka.wordcount.query.TestUser;
+import de.juplo.kafka.wordcount.in.InputCounter;
+import de.juplo.kafka.wordcount.in.InputWindowedKey;
+import de.juplo.kafka.wordcount.out.TestRanking;
+import de.juplo.kafka.wordcount.out.TestUser;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
@@ -28,7 +28,7 @@ import org.springframework.util.MultiValueMap;
 
 import java.time.Duration;
 
-import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.stats.StatsStreamProcessor.STORE_NAME;
 import static org.awaitility.Awaitility.await;
 
 
@@ -36,23 +36,23 @@ import static org.awaitility.Awaitility.await;
                properties = {
                                "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
                                "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
-                               "spring.kafka.producer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.counter.TestWord,counter:de.juplo.kafka.wordcount.counter.TestCounter",
+                               "spring.kafka.producer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.in.InputWindowedKey,counter:de.juplo.kafka.wordcount.in.InputCounter",
                                "spring.kafka.consumer.auto-offset-reset=earliest",
                                "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
                                "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
-                               "spring.kafka.consumer.properties.spring.json.type.mapping=user:de.juplo.kafka.wordcount.query.TestUser,ranking:de.juplo.kafka.wordcount.query.TestRanking",
+                               "spring.kafka.consumer.properties.spring.json.type.mapping=user:de.juplo.kafka.wordcount.out.TestUser,ranking:de.juplo.kafka.wordcount.out.TestRanking",
                                "logging.level.root=WARN",
                                "logging.level.de.juplo=DEBUG",
                                "logging.level.org.apache.kafka.clients=INFO",
                                "logging.level.org.apache.kafka.streams=INFO",
-                               "juplo.wordcount.top10.bootstrap-server=${spring.embedded.kafka.brokers}",
-                               "juplo.wordcount.top10.commit-interval=100",
-                               "juplo.wordcount.top10.cacheMaxBytes=0",
-                               "juplo.wordcount.top10.input-topic=" + Top10ApplicationIT.TOPIC_IN,
-                               "juplo.wordcount.top10.output-topic=" + Top10ApplicationIT.TOPIC_OUT })
-@EmbeddedKafka(topics = { Top10ApplicationIT.TOPIC_IN, Top10ApplicationIT.TOPIC_OUT })
+                               "juplo.wordcount.stats.bootstrap-server=${spring.embedded.kafka.brokers}",
+                               "juplo.wordcount.stats.commit-interval=100",
+                               "juplo.wordcount.stats.cacheMaxBytes=0",
+                               "juplo.wordcount.stats.input-topic=" + StatsApplicationIT.TOPIC_IN,
+                               "juplo.wordcount.stats.output-topic=" + StatsApplicationIT.TOPIC_OUT })
+@EmbeddedKafka(topics = { StatsApplicationIT.TOPIC_IN, StatsApplicationIT.TOPIC_OUT })
 @Slf4j
-public class Top10ApplicationIT
+public class StatsApplicationIT
 {
        public static final String TOPIC_IN = "in";
        public static final String TOPIC_OUT = "out";
@@ -60,12 +60,12 @@ public class Top10ApplicationIT
        @Autowired
        Consumer consumer;
        @Autowired
-       Top10StreamProcessor streamProcessor;
+       StatsStreamProcessor streamProcessor;
 
 
        @BeforeAll
        public static void testSendMessage(
-                       @Autowired KafkaTemplate<TestWord, TestCounter> kafkaTemplate)
+                       @Autowired KafkaTemplate<InputWindowedKey, InputCounter> kafkaTemplate)
        {
                TestData
                                .getInputMessages()
@@ -73,7 +73,7 @@ public class Top10ApplicationIT
                                {
                                        try
                                        {
-                                               SendResult<TestWord, TestCounter> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
+                                               SendResult<InputWindowedKey, InputCounter> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
                                                log.info(
                                                                "Sent: {}={}, partition={}, offset={}",
                                                                result.getProducerRecord().key(),
index 90d8e4cb90987d0455c84f67252b6080e5d9bed1..dad85df81f9cc7b2cafbac6ecb7a3a5a7969fd4b 100644 (file)
@@ -1,9 +1,9 @@
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
 
-import de.juplo.kafka.wordcount.counter.TestCounter;
-import de.juplo.kafka.wordcount.counter.TestWord;
-import de.juplo.kafka.wordcount.query.TestRanking;
-import de.juplo.kafka.wordcount.query.TestUser;
+import de.juplo.kafka.wordcount.in.InputCounter;
+import de.juplo.kafka.wordcount.in.InputWindowedKey;
+import de.juplo.kafka.wordcount.out.TestRanking;
+import de.juplo.kafka.wordcount.out.TestUser;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TestOutputTopic;
@@ -21,11 +21,11 @@ import org.springframework.util.MultiValueMap;
 
 import java.util.Map;
 
-import static de.juplo.kafka.wordcount.top10.Top10ApplicationConfiguration.serializationConfig;
+import static de.juplo.kafka.wordcount.stats.StatsApplicationConfiguration.serializationConfig;
 
 
 @Slf4j
-public class Top10StreamProcessorTopologyTest
+public class StatsStreamProcessorTopologyTest
 {
   public static final String IN = "TEST-IN";
   public static final String OUT = "TEST-OUT";
@@ -33,14 +33,14 @@ public class Top10StreamProcessorTopologyTest
 
 
   TopologyTestDriver testDriver;
-  TestInputTopic<TestWord, TestCounter> in;
+  TestInputTopic<InputWindowedKey, InputCounter> in;
   TestOutputTopic<TestUser, TestRanking> out;
 
 
   @BeforeEach
   public void setUp()
   {
-    Topology topology = Top10StreamProcessor.buildTopology(
+    Topology topology = StatsStreamProcessor.buildTopology(
         IN,
         OUT,
         Stores.inMemoryKeyValueStore(STORE_NAME));
@@ -49,8 +49,8 @@ public class Top10StreamProcessorTopologyTest
 
     in = testDriver.createInputTopic(
         IN,
-        jsonSerializer(TestWord.class, true),
-        jsonSerializer(TestCounter.class,false));
+        jsonSerializer(InputWindowedKey.class, true),
+        jsonSerializer(InputCounter.class,false));
 
     out = testDriver.createOutputTopic(
         OUT,
@@ -81,7 +81,7 @@ public class Top10StreamProcessorTopologyTest
     TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages);
     TestData.assertExpectedLastMessagesForUsers(receivedMessages);
 
-    KeyValueStore<User, Ranking> store = testDriver.getKeyValueStore(STORE_NAME);
+    KeyValueStore<WindowedKey, Ranking> store = testDriver.getKeyValueStore(STORE_NAME);
     TestData.assertExpectedState(store);
   }
 
@@ -97,8 +97,8 @@ public class Top10StreamProcessorTopologyTest
     jsonSerializer.configure(
         Map.of(
             JsonSerializer.TYPE_MAPPINGS,
-            "word:" + TestWord.class.getName() + "," +
-            "counter:" + TestCounter.class.getName()),
+            "word:" + InputWindowedKey.class.getName() + "," +
+            "counter:" + InputCounter.class.getName()),
         isKey);
     return jsonSerializer;
   }
index 7a3a27edddb2722dae8dbf4f53d59c9ebcddd58f..c884b0682e7f71d251cd946b9b0a08a02d163475 100644 (file)
@@ -1,10 +1,10 @@
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
 
-import de.juplo.kafka.wordcount.counter.TestCounter;
-import de.juplo.kafka.wordcount.counter.TestWord;
-import de.juplo.kafka.wordcount.query.TestEntry;
-import de.juplo.kafka.wordcount.query.TestRanking;
-import de.juplo.kafka.wordcount.query.TestUser;
+import de.juplo.kafka.wordcount.in.InputCounter;
+import de.juplo.kafka.wordcount.in.InputWindowedKey;
+import de.juplo.kafka.wordcount.out.TestEntry;
+import de.juplo.kafka.wordcount.out.TestRanking;
+import de.juplo.kafka.wordcount.out.TestUser;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.springframework.util.LinkedMultiValueMap;
@@ -21,46 +21,46 @@ class TestData
        static final TestUser PETER = TestUser.of("peter");
        static final TestUser KLAUS = TestUser.of("klaus");
 
-       static final Stream<KeyValue<TestWord, TestCounter>> getInputMessages()
+       static final Stream<KeyValue<InputWindowedKey, InputCounter>> getInputMessages()
        {
                return Stream.of(INPUT_MESSAGES);
        }
 
-       private static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
+       private static final KeyValue<InputWindowedKey, InputCounter>[] INPUT_MESSAGES = new KeyValue[]
        {
                        new KeyValue<>(
-                                       TestWord.of(PETER.getUser(),"Hallo"),
-                                       TestCounter.of(PETER.getUser(),"Hallo",1)),
+                                       InputWindowedKey.of(PETER.getUser(),"Hallo"),
+                                       InputCounter.of(PETER.getUser(),"Hallo",1)),
                        new KeyValue<>(
-                                       TestWord.of(KLAUS.getUser(),"Müsch"),
-                                       TestCounter.of(KLAUS.getUser(),"Müsch",1)),
+                                       InputWindowedKey.of(KLAUS.getUser(),"Müsch"),
+                                       InputCounter.of(KLAUS.getUser(),"Müsch",1)),
                        new KeyValue<>(
-                                       TestWord.of(PETER.getUser(),"Welt"),
-                                       TestCounter.of(PETER.getUser(),"Welt",1)),
+                                       InputWindowedKey.of(PETER.getUser(),"Welt"),
+                                       InputCounter.of(PETER.getUser(),"Welt",1)),
                        new KeyValue<>(
-                                       TestWord.of(KLAUS.getUser(),"Müsch"),
-                                       TestCounter.of(KLAUS.getUser(),"Müsch",2)),
+                                       InputWindowedKey.of(KLAUS.getUser(),"Müsch"),
+                                       InputCounter.of(KLAUS.getUser(),"Müsch",2)),
                        new KeyValue<>(
-                                       TestWord.of(KLAUS.getUser(),"s"),
-                                       TestCounter.of(KLAUS.getUser(),"s",1)),
+                                       InputWindowedKey.of(KLAUS.getUser(),"s"),
+                                       InputCounter.of(KLAUS.getUser(),"s",1)),
                        new KeyValue<>(
-                                       TestWord.of(PETER.getUser(),"Boäh"),
-                                       TestCounter.of(PETER.getUser(),"Boäh",1)),
+                                       InputWindowedKey.of(PETER.getUser(),"Boäh"),
+                                       InputCounter.of(PETER.getUser(),"Boäh",1)),
                        new KeyValue<>(
-                                       TestWord.of(PETER.getUser(),"Welt"),
-                                       TestCounter.of(PETER.getUser(),"Welt",2)),
+                                       InputWindowedKey.of(PETER.getUser(),"Welt"),
+                                       InputCounter.of(PETER.getUser(),"Welt",2)),
                        new KeyValue<>(
-                                       TestWord.of(PETER.getUser(),"Boäh"),
-                                       TestCounter.of(PETER.getUser(),"Boäh",2)),
+                                       InputWindowedKey.of(PETER.getUser(),"Boäh"),
+                                       InputCounter.of(PETER.getUser(),"Boäh",2)),
                        new KeyValue<>(
-                                       TestWord.of(KLAUS.getUser(),"s"),
-                                       TestCounter.of(KLAUS.getUser(),"s",2)),
+                                       InputWindowedKey.of(KLAUS.getUser(),"s"),
+                                       InputCounter.of(KLAUS.getUser(),"s",2)),
                        new KeyValue<>(
-                                       TestWord.of(PETER.getUser(),"Boäh"),
-                                       TestCounter.of(PETER.getUser(),"Boäh",3)),
+                                       InputWindowedKey.of(PETER.getUser(),"Boäh"),
+                                       InputCounter.of(PETER.getUser(),"Boäh",3)),
                        new KeyValue<>(
-                                       TestWord.of(KLAUS.getUser(),"s"),
-                                       TestCounter.of(KLAUS.getUser(),"s",3)),
+                                       InputWindowedKey.of(KLAUS.getUser(),"s"),
+                                       InputCounter.of(KLAUS.getUser(),"s",3)),
        };
 
        static void assertExpectedMessages(MultiValueMap<TestUser, TestRanking> receivedMessages)
@@ -71,15 +71,15 @@ class TestData
                                                                .containsExactlyElementsOf(rankings));
        }
 
-       static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
+       static void assertExpectedState(ReadOnlyKeyValueStore<WindowedKey, Ranking> store)
        {
                assertRankingEqualsRankingFromLastMessage(PETER, store.get(userOf(PETER)));
                assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(userOf(KLAUS)));
        }
 
-       private static User userOf(TestUser user)
+       private static WindowedKey userOf(TestUser user)
        {
-               return User.of(user.getUser());
+               return WindowedKey.of(user.getUser());
        }
 
        static void assertExpectedNumberOfMessagesForUsers(MultiValueMap<TestUser, TestRanking> receivedMessages)
@@ -113,7 +113,7 @@ class TestData
                return Arrays
                                .stream(entries)
                                .map(entry -> TestEntry.of(
-                                               entry.getWord(),
+                                               entry.getKey(),
                                                entry.getCounter() == null
                                                                ? -1l
                                                                : entry.getCounter()))