stats: 1.0.0 - Renamed packages and classes -- ALIGN
authorKai Moritz <kai@juplo.de>
Thu, 20 Jun 2024 20:19:17 +0000 (22:19 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 22 Jun 2024 06:54:37 +0000 (08:54 +0200)
18 files changed:
pom.xml
src/main/java/de/juplo/kafka/wordcount/stats/Entry.java
src/main/java/de/juplo/kafka/wordcount/stats/Key.java
src/main/java/de/juplo/kafka/wordcount/stats/Ranking.java
src/main/java/de/juplo/kafka/wordcount/stats/StatsApplication.java
src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationConfiguration.java
src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationProperties.java
src/main/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessor.java
src/main/java/de/juplo/kafka/wordcount/stats/WindowedKey.java
src/test/java/de/juplo/kafka/wordcount/in/InputCounter.java
src/test/java/de/juplo/kafka/wordcount/in/InputWindowedKey.java
src/test/java/de/juplo/kafka/wordcount/out/TestEntry.java
src/test/java/de/juplo/kafka/wordcount/out/TestRanking.java
src/test/java/de/juplo/kafka/wordcount/out/TestUser.java
src/test/java/de/juplo/kafka/wordcount/stats/RankingTest.java
src/test/java/de/juplo/kafka/wordcount/stats/StatsApplicationIT.java
src/test/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessorTopologyTest.java
src/test/java/de/juplo/kafka/wordcount/stats/TestData.java

diff --git a/pom.xml b/pom.xml
index b30c4ea..f9d0df3 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -9,10 +9,10 @@
                <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>de.juplo.kafka.wordcount</groupId>
-       <artifactId>top10</artifactId>
-       <version>1.2.1</version>
-       <name>Wordcount-Top-10</name>
-       <description>Top-10 stream-processor of the multi-user wordcount-example</description>
+       <artifactId>stats</artifactId>
+       <version>1.0.0</version>
+       <name>Wordcount-Statistics</name>
+       <description>Statistics stream-processor of the multi-word wordcount-example</description>
        <properties>
                <docker-maven-plugin.version>0.33.0</docker-maven-plugin.version>
        </properties>
index b25fc07..18fea0c 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import lombok.AccessLevel;
@@ -15,6 +15,6 @@ import lombok.NoArgsConstructor;
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class Entry
 {
-  private String word;
+  private String key;
   private Long counter;
 }
index ffac8ea..d79ca51 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import lombok.*;
index 4f56c18..25319d1 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
 
 import lombok.*;
 
@@ -49,7 +49,7 @@ public class Ranking
         for (int j = i+1; j < list.size(); j++)
         {
           entry = list.get(j);
-          if(entry.getWord().equals(newEntry.getWord()))
+          if(entry.getKey().equals(newEntry.getKey()))
           {
             list.remove(j);
             break;
@@ -63,7 +63,7 @@ public class Ranking
         return this;
       }
 
-      if (entry.getWord().equals(newEntry.getWord()))
+      if (entry.getKey().equals(newEntry.getKey()))
         oldPosition = i;
     }
 
@@ -93,12 +93,12 @@ public class Ranking
     {
       Entry entry = this.entries[i];
 
-      if (seenWords.contains(entry.getWord()))
-        throw new IllegalArgumentException("Invalid Ranking: Multiple occurrences of word -> " + entry.getWord());
+      if (seenWords.contains(entry.getKey()))
+        throw new IllegalArgumentException("Invalid Ranking: Multiple occurrences of word -> " + entry.getKey());
       if (entry.getCounter() > lowesCounting)
         throw new IllegalArgumentException("Invalid Ranking: Entries are not sorted correctly");
 
-      seenWords.add(entry.getWord());
+      seenWords.add(entry.getKey());
       lowesCounting = entry.getCounter();
     }
 
@@ -128,13 +128,13 @@ public class Ranking
     Set<String> otherWordsWithCurrentCount = new HashSet<>();
     Entry myEntry = entries[i];
     long currentCount = myEntry.getCounter();
-    myWordsWithCurrentCount.add(myEntry.getWord());
+    myWordsWithCurrentCount.add(myEntry.getKey());
     while (true)
     {
       Entry otherEntry = other.entries[i];
       if (otherEntry.getCounter() != currentCount)
         return false;
-      otherWordsWithCurrentCount.add(otherEntry.getWord());
+      otherWordsWithCurrentCount.add(otherEntry.getKey());
       if (++i >= entries.length)
         return myWordsWithCurrentCount.equals(otherWordsWithCurrentCount);
       myEntry = entries[i];
@@ -146,7 +146,7 @@ public class Ranking
         myWordsWithCurrentCount.clear();
         otherWordsWithCurrentCount.clear();
       }
-      myWordsWithCurrentCount.add(myEntry.getWord());
+      myWordsWithCurrentCount.add(myEntry.getKey());
     }
   }
 
index 5c14ae7..54ebae4 100644 (file)
@@ -1,14 +1,14 @@
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
 
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 
 
 @SpringBootApplication
-public class Top10Application
+public class StatsApplication
 {
        public static void main(String[] args)
        {
-               SpringApplication.run(Top10Application.class, args);
+               SpringApplication.run(StatsApplication.class, args);
        }
 }
index 255f0e4..53ea1c5 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -16,17 +16,17 @@ import org.springframework.kafka.support.serializer.JsonSerde;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 
-import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.stats.StatsStreamProcessor.STORE_NAME;
 import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
 
 
 @Configuration
-@EnableConfigurationProperties(Top10ApplicationProperties.class)
+@EnableConfigurationProperties(StatsApplicationProperties.class)
 @Slf4j
-public class Top10ApplicationConfiguration
+public class StatsApplicationConfiguration
 {
        @Bean
-       public Properties streamProcessorProperties(Top10ApplicationProperties properties)
+       public Properties streamProcessorProperties(StatsApplicationProperties properties)
        {
                Properties props = new Properties();
 
@@ -51,26 +51,26 @@ public class Top10ApplicationConfiguration
 
                props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
-               props.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName());
+               props.put(JsonDeserializer.KEY_DEFAULT_TYPE, WindowedKey.class.getName());
                props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName());
                props.put(
                                JsonDeserializer.TYPE_MAPPINGS,
                                "word:" + Key.class.getName() + "," +
                                "counter:" + Entry.class.getName() + "," +
-                               "user:" + User.class.getName() + "," +
+                               "user:" + WindowedKey.class.getName() + "," +
                                "ranking:" + Ranking.class.getName());
 
                return props;
        }
 
        @Bean(initMethod = "start", destroyMethod = "stop")
-       public Top10StreamProcessor streamProcessor(
-                       Top10ApplicationProperties applicationProperties,
+       public StatsStreamProcessor streamProcessor(
+                       StatsApplicationProperties applicationProperties,
                        Properties streamProcessorProperties,
                        KeyValueBytesStoreSupplier storeSupplier,
                        ConfigurableApplicationContext context)
        {
-               Top10StreamProcessor streamProcessor = new Top10StreamProcessor(
+               StatsStreamProcessor streamProcessor = new StatsStreamProcessor(
                                applicationProperties.getInputTopic(),
                                applicationProperties.getOutputTopic(),
                                streamProcessorProperties,
index d3bb236..841b290 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
 
 
 import lombok.Getter;
@@ -7,16 +7,16 @@ import lombok.ToString;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 
 
-@ConfigurationProperties("juplo.wordcount.top10")
+@ConfigurationProperties("juplo.wordcount.stats")
 @Getter
 @Setter
 @ToString
-public class Top10ApplicationProperties
+public class StatsApplicationProperties
 {
   private String bootstrapServer = "localhost:9092";
-  private String applicationId = "top10";
-  private String inputTopic = "countings";
-  private String outputTopic = "top10";
+  private String applicationId = "stats";
+  private String inputTopic = "stats";
+  private String outputTopic = "results";
   private Integer commitInterval;
   private Integer cacheMaxBytes;
 }
index 70ead87..15c08ee 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.*;
@@ -11,20 +11,20 @@ import java.util.Properties;
 
 
 @Slf4j
-public class Top10StreamProcessor
+public class StatsStreamProcessor
 {
-       public static final String STORE_NAME= "top10";
+       public static final String STORE_NAME= "stats";
 
        public final KafkaStreams streams;
 
 
-       public Top10StreamProcessor(
+       public StatsStreamProcessor(
                        String inputTopic,
                        String outputTopic,
                        Properties props,
                        KeyValueBytesStoreSupplier storeSupplier)
        {
-               Topology topology = Top10StreamProcessor.buildTopology(
+               Topology topology = StatsStreamProcessor.buildTopology(
                                inputTopic,
                                outputTopic,
                                storeSupplier);
@@ -41,11 +41,11 @@ public class Top10StreamProcessor
 
                builder
                                .<Key, Entry>stream(inputTopic)
-                               .map((key, entry) -> new KeyValue<>(User.of(key.getUser()), entry))
+                               .map((key, entry) -> new KeyValue<>(WindowedKey.of(key.getUser()), entry))
                                .groupByKey()
                                .aggregate(
                                                () -> new Ranking(),
-                                               (user, entry, ranking) -> ranking.add(entry),
+                                               (windowedKey, entry, ranking) -> ranking.add(entry),
                                                Materialized.as(storeSupplier))
                                .toStream()
                                .to(outputTopic);
@@ -56,7 +56,7 @@ public class Top10StreamProcessor
                return topology;
        }
 
-       ReadOnlyKeyValueStore<User, Ranking> getStore()
+       ReadOnlyKeyValueStore<WindowedKey, Ranking> getStore()
        {
                return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
        }
index 53c258d..9b77cac 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
@@ -8,7 +8,7 @@ import lombok.NoArgsConstructor;
 @AllArgsConstructor(staticName = "of")
 @NoArgsConstructor
 @Data
-public class User
+public class WindowedKey
 {
   String user;
 }
index d98ae64..ada66d3 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.in;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
@@ -8,14 +8,14 @@ import lombok.NoArgsConstructor;
 @Data
 @NoArgsConstructor
 @AllArgsConstructor(staticName = "of")
-public class TestCounter
+public class InputCounter
 {
   String user;
-  String word;
+  String key;
   long counter;
 
-  public static TestCounter of(TestWord word, long counter)
+  public static InputCounter of(InputWindowedKey word, long counter)
   {
-    return new TestCounter(word.getUser(), word.getWord(), counter);
+    return new InputCounter(word.getUser(), word.getKey(), counter);
   }
 }
index 8008e12..255d206 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.in;
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import lombok.AllArgsConstructor;
@@ -10,8 +10,8 @@ import lombok.NoArgsConstructor;
 @NoArgsConstructor
 @Data
 @JsonIgnoreProperties(ignoreUnknown = true)
-public class TestWord
+public class InputWindowedKey
 {
   private String user;
-  private String word;
+  private String key;
 }
index a5152e6..f7f5679 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.out;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
@@ -10,6 +10,6 @@ import lombok.NoArgsConstructor;
 @Data
 public class TestEntry
 {
-  String word;
+  String key;
   long counter;
 }
index efad48b..62ed323 100644 (file)
@@ -1,6 +1,5 @@
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.out;
 
-import de.juplo.kafka.wordcount.top10.Entry;
 import lombok.AccessLevel;
 import lombok.AllArgsConstructor;
 import lombok.Data;
index 53a5992..3affeab 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.query;
+package de.juplo.kafka.wordcount.out;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
index 26749e9..5599403 100644 (file)
@@ -1,4 +1,4 @@
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
 
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
@@ -108,7 +108,7 @@ public class RankingTest
             Stream.of(highestEntry),
             VALID_RANKINGS[0]
                 .stream()
-                .filter(entry -> !entry.getWord().equals(word)))
+                .filter(entry -> !entry.getKey().equals(word)))
         .toList();
     assertThat(ranking.getEntries()).containsExactlyElementsOf(expectedEntries);
   }
@@ -134,7 +134,7 @@ public class RankingTest
     Ranking ranking = Ranking.of(toArray(entryList));
     entryList.forEach(entry ->
       assertThatExceptionOfType(IllegalArgumentException.class)
-          .isThrownBy(() -> ranking.add(Entry.of(entry.getWord(), entry.getCounter() - 1))));
+          .isThrownBy(() -> ranking.add(Entry.of(entry.getKey(), entry.getCounter() - 1))));
   }
 
   @DisplayName("Identical rankings are considered equal")
index f5ef236..08d0eb4 100644 (file)
@@ -1,9 +1,9 @@
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
 
-import de.juplo.kafka.wordcount.counter.TestCounter;
-import de.juplo.kafka.wordcount.counter.TestWord;
-import de.juplo.kafka.wordcount.query.TestRanking;
-import de.juplo.kafka.wordcount.query.TestUser;
+import de.juplo.kafka.wordcount.in.InputCounter;
+import de.juplo.kafka.wordcount.in.InputWindowedKey;
+import de.juplo.kafka.wordcount.out.TestRanking;
+import de.juplo.kafka.wordcount.out.TestUser;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
@@ -28,7 +28,7 @@ import org.springframework.util.MultiValueMap;
 
 import java.time.Duration;
 
-import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.stats.StatsStreamProcessor.STORE_NAME;
 import static org.awaitility.Awaitility.await;
 
 
@@ -36,23 +36,23 @@ import static org.awaitility.Awaitility.await;
                properties = {
                                "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
                                "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
-                               "spring.kafka.producer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.counter.TestWord,counter:de.juplo.kafka.wordcount.counter.TestCounter",
+                               "spring.kafka.producer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.in.InputWindowedKey,counter:de.juplo.kafka.wordcount.in.InputCounter",
                                "spring.kafka.consumer.auto-offset-reset=earliest",
                                "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
                                "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
-                               "spring.kafka.consumer.properties.spring.json.type.mapping=user:de.juplo.kafka.wordcount.query.TestUser,ranking:de.juplo.kafka.wordcount.query.TestRanking",
+                               "spring.kafka.consumer.properties.spring.json.type.mapping=user:de.juplo.kafka.wordcount.out.TestUser,ranking:de.juplo.kafka.wordcount.out.TestRanking",
                                "logging.level.root=WARN",
                                "logging.level.de.juplo=DEBUG",
                                "logging.level.org.apache.kafka.clients=INFO",
                                "logging.level.org.apache.kafka.streams=INFO",
-                               "juplo.wordcount.top10.bootstrap-server=${spring.embedded.kafka.brokers}",
-                               "juplo.wordcount.top10.commit-interval=100",
-                               "juplo.wordcount.top10.cacheMaxBytes=0",
-                               "juplo.wordcount.top10.input-topic=" + Top10ApplicationIT.TOPIC_IN,
-                               "juplo.wordcount.top10.output-topic=" + Top10ApplicationIT.TOPIC_OUT })
-@EmbeddedKafka(topics = { Top10ApplicationIT.TOPIC_IN, Top10ApplicationIT.TOPIC_OUT })
+                               "juplo.wordcount.stats.bootstrap-server=${spring.embedded.kafka.brokers}",
+                               "juplo.wordcount.stats.commit-interval=100",
+                               "juplo.wordcount.stats.cacheMaxBytes=0",
+                               "juplo.wordcount.stats.input-topic=" + StatsApplicationIT.TOPIC_IN,
+                               "juplo.wordcount.stats.output-topic=" + StatsApplicationIT.TOPIC_OUT })
+@EmbeddedKafka(topics = { StatsApplicationIT.TOPIC_IN, StatsApplicationIT.TOPIC_OUT })
 @Slf4j
-public class Top10ApplicationIT
+public class StatsApplicationIT
 {
        public static final String TOPIC_IN = "in";
        public static final String TOPIC_OUT = "out";
@@ -60,12 +60,12 @@ public class Top10ApplicationIT
        @Autowired
        Consumer consumer;
        @Autowired
-       Top10StreamProcessor streamProcessor;
+       StatsStreamProcessor streamProcessor;
 
 
        @BeforeAll
        public static void testSendMessage(
-                       @Autowired KafkaTemplate<TestWord, TestCounter> kafkaTemplate)
+                       @Autowired KafkaTemplate<InputWindowedKey, InputCounter> kafkaTemplate)
        {
                TestData
                                .getInputMessages()
@@ -73,7 +73,7 @@ public class Top10ApplicationIT
                                {
                                        try
                                        {
-                                               SendResult<TestWord, TestCounter> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
+                                               SendResult<InputWindowedKey, InputCounter> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
                                                log.info(
                                                                "Sent: {}={}, partition={}, offset={}",
                                                                result.getProducerRecord().key(),
index 90d8e4c..dad85df 100644 (file)
@@ -1,9 +1,9 @@
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
 
-import de.juplo.kafka.wordcount.counter.TestCounter;
-import de.juplo.kafka.wordcount.counter.TestWord;
-import de.juplo.kafka.wordcount.query.TestRanking;
-import de.juplo.kafka.wordcount.query.TestUser;
+import de.juplo.kafka.wordcount.in.InputCounter;
+import de.juplo.kafka.wordcount.in.InputWindowedKey;
+import de.juplo.kafka.wordcount.out.TestRanking;
+import de.juplo.kafka.wordcount.out.TestUser;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TestOutputTopic;
@@ -21,11 +21,11 @@ import org.springframework.util.MultiValueMap;
 
 import java.util.Map;
 
-import static de.juplo.kafka.wordcount.top10.Top10ApplicationConfiguration.serializationConfig;
+import static de.juplo.kafka.wordcount.stats.StatsApplicationConfiguration.serializationConfig;
 
 
 @Slf4j
-public class Top10StreamProcessorTopologyTest
+public class StatsStreamProcessorTopologyTest
 {
   public static final String IN = "TEST-IN";
   public static final String OUT = "TEST-OUT";
@@ -33,14 +33,14 @@ public class Top10StreamProcessorTopologyTest
 
 
   TopologyTestDriver testDriver;
-  TestInputTopic<TestWord, TestCounter> in;
+  TestInputTopic<InputWindowedKey, InputCounter> in;
   TestOutputTopic<TestUser, TestRanking> out;
 
 
   @BeforeEach
   public void setUp()
   {
-    Topology topology = Top10StreamProcessor.buildTopology(
+    Topology topology = StatsStreamProcessor.buildTopology(
         IN,
         OUT,
         Stores.inMemoryKeyValueStore(STORE_NAME));
@@ -49,8 +49,8 @@ public class Top10StreamProcessorTopologyTest
 
     in = testDriver.createInputTopic(
         IN,
-        jsonSerializer(TestWord.class, true),
-        jsonSerializer(TestCounter.class,false));
+        jsonSerializer(InputWindowedKey.class, true),
+        jsonSerializer(InputCounter.class,false));
 
     out = testDriver.createOutputTopic(
         OUT,
@@ -81,7 +81,7 @@ public class Top10StreamProcessorTopologyTest
     TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages);
     TestData.assertExpectedLastMessagesForUsers(receivedMessages);
 
-    KeyValueStore<User, Ranking> store = testDriver.getKeyValueStore(STORE_NAME);
+    KeyValueStore<WindowedKey, Ranking> store = testDriver.getKeyValueStore(STORE_NAME);
     TestData.assertExpectedState(store);
   }
 
@@ -97,8 +97,8 @@ public class Top10StreamProcessorTopologyTest
     jsonSerializer.configure(
         Map.of(
             JsonSerializer.TYPE_MAPPINGS,
-            "word:" + TestWord.class.getName() + "," +
-            "counter:" + TestCounter.class.getName()),
+            "word:" + InputWindowedKey.class.getName() + "," +
+            "counter:" + InputCounter.class.getName()),
         isKey);
     return jsonSerializer;
   }
index 7a3a27e..c884b06 100644 (file)
@@ -1,10 +1,10 @@
-package de.juplo.kafka.wordcount.top10;
+package de.juplo.kafka.wordcount.stats;
 
-import de.juplo.kafka.wordcount.counter.TestCounter;
-import de.juplo.kafka.wordcount.counter.TestWord;
-import de.juplo.kafka.wordcount.query.TestEntry;
-import de.juplo.kafka.wordcount.query.TestRanking;
-import de.juplo.kafka.wordcount.query.TestUser;
+import de.juplo.kafka.wordcount.in.InputCounter;
+import de.juplo.kafka.wordcount.in.InputWindowedKey;
+import de.juplo.kafka.wordcount.out.TestEntry;
+import de.juplo.kafka.wordcount.out.TestRanking;
+import de.juplo.kafka.wordcount.out.TestUser;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.springframework.util.LinkedMultiValueMap;
@@ -21,46 +21,46 @@ class TestData
        static final TestUser PETER = TestUser.of("peter");
        static final TestUser KLAUS = TestUser.of("klaus");
 
-       static final Stream<KeyValue<TestWord, TestCounter>> getInputMessages()
+       static final Stream<KeyValue<InputWindowedKey, InputCounter>> getInputMessages()
        {
                return Stream.of(INPUT_MESSAGES);
        }
 
-       private static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
+       private static final KeyValue<InputWindowedKey, InputCounter>[] INPUT_MESSAGES = new KeyValue[]
        {
                        new KeyValue<>(
-                                       TestWord.of(PETER.getUser(),"Hallo"),
-                                       TestCounter.of(PETER.getUser(),"Hallo",1)),
+                                       InputWindowedKey.of(PETER.getUser(),"Hallo"),
+                                       InputCounter.of(PETER.getUser(),"Hallo",1)),
                        new KeyValue<>(
-                                       TestWord.of(KLAUS.getUser(),"Müsch"),
-                                       TestCounter.of(KLAUS.getUser(),"Müsch",1)),
+                                       InputWindowedKey.of(KLAUS.getUser(),"Müsch"),
+                                       InputCounter.of(KLAUS.getUser(),"Müsch",1)),
                        new KeyValue<>(
-                                       TestWord.of(PETER.getUser(),"Welt"),
-                                       TestCounter.of(PETER.getUser(),"Welt",1)),
+                                       InputWindowedKey.of(PETER.getUser(),"Welt"),
+                                       InputCounter.of(PETER.getUser(),"Welt",1)),
                        new KeyValue<>(
-                                       TestWord.of(KLAUS.getUser(),"Müsch"),
-                                       TestCounter.of(KLAUS.getUser(),"Müsch",2)),
+                                       InputWindowedKey.of(KLAUS.getUser(),"Müsch"),
+                                       InputCounter.of(KLAUS.getUser(),"Müsch",2)),
                        new KeyValue<>(
-                                       TestWord.of(KLAUS.getUser(),"s"),
-                                       TestCounter.of(KLAUS.getUser(),"s",1)),
+                                       InputWindowedKey.of(KLAUS.getUser(),"s"),
+                                       InputCounter.of(KLAUS.getUser(),"s",1)),
                        new KeyValue<>(
-                                       TestWord.of(PETER.getUser(),"Boäh"),
-                                       TestCounter.of(PETER.getUser(),"Boäh",1)),
+                                       InputWindowedKey.of(PETER.getUser(),"Boäh"),
+                                       InputCounter.of(PETER.getUser(),"Boäh",1)),
                        new KeyValue<>(
-                                       TestWord.of(PETER.getUser(),"Welt"),
-                                       TestCounter.of(PETER.getUser(),"Welt",2)),
+                                       InputWindowedKey.of(PETER.getUser(),"Welt"),
+                                       InputCounter.of(PETER.getUser(),"Welt",2)),
                        new KeyValue<>(
-                                       TestWord.of(PETER.getUser(),"Boäh"),
-                                       TestCounter.of(PETER.getUser(),"Boäh",2)),
+                                       InputWindowedKey.of(PETER.getUser(),"Boäh"),
+                                       InputCounter.of(PETER.getUser(),"Boäh",2)),
                        new KeyValue<>(
-                                       TestWord.of(KLAUS.getUser(),"s"),
-                                       TestCounter.of(KLAUS.getUser(),"s",2)),
+                                       InputWindowedKey.of(KLAUS.getUser(),"s"),
+                                       InputCounter.of(KLAUS.getUser(),"s",2)),
                        new KeyValue<>(
-                                       TestWord.of(PETER.getUser(),"Boäh"),
-                                       TestCounter.of(PETER.getUser(),"Boäh",3)),
+                                       InputWindowedKey.of(PETER.getUser(),"Boäh"),
+                                       InputCounter.of(PETER.getUser(),"Boäh",3)),
                        new KeyValue<>(
-                                       TestWord.of(KLAUS.getUser(),"s"),
-                                       TestCounter.of(KLAUS.getUser(),"s",3)),
+                                       InputWindowedKey.of(KLAUS.getUser(),"s"),
+                                       InputCounter.of(KLAUS.getUser(),"s",3)),
        };
 
        static void assertExpectedMessages(MultiValueMap<TestUser, TestRanking> receivedMessages)
@@ -71,15 +71,15 @@ class TestData
                                                                .containsExactlyElementsOf(rankings));
        }
 
-       static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
+       static void assertExpectedState(ReadOnlyKeyValueStore<WindowedKey, Ranking> store)
        {
                assertRankingEqualsRankingFromLastMessage(PETER, store.get(userOf(PETER)));
                assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(userOf(KLAUS)));
        }
 
-       private static User userOf(TestUser user)
+       private static WindowedKey userOf(TestUser user)
        {
-               return User.of(user.getUser());
+               return WindowedKey.of(user.getUser());
        }
 
        static void assertExpectedNumberOfMessagesForUsers(MultiValueMap<TestUser, TestRanking> receivedMessages)
@@ -113,7 +113,7 @@ class TestData
                return Arrays
                                .stream(entries)
                                .map(entry -> TestEntry.of(
-                                               entry.getWord(),
+                                               entry.getKey(),
                                                entry.getCounter() == null
                                                                ? -1l
                                                                : entry.getCounter()))