popular: 1.0.0 - Renamed packages and classes -- MOVE
authorKai Moritz <kai@juplo.de>
Sat, 15 Jun 2024 19:24:20 +0000 (21:24 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 15 Jun 2024 19:24:20 +0000 (21:24 +0200)
24 files changed:
src/main/java/de/juplo/kafka/wordcount/popular/Entry.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/popular/Key.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/popular/PopularApplication.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguration.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationProperties.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/popular/Ranking.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/popular/User.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/top10/Entry.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/top10/Key.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/top10/User.java [deleted file]
src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/popular/RankingTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/popular/TestData.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/top10/RankingTest.java [deleted file]
src/test/java/de/juplo/kafka/wordcount/top10/TestData.java [deleted file]
src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java [deleted file]
src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java [deleted file]

diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/Entry.java b/src/main/java/de/juplo/kafka/wordcount/popular/Entry.java
new file mode 100644 (file)
index 0000000..b25fc07
--- /dev/null
@@ -0,0 +1,20 @@
+package de.juplo.kafka.wordcount.top10;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@NoArgsConstructor
+@AllArgsConstructor(
+    staticName = "of",
+    access = AccessLevel.PACKAGE)
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Entry
+{
+  private String word;
+  private Long counter;
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/Key.java b/src/main/java/de/juplo/kafka/wordcount/popular/Key.java
new file mode 100644 (file)
index 0000000..ffac8ea
--- /dev/null
@@ -0,0 +1,17 @@
+package de.juplo.kafka.wordcount.top10;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.*;
+
+
+@NoArgsConstructor
+@AllArgsConstructor(
+    staticName = "of",
+    access = AccessLevel.PACKAGE)
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Key
+{
+  private String user;
+  private String word;
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplication.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplication.java
new file mode 100644 (file)
index 0000000..5c14ae7
--- /dev/null
@@ -0,0 +1,14 @@
+package de.juplo.kafka.wordcount.top10;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+
+@SpringBootApplication
+public class Top10Application
+{
+       public static void main(String[] args)
+       {
+               SpringApplication.run(Top10Application.class, args);
+       }
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguration.java
new file mode 100644 (file)
index 0000000..255f0e4
--- /dev/null
@@ -0,0 +1,98 @@
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerde;
+
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+
+import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME;
+import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+
+
+@Configuration
+@EnableConfigurationProperties(Top10ApplicationProperties.class)
+@Slf4j
+public class Top10ApplicationConfiguration
+{
+       @Bean
+       public Properties streamProcessorProperties(Top10ApplicationProperties properties)
+       {
+               Properties props = new Properties();
+
+               props.putAll(serializationConfig());
+
+               props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
+               props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+
+               if (properties.getCommitInterval() != null)
+                       props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval());
+               if (properties.getCacheMaxBytes() != null)
+                       props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, properties.getCacheMaxBytes());
+
+               props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+               return props;
+       }
+
+       static Properties serializationConfig()
+       {
+               Properties props = new Properties();
+
+               props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
+               props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
+               props.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName());
+               props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName());
+               props.put(
+                               JsonDeserializer.TYPE_MAPPINGS,
+                               "word:" + Key.class.getName() + "," +
+                               "counter:" + Entry.class.getName() + "," +
+                               "user:" + User.class.getName() + "," +
+                               "ranking:" + Ranking.class.getName());
+
+               return props;
+       }
+
+       @Bean(initMethod = "start", destroyMethod = "stop")
+       public Top10StreamProcessor streamProcessor(
+                       Top10ApplicationProperties applicationProperties,
+                       Properties streamProcessorProperties,
+                       KeyValueBytesStoreSupplier storeSupplier,
+                       ConfigurableApplicationContext context)
+       {
+               Top10StreamProcessor streamProcessor = new Top10StreamProcessor(
+                               applicationProperties.getInputTopic(),
+                               applicationProperties.getOutputTopic(),
+                               streamProcessorProperties,
+                               storeSupplier);
+
+               streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
+               {
+                       log.error("Unexpected error!", e);
+                       CompletableFuture.runAsync(() ->
+                       {
+                               log.info("Stopping application...");
+                               SpringApplication.exit(context, () -> 1);
+                       });
+                       return SHUTDOWN_CLIENT;
+               });
+
+               return streamProcessor;
+       }
+
+       @Bean
+       public KeyValueBytesStoreSupplier storeSupplier()
+       {
+               return Stores.persistentKeyValueStore(STORE_NAME);
+       }
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationProperties.java
new file mode 100644 (file)
index 0000000..d3bb236
--- /dev/null
@@ -0,0 +1,22 @@
+package de.juplo.kafka.wordcount.top10;
+
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+
+@ConfigurationProperties("juplo.wordcount.top10")
+@Getter
+@Setter
+@ToString
+public class Top10ApplicationProperties
+{
+  private String bootstrapServer = "localhost:9092";
+  private String applicationId = "top10";
+  private String inputTopic = "countings";
+  private String outputTopic = "top10";
+  private Integer commitInterval;
+  private Integer cacheMaxBytes;
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java
new file mode 100644 (file)
index 0000000..70ead87
--- /dev/null
@@ -0,0 +1,75 @@
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+
+import java.util.Properties;
+
+
+@Slf4j
+public class Top10StreamProcessor
+{
+       public static final String STORE_NAME= "top10";
+
+       public final KafkaStreams streams;
+
+
+       public Top10StreamProcessor(
+                       String inputTopic,
+                       String outputTopic,
+                       Properties props,
+                       KeyValueBytesStoreSupplier storeSupplier)
+       {
+               Topology topology = Top10StreamProcessor.buildTopology(
+                               inputTopic,
+                               outputTopic,
+                               storeSupplier);
+
+               streams = new KafkaStreams(topology, props);
+       }
+
+       static Topology buildTopology(
+                       String inputTopic,
+                       String outputTopic,
+                       KeyValueBytesStoreSupplier storeSupplier)
+       {
+               StreamsBuilder builder = new StreamsBuilder();
+
+               builder
+                               .<Key, Entry>stream(inputTopic)
+                               .map((key, entry) -> new KeyValue<>(User.of(key.getUser()), entry))
+                               .groupByKey()
+                               .aggregate(
+                                               () -> new Ranking(),
+                                               (user, entry, ranking) -> ranking.add(entry),
+                                               Materialized.as(storeSupplier))
+                               .toStream()
+                               .to(outputTopic);
+
+               Topology topology = builder.build();
+               log.info("\n\n{}", topology.describe());
+
+               return topology;
+       }
+
+       ReadOnlyKeyValueStore<User, Ranking> getStore()
+       {
+               return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
+       }
+
+       public void start()
+       {
+               log.info("Starting Stream-Processor");
+               streams.start();
+       }
+
+       public void stop()
+       {
+               log.info("Stopping Stream-Processor");
+               streams.close();
+       }
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/Ranking.java b/src/main/java/de/juplo/kafka/wordcount/popular/Ranking.java
new file mode 100644 (file)
index 0000000..4f56c18
--- /dev/null
@@ -0,0 +1,159 @@
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.*;
+
+import java.util.*;
+
+
+@AllArgsConstructor(access = AccessLevel.PRIVATE)
+@NoArgsConstructor
+@Data
+public class Ranking
+{
+  public final static int MAX_ENTRIES = 10;
+
+
+  private Entry[] entries = new Entry[0];
+
+  public Ranking add(Entry newEntry)
+  {
+    if (entries.length == 0)
+    {
+      entries = new Entry[1];
+      entries[0] = newEntry;
+      return this;
+    }
+
+    List<Entry> list = new LinkedList<>(Arrays.asList(entries));
+    int oldPosition = -1;
+    for (int i = 0; i < list.size(); i++)
+    {
+      Entry entry = list.get(i);
+
+      if (entry.getCounter() < newEntry.getCounter())
+      {
+        if (oldPosition > -1)
+        {
+          if (list.get(oldPosition).getCounter() > newEntry.getCounter())
+          {
+            throw new IllegalArgumentException("The ranking already contains an entry with a higher counting for " + newEntry);
+          }
+          else
+          {
+            // Entry for word already exists with the same counting! Nothing changed...
+            return this;
+          }
+        }
+
+        list.add(i, newEntry);
+        for (int j = i+1; j < list.size(); j++)
+        {
+          entry = list.get(j);
+          if(entry.getWord().equals(newEntry.getWord()))
+          {
+            list.remove(j);
+            break;
+          }
+        }
+        if (list.size() > MAX_ENTRIES)
+        {
+          list = list.subList(0, MAX_ENTRIES);
+        }
+        entries = list.toArray(num -> new Entry[num]);
+        return this;
+      }
+
+      if (entry.getWord().equals(newEntry.getWord()))
+        oldPosition = i;
+    }
+
+    if (oldPosition > -1 && list.get(oldPosition).getCounter() > newEntry.getCounter())
+    {
+      throw new IllegalArgumentException("The ranking already contains an entry with a higher counting for " + newEntry);
+    }
+
+    if (list.size() < MAX_ENTRIES)
+    {
+      list.add(newEntry);
+      entries = list.toArray(num -> new Entry[num]);
+    }
+
+    return this;
+  }
+
+  public Ranking validate() throws IllegalArgumentException
+  {
+    if (this.entries.length > MAX_ENTRIES)
+      throw new IllegalArgumentException("Invalid Ranking: a valid ranking cannot have more entries than " + MAX_ENTRIES );
+
+    Set<String> seenWords = new HashSet<>();
+    long lowesCounting = Long.MAX_VALUE;
+
+    for (int i=0; i<this.entries.length; i++)
+    {
+      Entry entry = this.entries[i];
+
+      if (seenWords.contains(entry.getWord()))
+        throw new IllegalArgumentException("Invalid Ranking: Multiple occurrences of word -> " + entry.getWord());
+      if (entry.getCounter() > lowesCounting)
+        throw new IllegalArgumentException("Invalid Ranking: Entries are not sorted correctly");
+
+      seenWords.add(entry.getWord());
+      lowesCounting = entry.getCounter();
+    }
+
+    return this;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o)
+      return true;
+    if (o == null)
+      return false;
+    if (!(o instanceof Ranking))
+      return false;
+
+    Ranking other = (Ranking)o;
+
+    if (other.entries.length != entries.length)
+      return false;
+
+    if (entries.length == 0)
+      return true;
+
+    int i = 0;
+    Set<String> myWordsWithCurrentCount = new HashSet<>();
+    Set<String> otherWordsWithCurrentCount = new HashSet<>();
+    Entry myEntry = entries[i];
+    long currentCount = myEntry.getCounter();
+    myWordsWithCurrentCount.add(myEntry.getWord());
+    while (true)
+    {
+      Entry otherEntry = other.entries[i];
+      if (otherEntry.getCounter() != currentCount)
+        return false;
+      otherWordsWithCurrentCount.add(otherEntry.getWord());
+      if (++i >= entries.length)
+        return myWordsWithCurrentCount.equals(otherWordsWithCurrentCount);
+      myEntry = entries[i];
+      if (myEntry.getCounter() != currentCount)
+      {
+        if (!myWordsWithCurrentCount.equals(otherWordsWithCurrentCount))
+          return false;
+        currentCount = myEntry.getCounter();
+        myWordsWithCurrentCount.clear();
+        otherWordsWithCurrentCount.clear();
+      }
+      myWordsWithCurrentCount.add(myEntry.getWord());
+    }
+  }
+
+  public static Ranking of(Entry... entries)
+  {
+    Ranking ranking = new Ranking(entries);
+    ranking.validate();
+    return ranking;
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/User.java b/src/main/java/de/juplo/kafka/wordcount/popular/User.java
new file mode 100644 (file)
index 0000000..53c258d
--- /dev/null
@@ -0,0 +1,14 @@
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@AllArgsConstructor(staticName = "of")
+@NoArgsConstructor
+@Data
+public class User
+{
+  String user;
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java b/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java
deleted file mode 100644 (file)
index b25fc07..0000000
+++ /dev/null
@@ -1,20 +0,0 @@
-package de.juplo.kafka.wordcount.top10;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import lombok.AccessLevel;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@NoArgsConstructor
-@AllArgsConstructor(
-    staticName = "of",
-    access = AccessLevel.PACKAGE)
-@Data
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class Entry
-{
-  private String word;
-  private Long counter;
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Key.java b/src/main/java/de/juplo/kafka/wordcount/top10/Key.java
deleted file mode 100644 (file)
index ffac8ea..0000000
+++ /dev/null
@@ -1,17 +0,0 @@
-package de.juplo.kafka.wordcount.top10;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import lombok.*;
-
-
-@NoArgsConstructor
-@AllArgsConstructor(
-    staticName = "of",
-    access = AccessLevel.PACKAGE)
-@Data
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class Key
-{
-  private String user;
-  private String word;
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java b/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java
deleted file mode 100644 (file)
index 4f56c18..0000000
+++ /dev/null
@@ -1,159 +0,0 @@
-package de.juplo.kafka.wordcount.top10;
-
-import lombok.*;
-
-import java.util.*;
-
-
-@AllArgsConstructor(access = AccessLevel.PRIVATE)
-@NoArgsConstructor
-@Data
-public class Ranking
-{
-  public final static int MAX_ENTRIES = 10;
-
-
-  private Entry[] entries = new Entry[0];
-
-  public Ranking add(Entry newEntry)
-  {
-    if (entries.length == 0)
-    {
-      entries = new Entry[1];
-      entries[0] = newEntry;
-      return this;
-    }
-
-    List<Entry> list = new LinkedList<>(Arrays.asList(entries));
-    int oldPosition = -1;
-    for (int i = 0; i < list.size(); i++)
-    {
-      Entry entry = list.get(i);
-
-      if (entry.getCounter() < newEntry.getCounter())
-      {
-        if (oldPosition > -1)
-        {
-          if (list.get(oldPosition).getCounter() > newEntry.getCounter())
-          {
-            throw new IllegalArgumentException("The ranking already contains an entry with a higher counting for " + newEntry);
-          }
-          else
-          {
-            // Entry for word already exists with the same counting! Nothing changed...
-            return this;
-          }
-        }
-
-        list.add(i, newEntry);
-        for (int j = i+1; j < list.size(); j++)
-        {
-          entry = list.get(j);
-          if(entry.getWord().equals(newEntry.getWord()))
-          {
-            list.remove(j);
-            break;
-          }
-        }
-        if (list.size() > MAX_ENTRIES)
-        {
-          list = list.subList(0, MAX_ENTRIES);
-        }
-        entries = list.toArray(num -> new Entry[num]);
-        return this;
-      }
-
-      if (entry.getWord().equals(newEntry.getWord()))
-        oldPosition = i;
-    }
-
-    if (oldPosition > -1 && list.get(oldPosition).getCounter() > newEntry.getCounter())
-    {
-      throw new IllegalArgumentException("The ranking already contains an entry with a higher counting for " + newEntry);
-    }
-
-    if (list.size() < MAX_ENTRIES)
-    {
-      list.add(newEntry);
-      entries = list.toArray(num -> new Entry[num]);
-    }
-
-    return this;
-  }
-
-  public Ranking validate() throws IllegalArgumentException
-  {
-    if (this.entries.length > MAX_ENTRIES)
-      throw new IllegalArgumentException("Invalid Ranking: a valid ranking cannot have more entries than " + MAX_ENTRIES );
-
-    Set<String> seenWords = new HashSet<>();
-    long lowesCounting = Long.MAX_VALUE;
-
-    for (int i=0; i<this.entries.length; i++)
-    {
-      Entry entry = this.entries[i];
-
-      if (seenWords.contains(entry.getWord()))
-        throw new IllegalArgumentException("Invalid Ranking: Multiple occurrences of word -> " + entry.getWord());
-      if (entry.getCounter() > lowesCounting)
-        throw new IllegalArgumentException("Invalid Ranking: Entries are not sorted correctly");
-
-      seenWords.add(entry.getWord());
-      lowesCounting = entry.getCounter();
-    }
-
-    return this;
-  }
-
-  @Override
-  public boolean equals(Object o)
-  {
-    if (this == o)
-      return true;
-    if (o == null)
-      return false;
-    if (!(o instanceof Ranking))
-      return false;
-
-    Ranking other = (Ranking)o;
-
-    if (other.entries.length != entries.length)
-      return false;
-
-    if (entries.length == 0)
-      return true;
-
-    int i = 0;
-    Set<String> myWordsWithCurrentCount = new HashSet<>();
-    Set<String> otherWordsWithCurrentCount = new HashSet<>();
-    Entry myEntry = entries[i];
-    long currentCount = myEntry.getCounter();
-    myWordsWithCurrentCount.add(myEntry.getWord());
-    while (true)
-    {
-      Entry otherEntry = other.entries[i];
-      if (otherEntry.getCounter() != currentCount)
-        return false;
-      otherWordsWithCurrentCount.add(otherEntry.getWord());
-      if (++i >= entries.length)
-        return myWordsWithCurrentCount.equals(otherWordsWithCurrentCount);
-      myEntry = entries[i];
-      if (myEntry.getCounter() != currentCount)
-      {
-        if (!myWordsWithCurrentCount.equals(otherWordsWithCurrentCount))
-          return false;
-        currentCount = myEntry.getCounter();
-        myWordsWithCurrentCount.clear();
-        otherWordsWithCurrentCount.clear();
-      }
-      myWordsWithCurrentCount.add(myEntry.getWord());
-    }
-  }
-
-  public static Ranking of(Entry... entries)
-  {
-    Ranking ranking = new Ranking(entries);
-    ranking.validate();
-    return ranking;
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java
deleted file mode 100644 (file)
index 5c14ae7..0000000
+++ /dev/null
@@ -1,14 +0,0 @@
-package de.juplo.kafka.wordcount.top10;
-
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-
-
-@SpringBootApplication
-public class Top10Application
-{
-       public static void main(String[] args)
-       {
-               SpringApplication.run(Top10Application.class, args);
-       }
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java
deleted file mode 100644 (file)
index 255f0e4..0000000
+++ /dev/null
@@ -1,98 +0,0 @@
-package de.juplo.kafka.wordcount.top10;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.Stores;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.support.serializer.JsonDeserializer;
-import org.springframework.kafka.support.serializer.JsonSerde;
-
-import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
-
-import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME;
-import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
-
-
-@Configuration
-@EnableConfigurationProperties(Top10ApplicationProperties.class)
-@Slf4j
-public class Top10ApplicationConfiguration
-{
-       @Bean
-       public Properties streamProcessorProperties(Top10ApplicationProperties properties)
-       {
-               Properties props = new Properties();
-
-               props.putAll(serializationConfig());
-
-               props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
-               props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
-
-               if (properties.getCommitInterval() != null)
-                       props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval());
-               if (properties.getCacheMaxBytes() != null)
-                       props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, properties.getCacheMaxBytes());
-
-               props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-               return props;
-       }
-
-       static Properties serializationConfig()
-       {
-               Properties props = new Properties();
-
-               props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
-               props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
-               props.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName());
-               props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName());
-               props.put(
-                               JsonDeserializer.TYPE_MAPPINGS,
-                               "word:" + Key.class.getName() + "," +
-                               "counter:" + Entry.class.getName() + "," +
-                               "user:" + User.class.getName() + "," +
-                               "ranking:" + Ranking.class.getName());
-
-               return props;
-       }
-
-       @Bean(initMethod = "start", destroyMethod = "stop")
-       public Top10StreamProcessor streamProcessor(
-                       Top10ApplicationProperties applicationProperties,
-                       Properties streamProcessorProperties,
-                       KeyValueBytesStoreSupplier storeSupplier,
-                       ConfigurableApplicationContext context)
-       {
-               Top10StreamProcessor streamProcessor = new Top10StreamProcessor(
-                               applicationProperties.getInputTopic(),
-                               applicationProperties.getOutputTopic(),
-                               streamProcessorProperties,
-                               storeSupplier);
-
-               streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
-               {
-                       log.error("Unexpected error!", e);
-                       CompletableFuture.runAsync(() ->
-                       {
-                               log.info("Stopping application...");
-                               SpringApplication.exit(context, () -> 1);
-                       });
-                       return SHUTDOWN_CLIENT;
-               });
-
-               return streamProcessor;
-       }
-
-       @Bean
-       public KeyValueBytesStoreSupplier storeSupplier()
-       {
-               return Stores.persistentKeyValueStore(STORE_NAME);
-       }
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java
deleted file mode 100644 (file)
index d3bb236..0000000
+++ /dev/null
@@ -1,22 +0,0 @@
-package de.juplo.kafka.wordcount.top10;
-
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-
-
-@ConfigurationProperties("juplo.wordcount.top10")
-@Getter
-@Setter
-@ToString
-public class Top10ApplicationProperties
-{
-  private String bootstrapServer = "localhost:9092";
-  private String applicationId = "top10";
-  private String inputTopic = "countings";
-  private String outputTopic = "top10";
-  private Integer commitInterval;
-  private Integer cacheMaxBytes;
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java
deleted file mode 100644 (file)
index 70ead87..0000000
+++ /dev/null
@@ -1,75 +0,0 @@
-package de.juplo.kafka.wordcount.top10;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.QueryableStoreTypes;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
-
-import java.util.Properties;
-
-
-@Slf4j
-public class Top10StreamProcessor
-{
-       public static final String STORE_NAME= "top10";
-
-       public final KafkaStreams streams;
-
-
-       public Top10StreamProcessor(
-                       String inputTopic,
-                       String outputTopic,
-                       Properties props,
-                       KeyValueBytesStoreSupplier storeSupplier)
-       {
-               Topology topology = Top10StreamProcessor.buildTopology(
-                               inputTopic,
-                               outputTopic,
-                               storeSupplier);
-
-               streams = new KafkaStreams(topology, props);
-       }
-
-       static Topology buildTopology(
-                       String inputTopic,
-                       String outputTopic,
-                       KeyValueBytesStoreSupplier storeSupplier)
-       {
-               StreamsBuilder builder = new StreamsBuilder();
-
-               builder
-                               .<Key, Entry>stream(inputTopic)
-                               .map((key, entry) -> new KeyValue<>(User.of(key.getUser()), entry))
-                               .groupByKey()
-                               .aggregate(
-                                               () -> new Ranking(),
-                                               (user, entry, ranking) -> ranking.add(entry),
-                                               Materialized.as(storeSupplier))
-                               .toStream()
-                               .to(outputTopic);
-
-               Topology topology = builder.build();
-               log.info("\n\n{}", topology.describe());
-
-               return topology;
-       }
-
-       ReadOnlyKeyValueStore<User, Ranking> getStore()
-       {
-               return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
-       }
-
-       public void start()
-       {
-               log.info("Starting Stream-Processor");
-               streams.start();
-       }
-
-       public void stop()
-       {
-               log.info("Stopping Stream-Processor");
-               streams.close();
-       }
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/User.java b/src/main/java/de/juplo/kafka/wordcount/top10/User.java
deleted file mode 100644 (file)
index 53c258d..0000000
+++ /dev/null
@@ -1,14 +0,0 @@
-package de.juplo.kafka.wordcount.top10;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@AllArgsConstructor(staticName = "of")
-@NoArgsConstructor
-@Data
-public class User
-{
-  String user;
-}
diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java
new file mode 100644 (file)
index 0000000..f5ef236
--- /dev/null
@@ -0,0 +1,168 @@
+package de.juplo.kafka.wordcount.top10;
+
+import de.juplo.kafka.wordcount.counter.TestCounter;
+import de.juplo.kafka.wordcount.counter.TestWord;
+import de.juplo.kafka.wordcount.query.TestRanking;
+import de.juplo.kafka.wordcount.query.TestUser;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Primary;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import java.time.Duration;
+
+import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME;
+import static org.awaitility.Awaitility.await;
+
+
+@SpringBootTest(
+               properties = {
+                               "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
+                               "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
+                               "spring.kafka.producer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.counter.TestWord,counter:de.juplo.kafka.wordcount.counter.TestCounter",
+                               "spring.kafka.consumer.auto-offset-reset=earliest",
+                               "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
+                               "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
+                               "spring.kafka.consumer.properties.spring.json.type.mapping=user:de.juplo.kafka.wordcount.query.TestUser,ranking:de.juplo.kafka.wordcount.query.TestRanking",
+                               "logging.level.root=WARN",
+                               "logging.level.de.juplo=DEBUG",
+                               "logging.level.org.apache.kafka.clients=INFO",
+                               "logging.level.org.apache.kafka.streams=INFO",
+                               "juplo.wordcount.top10.bootstrap-server=${spring.embedded.kafka.brokers}",
+                               "juplo.wordcount.top10.commit-interval=100",
+                               "juplo.wordcount.top10.cacheMaxBytes=0",
+                               "juplo.wordcount.top10.input-topic=" + Top10ApplicationIT.TOPIC_IN,
+                               "juplo.wordcount.top10.output-topic=" + Top10ApplicationIT.TOPIC_OUT })
+@EmbeddedKafka(topics = { Top10ApplicationIT.TOPIC_IN, Top10ApplicationIT.TOPIC_OUT })
+@Slf4j
+public class Top10ApplicationIT
+{
+       public static final String TOPIC_IN = "in";
+       public static final String TOPIC_OUT = "out";
+
+       @Autowired
+       Consumer consumer;
+       @Autowired
+       Top10StreamProcessor streamProcessor;
+
+
+       @BeforeAll
+       public static void testSendMessage(
+                       @Autowired KafkaTemplate<TestWord, TestCounter> kafkaTemplate)
+       {
+               TestData
+                               .getInputMessages()
+                               .forEach(kv ->
+                               {
+                                       try
+                                       {
+                                               SendResult<TestWord, TestCounter> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
+                                               log.info(
+                                                               "Sent: {}={}, partition={}, offset={}",
+                                                               result.getProducerRecord().key(),
+                                                               result.getProducerRecord().value(),
+                                                               result.getRecordMetadata().partition(),
+                                                               result.getRecordMetadata().offset());
+                                       }
+                                       catch (Exception e)
+                                       {
+                                               throw new RuntimeException(e);
+                                       }
+                               });
+       }
+
+       @DisplayName("Await the expected state in the state-store")
+       @Test
+       public void testAwaitExpectedState()
+       {
+               await("Expected state")
+                               .atMost(Duration.ofSeconds(5))
+                               .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
+       }
+
+       @DisplayName("Await the expected output messages")
+       @Test
+       @Disabled
+       public void testAwaitExpectedMessages()
+       {
+               await("Expected messages")
+                               .atMost(Duration.ofSeconds(5))
+                               .untilAsserted(() -> consumer.enforceAssertion(
+                                               receivedMessages -> TestData.assertExpectedMessages(receivedMessages)));
+       }
+
+       @DisplayName("Await the expected number of messages")
+       @Test
+       public void testAwaitExpectedNumberOfMessagesForUsers()
+       {
+               await("Expected number of messages")
+                               .atMost(Duration.ofSeconds(5))
+                               .untilAsserted(() -> consumer.enforceAssertion(
+                                               receivedMessages -> TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages)));
+       }
+
+       @DisplayName("Await the expected final output messages")
+       @Test
+       public void testAwaitExpectedLastMessagesForUsers()
+       {
+               await("Expected final output messages")
+                               .atMost(Duration.ofSeconds(5))
+                               .untilAsserted(() -> consumer.enforceAssertion(
+                                               receivedMessages -> TestData.assertExpectedLastMessagesForUsers(receivedMessages)));
+       }
+
+
+       static class Consumer
+       {
+               private final MultiValueMap<TestUser, TestRanking> received = new LinkedMultiValueMap<>();
+
+               @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
+               public synchronized void receive(
+                               @Header(KafkaHeaders.RECEIVED_KEY) TestUser user,
+                               @Payload TestRanking ranking)
+               {
+                       log.debug("Received message: {} -> {}", user, ranking);
+                       received.add(user, ranking);
+               }
+
+               synchronized void enforceAssertion(
+                               java.util.function.Consumer<MultiValueMap<TestUser, TestRanking>> assertion)
+               {
+                       assertion.accept(received);
+               }
+       }
+
+       @TestConfiguration
+       static class Configuration
+       {
+               @Bean
+               Consumer consumer()
+               {
+                       return new Consumer();
+               }
+
+               @Primary
+               @Bean
+               KeyValueBytesStoreSupplier inMemoryStoreSupplier()
+               {
+                       return Stores.inMemoryKeyValueStore(STORE_NAME);
+               }
+       }
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java
new file mode 100644 (file)
index 0000000..90d8e4c
--- /dev/null
@@ -0,0 +1,105 @@
+package de.juplo.kafka.wordcount.top10;
+
+import de.juplo.kafka.wordcount.counter.TestCounter;
+import de.juplo.kafka.wordcount.counter.TestWord;
+import de.juplo.kafka.wordcount.query.TestRanking;
+import de.juplo.kafka.wordcount.query.TestUser;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import java.util.Map;
+
+import static de.juplo.kafka.wordcount.top10.Top10ApplicationConfiguration.serializationConfig;
+
+
+@Slf4j
+public class Top10StreamProcessorTopologyTest
+{
+  public static final String IN = "TEST-IN";
+  public static final String OUT = "TEST-OUT";
+  public static final String STORE_NAME = "TOPOLOGY-TEST";
+
+
+  TopologyTestDriver testDriver;
+  TestInputTopic<TestWord, TestCounter> in;
+  TestOutputTopic<TestUser, TestRanking> out;
+
+
+  @BeforeEach
+  public void setUp()
+  {
+    Topology topology = Top10StreamProcessor.buildTopology(
+        IN,
+        OUT,
+        Stores.inMemoryKeyValueStore(STORE_NAME));
+
+    testDriver = new TopologyTestDriver(topology, serializationConfig());
+
+    in = testDriver.createInputTopic(
+        IN,
+        jsonSerializer(TestWord.class, true),
+        jsonSerializer(TestCounter.class,false));
+
+    out = testDriver.createOutputTopic(
+        OUT,
+        new JsonDeserializer()
+            .copyWithType(TestUser.class)
+            .ignoreTypeHeaders(),
+        new JsonDeserializer()
+            .copyWithType(TestRanking.class)
+            .ignoreTypeHeaders());
+
+  }
+
+
+  @Test
+  public void test()
+  {
+    TestData
+        .getInputMessages()
+        .forEach(kv -> in.pipeInput(kv.key, kv.value));
+
+    MultiValueMap<TestUser, TestRanking> receivedMessages = new LinkedMultiValueMap<>();
+    out
+        .readRecordsToList()
+        .forEach(record -> receivedMessages.add(record.key(), record.value()));
+
+    TestData.assertExpectedMessages(receivedMessages);
+
+    TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages);
+    TestData.assertExpectedLastMessagesForUsers(receivedMessages);
+
+    KeyValueStore<User, Ranking> store = testDriver.getKeyValueStore(STORE_NAME);
+    TestData.assertExpectedState(store);
+  }
+
+  @AfterEach
+  public void tearDown()
+  {
+    testDriver.close();
+  }
+
+  private <T> JsonSerializer<T> jsonSerializer(Class<T> type, boolean isKey)
+  {
+    JsonSerializer<T> jsonSerializer = new JsonSerializer<>();
+    jsonSerializer.configure(
+        Map.of(
+            JsonSerializer.TYPE_MAPPINGS,
+            "word:" + TestWord.class.getName() + "," +
+            "counter:" + TestCounter.class.getName()),
+        isKey);
+    return jsonSerializer;
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/RankingTest.java b/src/test/java/de/juplo/kafka/wordcount/popular/RankingTest.java
new file mode 100644 (file)
index 0000000..26749e9
--- /dev/null
@@ -0,0 +1,276 @@
+package de.juplo.kafka.wordcount.top10;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
+
+
+public class RankingTest
+{
+  @DisplayName("A newly created instance is empty")
+  @Test
+  public void testNewRankingIsEmpty()
+  {
+    Ranking ranking = new Ranking();
+    assertThat(ranking.getEntries()).isEmpty();
+  }
+
+  @DisplayName("An instance that was build from an empty ranking is empty")
+  @Test
+  public void testRankingOfYieldsExpectedResultForEmptyList()
+  {
+    Ranking ranking = new Ranking();
+    assertThat(ranking.getEntries()).isEmpty();
+  }
+
+  @DisplayName("An instance that was build from a valid ranking contains the expected entries")
+  @ParameterizedTest
+  @MethodSource("validRankingsProvider")
+  public void testRankingOfYieldsExpectedResultsForValidRankings(List<Entry> entryList)
+  {
+    Ranking ranking = Ranking.of(toArray(entryList));
+    assertThat(ranking.getEntries()).containsExactlyElementsOf(entryList);
+  }
+
+  @DisplayName("The builder fails for invalid rankings")
+  @ParameterizedTest
+  @MethodSource("invalidRankingsProvider")
+  public void testRankingOfThrowsExceptionForInvalidRankings(List<Entry> entryList)
+  {
+    assertThatExceptionOfType(IllegalArgumentException.class)
+        .isThrownBy(() -> Ranking.of(toArray(entryList)));
+  }
+
+  @DisplayName("Adding a new word with highest ranking, pushes all other words down")
+  @ParameterizedTest
+  @MethodSource("validRankingsProvider")
+  public void testAddingNewWordWithHighestRanking(List<Entry> entryList)
+  {
+    Ranking ranking = Ranking.of(toArray(entryList));
+    Entry newEntry = Entry.of("NEW!", rankingForPosition(-1));
+    ranking.add(newEntry);
+    assertThat(ranking.getEntries()[0]).isEqualTo(newEntry);
+    for (int i = 0; i < entryList.size() && i < Ranking.MAX_ENTRIES - 1; i++)
+    {
+      assertThat(ranking.getEntries()[i + 1]).isEqualTo(entryList.get(i));
+    }
+  }
+
+  @DisplayName("Adding a new word with an existent ranking, pushes all words with lower ranking down")
+  @ParameterizedTest
+  @MethodSource("validRankingsProvider")
+  public void testAddingNewWordWithExistingRanking(List<Entry> entryList)
+  {
+    for (int position = 0; position < entryList.size(); position++ )
+    {
+      Ranking ranking = Ranking.of(toArray(entryList));
+      Entry newEntry = Entry.of("NEW!", rankingForPosition(position));
+      ranking.add(newEntry);
+      for (int i = 0; i < entryList.size() && i < Ranking.MAX_ENTRIES - 1; i++)
+      {
+        if (i < position)
+        {
+          assertThat(ranking.getEntries()[i]).isEqualTo(entryList.get(i));
+        }
+        if (i == position)
+        {
+          assertThat(ranking.getEntries()[i]).isEqualTo(entryList.get(i));
+          assertThat(ranking.getEntries()[i + 1]).isEqualTo(newEntry);
+        }
+        if (i > position)
+        {
+          assertThat(ranking.getEntries()[i + 1]).isEqualTo(entryList.get(i));
+        }
+      }
+    }
+  }
+
+  @DisplayName("Adding a highest ranking for an existing word shifts it to the first place")
+  @ParameterizedTest
+  @ValueSource(ints = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 })
+  public void testAddingExistingWordWithHighestRanking(int position)
+  {
+    Ranking ranking = Ranking.of(toArray(VALID_RANKINGS[0]));
+    String word = wordForPosition(position);
+    Entry highestEntry = Entry.of(word, 100l);
+    ranking.add(highestEntry);
+    List<Entry> expectedEntries = Stream
+        .concat(
+            Stream.of(highestEntry),
+            VALID_RANKINGS[0]
+                .stream()
+                .filter(entry -> !entry.getWord().equals(word)))
+        .toList();
+    assertThat(ranking.getEntries()).containsExactlyElementsOf(expectedEntries);
+  }
+
+  @DisplayName("Adding an existing word with unchanged ranking changes nothing")
+  @ParameterizedTest
+  @ValueSource(ints = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 })
+  public void testAddingExistingWordWithUnchangedRanking(int position)
+  {
+    Ranking ranking = Ranking.of(toArray(VALID_RANKINGS[0]));
+    Entry unchangedEntry = Entry.of(
+        wordForPosition(position),
+        rankingForPosition(position));
+    ranking.add(unchangedEntry);
+    assertThat(ranking.getEntries()).containsExactlyElementsOf(VALID_RANKINGS[0]);
+  }
+
+  @DisplayName("Adding an existing word with a lower ranking fails")
+  @ParameterizedTest
+  @MethodSource("validRankingsProvider")
+  public void testAddingExistingWordWithLowerRankingFails(List<Entry> entryList)
+  {
+    Ranking ranking = Ranking.of(toArray(entryList));
+    entryList.forEach(entry ->
+      assertThatExceptionOfType(IllegalArgumentException.class)
+          .isThrownBy(() -> ranking.add(Entry.of(entry.getWord(), entry.getCounter() - 1))));
+  }
+
+  @DisplayName("Identical rankings are considered equal")
+  @ParameterizedTest
+  @MethodSource("validRankingsProvider")
+  public void testIdenticalRankingsAreConsideredEaqual(List<Entry> entryList)
+  {
+    assertThat(Ranking.of(toArray(entryList))).isEqualTo(Ranking.of(toArray(entryList)));
+  }
+
+  @DisplayName("Two empty rankings are considered equal")
+  @Test
+  public void testTwoEmptyRankingsAreConsideredEaqual()
+  {
+    assertThat(Ranking.of()).isEqualTo(Ranking.of());
+  }
+
+  @DisplayName("A changed ranking is not considered equal to its unchanged counter-part")
+  @ParameterizedTest
+  @MethodSource("validRankingsProvider")
+  public void testChangedRankingsDoNotEaqualUnchangedOne(List<Entry> entryList)
+  {
+    Ranking changed = Ranking.of(toArray(entryList));
+    changed.add(Entry.of("devilish", 666l));
+    assertThat(changed).isNotEqualTo(Ranking.of(toArray(entryList)));
+  }
+
+  @DisplayName("Rankigs are considered equal, if only the order of words with the same count differ")
+  @Test
+  public void testRankingWithDifferentOrderForSameCountAreEqual()
+  {
+    assertThat(
+        Ranking.of(
+            Entry.of("a1",10l),
+            Entry.of("a2",10l),
+            Entry.of("b1", 9l),
+            Entry.of("b2",9l),
+            Entry.of("c1", 8l),
+            Entry.of("c2", 8l)))
+        .isEqualTo(Ranking.of(
+            Entry.of("a2",10l),
+            Entry.of("a1",10l),
+            Entry.of("b2", 9l),
+            Entry.of("b1",9l),
+            Entry.of("c2", 8l),
+            Entry.of("c1", 8l)));
+  }
+
+
+  Entry[] toArray(List<Entry> entryList)
+  {
+    return entryList.toArray(size -> new Entry[size]);
+  }
+
+  static String wordForPosition(int position)
+  {
+    return Integer.toString(position+1);
+  }
+
+  static long rankingForPosition(int position)
+  {
+    return (long)Ranking.MAX_ENTRIES * 2 - position;
+  }
+
+  static Stream<List<Entry>> validRankingsProvider()
+  {
+    return Stream.of(VALID_RANKINGS);
+  }
+
+  static Stream<List<Entry>> invalidRankingsProvider()
+  {
+    return Stream.of(INVALID_RANKINGS);
+  }
+
+  static String[] WORDS = new String[Ranking.MAX_ENTRIES];
+  static List<Entry>[] VALID_RANKINGS = new List[Ranking.MAX_ENTRIES];
+
+  static
+  {
+    for (int i = 0; i < Ranking.MAX_ENTRIES; i++)
+    {
+      List<Entry> ranking = new LinkedList<>();
+      String word = null;
+      for (int position = 0; position <= i; position++)
+      {
+        word = wordForPosition(position);
+        Entry entry = Entry.of(word, rankingForPosition(position));
+        ranking.add(entry);
+      }
+      WORDS[i] = word;
+      VALID_RANKINGS[Ranking.MAX_ENTRIES - (i + 1)] = ranking;
+    }
+  }
+
+  static List<Entry>[] INVALID_RANKINGS = new List[] {
+      List.of(
+          Entry.of("Platz eins", 1l),
+          Entry.of("Platz zwei", 2l)),
+      List.of(
+          Entry.of("Platz eins", 1111111111l),
+          Entry.of("Platz zwei", 222222222l),
+          Entry.of("Platz eins", 1l)),
+      List.of(
+          Entry.of("Platz eins", 11l),
+          Entry.of("Platz eins", 1l)),
+      List.of(
+          Entry.of("Platz eins", 1111111111l),
+          Entry.of("Platz zwei", 222222222l),
+          Entry.of("Platz eins", 11111111l),
+          Entry.of("Platz zwei", 2222222l),
+          Entry.of("Platz fünf", 555555l)),
+      List.of(
+          Entry.of("Platz eins", 1111111111l),
+          Entry.of("Platz zwei", 222222222l),
+          Entry.of("Platz drei", 33333333l),
+          Entry.of("Platz vier", 4444444l),
+          Entry.of("Platz eins", 111111l),
+          Entry.of("Platz sechs", 66666l)),
+      List.of(
+          Entry.of("Platz eins", 1111111111l),
+          Entry.of("Platz zwei", 222222222l),
+          Entry.of("Platz drei", 33333333l),
+          Entry.of("Platz vier", 4444444l),
+          Entry.of("Platz fünf", 555555l),
+          Entry.of("Platz sechs", 66666l),
+          Entry.of("Platz eins", 1l)),
+      List.of(
+          Entry.of("Platz eins", 1111111111l),
+          Entry.of("Platz zwei", 222222222l),
+          Entry.of("Platz drei", 33333333l),
+          Entry.of("Platz vier", 4444444l),
+          Entry.of("Platz fünf", 555555l),
+          Entry.of("Platz sechs", 66666l),
+          Entry.of("Platz sieben", 7777l),
+          Entry.of("Platz acht", 888l),
+          Entry.of("Platz neun", 99l),
+          Entry.of("Platz 10", 6l),
+          Entry.of("Platz 11", 3l))};
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java b/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java
new file mode 100644 (file)
index 0000000..7a3a27e
--- /dev/null
@@ -0,0 +1,209 @@
+package de.juplo.kafka.wordcount.top10;
+
+import de.juplo.kafka.wordcount.counter.TestCounter;
+import de.juplo.kafka.wordcount.counter.TestWord;
+import de.juplo.kafka.wordcount.query.TestEntry;
+import de.juplo.kafka.wordcount.query.TestRanking;
+import de.juplo.kafka.wordcount.query.TestUser;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import java.util.Arrays;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+class TestData
+{
+       static final TestUser PETER = TestUser.of("peter");
+       static final TestUser KLAUS = TestUser.of("klaus");
+
+       static final Stream<KeyValue<TestWord, TestCounter>> getInputMessages()
+       {
+               return Stream.of(INPUT_MESSAGES);
+       }
+
+       private static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
+       {
+                       new KeyValue<>(
+                                       TestWord.of(PETER.getUser(),"Hallo"),
+                                       TestCounter.of(PETER.getUser(),"Hallo",1)),
+                       new KeyValue<>(
+                                       TestWord.of(KLAUS.getUser(),"Müsch"),
+                                       TestCounter.of(KLAUS.getUser(),"Müsch",1)),
+                       new KeyValue<>(
+                                       TestWord.of(PETER.getUser(),"Welt"),
+                                       TestCounter.of(PETER.getUser(),"Welt",1)),
+                       new KeyValue<>(
+                                       TestWord.of(KLAUS.getUser(),"Müsch"),
+                                       TestCounter.of(KLAUS.getUser(),"Müsch",2)),
+                       new KeyValue<>(
+                                       TestWord.of(KLAUS.getUser(),"s"),
+                                       TestCounter.of(KLAUS.getUser(),"s",1)),
+                       new KeyValue<>(
+                                       TestWord.of(PETER.getUser(),"Boäh"),
+                                       TestCounter.of(PETER.getUser(),"Boäh",1)),
+                       new KeyValue<>(
+                                       TestWord.of(PETER.getUser(),"Welt"),
+                                       TestCounter.of(PETER.getUser(),"Welt",2)),
+                       new KeyValue<>(
+                                       TestWord.of(PETER.getUser(),"Boäh"),
+                                       TestCounter.of(PETER.getUser(),"Boäh",2)),
+                       new KeyValue<>(
+                                       TestWord.of(KLAUS.getUser(),"s"),
+                                       TestCounter.of(KLAUS.getUser(),"s",2)),
+                       new KeyValue<>(
+                                       TestWord.of(PETER.getUser(),"Boäh"),
+                                       TestCounter.of(PETER.getUser(),"Boäh",3)),
+                       new KeyValue<>(
+                                       TestWord.of(KLAUS.getUser(),"s"),
+                                       TestCounter.of(KLAUS.getUser(),"s",3)),
+       };
+
+       static void assertExpectedMessages(MultiValueMap<TestUser, TestRanking> receivedMessages)
+       {
+               expectedMessages().forEach(
+                               (user, rankings) ->
+                                               assertThat(receivedMessages.get(user))
+                                                               .containsExactlyElementsOf(rankings));
+       }
+
+       static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
+       {
+               assertRankingEqualsRankingFromLastMessage(PETER, store.get(userOf(PETER)));
+               assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(userOf(KLAUS)));
+       }
+
+       private static User userOf(TestUser user)
+       {
+               return User.of(user.getUser());
+       }
+
+       static void assertExpectedNumberOfMessagesForUsers(MultiValueMap<TestUser, TestRanking> receivedMessages)
+       {
+               assertThat(countMessagesForUser(PETER, receivedMessages));
+               assertThat(countMessagesForUser(KLAUS, receivedMessages));
+       }
+
+       private static int countMessagesForUser(TestUser user, MultiValueMap<TestUser, TestRanking> messagesForUsers)
+       {
+               return messagesForUsers.get(user) == null
+                               ? 0
+                               : messagesForUsers.get(user).size();
+       }
+
+
+       static void assertExpectedLastMessagesForUsers(MultiValueMap<TestUser, TestRanking> receivedMessages)
+       {
+               assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages));
+               assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages));
+       }
+
+       private static void assertRankingEqualsRankingFromLastMessage(TestUser user, Ranking ranking)
+       {
+               TestRanking testRanking = TestRanking.of(testEntriesOf(ranking.getEntries()));
+               assertRankingEqualsRankingFromLastMessage(user, testRanking);
+       }
+
+       private static TestEntry[] testEntriesOf(Entry... entries)
+       {
+               return Arrays
+                               .stream(entries)
+                               .map(entry -> TestEntry.of(
+                                               entry.getWord(),
+                                               entry.getCounter() == null
+                                                               ? -1l
+                                                               : entry.getCounter()))
+                               .toArray(size -> new TestEntry[size]);
+       }
+
+       private static void assertRankingEqualsRankingFromLastMessage(TestUser user, TestRanking ranking)
+       {
+               assertThat(ranking).isEqualTo(getLastMessageFor(user));
+       }
+
+       private static TestRanking getLastMessageFor(TestUser user)
+       {
+               return getLastMessageFor(user, expectedMessages());
+       }
+
+       private static TestRanking getLastMessageFor(TestUser user, MultiValueMap<TestUser, TestRanking> messagesForUsers)
+       {
+               return messagesForUsers
+                               .get(user)
+                               .stream()
+                               .reduce(null, (left, right) -> right);
+       }
+
+       private static KeyValue<TestUser, TestRanking>[] EXPECTED_MESSAGES = new KeyValue[]
+       {
+                       KeyValue.pair( // 0
+                                       PETER,
+                                       TestRanking.of(
+                                                       TestEntry.of("Hallo", 1l))),
+                       KeyValue.pair( // 1
+                                       KLAUS,
+                                       TestRanking.of(
+                                                       TestEntry.of("Müsch", 1l))),
+                       KeyValue.pair( // 2
+                                       PETER,
+                                       TestRanking.of(
+                                                       TestEntry.of("Hallo", 1l),
+                                                       TestEntry.of("Welt", 1l))),
+                       KeyValue.pair( // 3
+                                       KLAUS,
+                                       TestRanking.of(
+                                                       TestEntry.of("Müsch", 2l))),
+                       KeyValue.pair( // 4
+                                       KLAUS,
+                                       TestRanking.of(
+                                                       TestEntry.of("Müsch", 2l),
+                                                       TestEntry.of("s", 1l))),
+                       KeyValue.pair( // 5
+                                       PETER,
+                                       TestRanking.of(
+                                                       TestEntry.of("Hallo", 1l),
+                                                       TestEntry.of("Welt", 1l),
+                                                       TestEntry.of("Boäh", 1l))),
+                       KeyValue.pair( // 6
+                                       PETER,
+                                       TestRanking.of(
+                                                       TestEntry.of("Welt", 2l),
+                                                       TestEntry.of("Hallo", 1l),
+                                                       TestEntry.of("Boäh", 1l))),
+                       KeyValue.pair( // 7
+                                       PETER,
+                                       TestRanking.of(
+                                                       TestEntry.of("Welt", 2l),
+                                                       TestEntry.of("Boäh", 2l),
+                                                       TestEntry.of("Hallo", 1l))),
+                       KeyValue.pair( // 8
+                                       KLAUS,
+                                       TestRanking.of(
+                                                       TestEntry.of("Müsch", 2l),
+                                                       TestEntry.of("s", 2l))),
+                       KeyValue.pair( // 9
+                                       PETER,
+                                       TestRanking.of(
+                                                       TestEntry.of("Boäh", 3l),
+                                                       TestEntry.of("Welt", 2l),
+                                                       TestEntry.of("Hallo", 1l))),
+                       KeyValue.pair( // 10
+                                       KLAUS,
+                                       TestRanking.of(
+                                                       TestEntry.of("s", 3l),
+                                                       TestEntry.of("Müsch", 2l))),
+       };
+
+       private static MultiValueMap<TestUser, TestRanking> expectedMessages()
+       {
+               MultiValueMap<TestUser, TestRanking> expectedMessages = new LinkedMultiValueMap<>();
+               Stream
+                               .of(EXPECTED_MESSAGES)
+                               .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
+               return expectedMessages;
+       }
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/RankingTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/RankingTest.java
deleted file mode 100644 (file)
index 26749e9..0000000
+++ /dev/null
@@ -1,276 +0,0 @@
-package de.juplo.kafka.wordcount.top10;
-
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
-import org.junit.jupiter.params.provider.ValueSource;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.stream.Stream;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
-
-
-public class RankingTest
-{
-  @DisplayName("A newly created instance is empty")
-  @Test
-  public void testNewRankingIsEmpty()
-  {
-    Ranking ranking = new Ranking();
-    assertThat(ranking.getEntries()).isEmpty();
-  }
-
-  @DisplayName("An instance that was build from an empty ranking is empty")
-  @Test
-  public void testRankingOfYieldsExpectedResultForEmptyList()
-  {
-    Ranking ranking = new Ranking();
-    assertThat(ranking.getEntries()).isEmpty();
-  }
-
-  @DisplayName("An instance that was build from a valid ranking contains the expected entries")
-  @ParameterizedTest
-  @MethodSource("validRankingsProvider")
-  public void testRankingOfYieldsExpectedResultsForValidRankings(List<Entry> entryList)
-  {
-    Ranking ranking = Ranking.of(toArray(entryList));
-    assertThat(ranking.getEntries()).containsExactlyElementsOf(entryList);
-  }
-
-  @DisplayName("The builder fails for invalid rankings")
-  @ParameterizedTest
-  @MethodSource("invalidRankingsProvider")
-  public void testRankingOfThrowsExceptionForInvalidRankings(List<Entry> entryList)
-  {
-    assertThatExceptionOfType(IllegalArgumentException.class)
-        .isThrownBy(() -> Ranking.of(toArray(entryList)));
-  }
-
-  @DisplayName("Adding a new word with highest ranking, pushes all other words down")
-  @ParameterizedTest
-  @MethodSource("validRankingsProvider")
-  public void testAddingNewWordWithHighestRanking(List<Entry> entryList)
-  {
-    Ranking ranking = Ranking.of(toArray(entryList));
-    Entry newEntry = Entry.of("NEW!", rankingForPosition(-1));
-    ranking.add(newEntry);
-    assertThat(ranking.getEntries()[0]).isEqualTo(newEntry);
-    for (int i = 0; i < entryList.size() && i < Ranking.MAX_ENTRIES - 1; i++)
-    {
-      assertThat(ranking.getEntries()[i + 1]).isEqualTo(entryList.get(i));
-    }
-  }
-
-  @DisplayName("Adding a new word with an existent ranking, pushes all words with lower ranking down")
-  @ParameterizedTest
-  @MethodSource("validRankingsProvider")
-  public void testAddingNewWordWithExistingRanking(List<Entry> entryList)
-  {
-    for (int position = 0; position < entryList.size(); position++ )
-    {
-      Ranking ranking = Ranking.of(toArray(entryList));
-      Entry newEntry = Entry.of("NEW!", rankingForPosition(position));
-      ranking.add(newEntry);
-      for (int i = 0; i < entryList.size() && i < Ranking.MAX_ENTRIES - 1; i++)
-      {
-        if (i < position)
-        {
-          assertThat(ranking.getEntries()[i]).isEqualTo(entryList.get(i));
-        }
-        if (i == position)
-        {
-          assertThat(ranking.getEntries()[i]).isEqualTo(entryList.get(i));
-          assertThat(ranking.getEntries()[i + 1]).isEqualTo(newEntry);
-        }
-        if (i > position)
-        {
-          assertThat(ranking.getEntries()[i + 1]).isEqualTo(entryList.get(i));
-        }
-      }
-    }
-  }
-
-  @DisplayName("Adding a highest ranking for an existing word shifts it to the first place")
-  @ParameterizedTest
-  @ValueSource(ints = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 })
-  public void testAddingExistingWordWithHighestRanking(int position)
-  {
-    Ranking ranking = Ranking.of(toArray(VALID_RANKINGS[0]));
-    String word = wordForPosition(position);
-    Entry highestEntry = Entry.of(word, 100l);
-    ranking.add(highestEntry);
-    List<Entry> expectedEntries = Stream
-        .concat(
-            Stream.of(highestEntry),
-            VALID_RANKINGS[0]
-                .stream()
-                .filter(entry -> !entry.getWord().equals(word)))
-        .toList();
-    assertThat(ranking.getEntries()).containsExactlyElementsOf(expectedEntries);
-  }
-
-  @DisplayName("Adding an existing word with unchanged ranking changes nothing")
-  @ParameterizedTest
-  @ValueSource(ints = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 })
-  public void testAddingExistingWordWithUnchangedRanking(int position)
-  {
-    Ranking ranking = Ranking.of(toArray(VALID_RANKINGS[0]));
-    Entry unchangedEntry = Entry.of(
-        wordForPosition(position),
-        rankingForPosition(position));
-    ranking.add(unchangedEntry);
-    assertThat(ranking.getEntries()).containsExactlyElementsOf(VALID_RANKINGS[0]);
-  }
-
-  @DisplayName("Adding an existing word with a lower ranking fails")
-  @ParameterizedTest
-  @MethodSource("validRankingsProvider")
-  public void testAddingExistingWordWithLowerRankingFails(List<Entry> entryList)
-  {
-    Ranking ranking = Ranking.of(toArray(entryList));
-    entryList.forEach(entry ->
-      assertThatExceptionOfType(IllegalArgumentException.class)
-          .isThrownBy(() -> ranking.add(Entry.of(entry.getWord(), entry.getCounter() - 1))));
-  }
-
-  @DisplayName("Identical rankings are considered equal")
-  @ParameterizedTest
-  @MethodSource("validRankingsProvider")
-  public void testIdenticalRankingsAreConsideredEaqual(List<Entry> entryList)
-  {
-    assertThat(Ranking.of(toArray(entryList))).isEqualTo(Ranking.of(toArray(entryList)));
-  }
-
-  @DisplayName("Two empty rankings are considered equal")
-  @Test
-  public void testTwoEmptyRankingsAreConsideredEaqual()
-  {
-    assertThat(Ranking.of()).isEqualTo(Ranking.of());
-  }
-
-  @DisplayName("A changed ranking is not considered equal to its unchanged counter-part")
-  @ParameterizedTest
-  @MethodSource("validRankingsProvider")
-  public void testChangedRankingsDoNotEaqualUnchangedOne(List<Entry> entryList)
-  {
-    Ranking changed = Ranking.of(toArray(entryList));
-    changed.add(Entry.of("devilish", 666l));
-    assertThat(changed).isNotEqualTo(Ranking.of(toArray(entryList)));
-  }
-
-  @DisplayName("Rankigs are considered equal, if only the order of words with the same count differ")
-  @Test
-  public void testRankingWithDifferentOrderForSameCountAreEqual()
-  {
-    assertThat(
-        Ranking.of(
-            Entry.of("a1",10l),
-            Entry.of("a2",10l),
-            Entry.of("b1", 9l),
-            Entry.of("b2",9l),
-            Entry.of("c1", 8l),
-            Entry.of("c2", 8l)))
-        .isEqualTo(Ranking.of(
-            Entry.of("a2",10l),
-            Entry.of("a1",10l),
-            Entry.of("b2", 9l),
-            Entry.of("b1",9l),
-            Entry.of("c2", 8l),
-            Entry.of("c1", 8l)));
-  }
-
-
-  Entry[] toArray(List<Entry> entryList)
-  {
-    return entryList.toArray(size -> new Entry[size]);
-  }
-
-  static String wordForPosition(int position)
-  {
-    return Integer.toString(position+1);
-  }
-
-  static long rankingForPosition(int position)
-  {
-    return (long)Ranking.MAX_ENTRIES * 2 - position;
-  }
-
-  static Stream<List<Entry>> validRankingsProvider()
-  {
-    return Stream.of(VALID_RANKINGS);
-  }
-
-  static Stream<List<Entry>> invalidRankingsProvider()
-  {
-    return Stream.of(INVALID_RANKINGS);
-  }
-
-  static String[] WORDS = new String[Ranking.MAX_ENTRIES];
-  static List<Entry>[] VALID_RANKINGS = new List[Ranking.MAX_ENTRIES];
-
-  static
-  {
-    for (int i = 0; i < Ranking.MAX_ENTRIES; i++)
-    {
-      List<Entry> ranking = new LinkedList<>();
-      String word = null;
-      for (int position = 0; position <= i; position++)
-      {
-        word = wordForPosition(position);
-        Entry entry = Entry.of(word, rankingForPosition(position));
-        ranking.add(entry);
-      }
-      WORDS[i] = word;
-      VALID_RANKINGS[Ranking.MAX_ENTRIES - (i + 1)] = ranking;
-    }
-  }
-
-  static List<Entry>[] INVALID_RANKINGS = new List[] {
-      List.of(
-          Entry.of("Platz eins", 1l),
-          Entry.of("Platz zwei", 2l)),
-      List.of(
-          Entry.of("Platz eins", 1111111111l),
-          Entry.of("Platz zwei", 222222222l),
-          Entry.of("Platz eins", 1l)),
-      List.of(
-          Entry.of("Platz eins", 11l),
-          Entry.of("Platz eins", 1l)),
-      List.of(
-          Entry.of("Platz eins", 1111111111l),
-          Entry.of("Platz zwei", 222222222l),
-          Entry.of("Platz eins", 11111111l),
-          Entry.of("Platz zwei", 2222222l),
-          Entry.of("Platz fünf", 555555l)),
-      List.of(
-          Entry.of("Platz eins", 1111111111l),
-          Entry.of("Platz zwei", 222222222l),
-          Entry.of("Platz drei", 33333333l),
-          Entry.of("Platz vier", 4444444l),
-          Entry.of("Platz eins", 111111l),
-          Entry.of("Platz sechs", 66666l)),
-      List.of(
-          Entry.of("Platz eins", 1111111111l),
-          Entry.of("Platz zwei", 222222222l),
-          Entry.of("Platz drei", 33333333l),
-          Entry.of("Platz vier", 4444444l),
-          Entry.of("Platz fünf", 555555l),
-          Entry.of("Platz sechs", 66666l),
-          Entry.of("Platz eins", 1l)),
-      List.of(
-          Entry.of("Platz eins", 1111111111l),
-          Entry.of("Platz zwei", 222222222l),
-          Entry.of("Platz drei", 33333333l),
-          Entry.of("Platz vier", 4444444l),
-          Entry.of("Platz fünf", 555555l),
-          Entry.of("Platz sechs", 66666l),
-          Entry.of("Platz sieben", 7777l),
-          Entry.of("Platz acht", 888l),
-          Entry.of("Platz neun", 99l),
-          Entry.of("Platz 10", 6l),
-          Entry.of("Platz 11", 3l))};
-}
diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java
deleted file mode 100644 (file)
index 7a3a27e..0000000
+++ /dev/null
@@ -1,209 +0,0 @@
-package de.juplo.kafka.wordcount.top10;
-
-import de.juplo.kafka.wordcount.counter.TestCounter;
-import de.juplo.kafka.wordcount.counter.TestWord;
-import de.juplo.kafka.wordcount.query.TestEntry;
-import de.juplo.kafka.wordcount.query.TestRanking;
-import de.juplo.kafka.wordcount.query.TestUser;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
-import org.springframework.util.LinkedMultiValueMap;
-import org.springframework.util.MultiValueMap;
-
-import java.util.Arrays;
-import java.util.stream.Stream;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-
-class TestData
-{
-       static final TestUser PETER = TestUser.of("peter");
-       static final TestUser KLAUS = TestUser.of("klaus");
-
-       static final Stream<KeyValue<TestWord, TestCounter>> getInputMessages()
-       {
-               return Stream.of(INPUT_MESSAGES);
-       }
-
-       private static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
-       {
-                       new KeyValue<>(
-                                       TestWord.of(PETER.getUser(),"Hallo"),
-                                       TestCounter.of(PETER.getUser(),"Hallo",1)),
-                       new KeyValue<>(
-                                       TestWord.of(KLAUS.getUser(),"Müsch"),
-                                       TestCounter.of(KLAUS.getUser(),"Müsch",1)),
-                       new KeyValue<>(
-                                       TestWord.of(PETER.getUser(),"Welt"),
-                                       TestCounter.of(PETER.getUser(),"Welt",1)),
-                       new KeyValue<>(
-                                       TestWord.of(KLAUS.getUser(),"Müsch"),
-                                       TestCounter.of(KLAUS.getUser(),"Müsch",2)),
-                       new KeyValue<>(
-                                       TestWord.of(KLAUS.getUser(),"s"),
-                                       TestCounter.of(KLAUS.getUser(),"s",1)),
-                       new KeyValue<>(
-                                       TestWord.of(PETER.getUser(),"Boäh"),
-                                       TestCounter.of(PETER.getUser(),"Boäh",1)),
-                       new KeyValue<>(
-                                       TestWord.of(PETER.getUser(),"Welt"),
-                                       TestCounter.of(PETER.getUser(),"Welt",2)),
-                       new KeyValue<>(
-                                       TestWord.of(PETER.getUser(),"Boäh"),
-                                       TestCounter.of(PETER.getUser(),"Boäh",2)),
-                       new KeyValue<>(
-                                       TestWord.of(KLAUS.getUser(),"s"),
-                                       TestCounter.of(KLAUS.getUser(),"s",2)),
-                       new KeyValue<>(
-                                       TestWord.of(PETER.getUser(),"Boäh"),
-                                       TestCounter.of(PETER.getUser(),"Boäh",3)),
-                       new KeyValue<>(
-                                       TestWord.of(KLAUS.getUser(),"s"),
-                                       TestCounter.of(KLAUS.getUser(),"s",3)),
-       };
-
-       static void assertExpectedMessages(MultiValueMap<TestUser, TestRanking> receivedMessages)
-       {
-               expectedMessages().forEach(
-                               (user, rankings) ->
-                                               assertThat(receivedMessages.get(user))
-                                                               .containsExactlyElementsOf(rankings));
-       }
-
-       static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
-       {
-               assertRankingEqualsRankingFromLastMessage(PETER, store.get(userOf(PETER)));
-               assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(userOf(KLAUS)));
-       }
-
-       private static User userOf(TestUser user)
-       {
-               return User.of(user.getUser());
-       }
-
-       static void assertExpectedNumberOfMessagesForUsers(MultiValueMap<TestUser, TestRanking> receivedMessages)
-       {
-               assertThat(countMessagesForUser(PETER, receivedMessages));
-               assertThat(countMessagesForUser(KLAUS, receivedMessages));
-       }
-
-       private static int countMessagesForUser(TestUser user, MultiValueMap<TestUser, TestRanking> messagesForUsers)
-       {
-               return messagesForUsers.get(user) == null
-                               ? 0
-                               : messagesForUsers.get(user).size();
-       }
-
-
-       static void assertExpectedLastMessagesForUsers(MultiValueMap<TestUser, TestRanking> receivedMessages)
-       {
-               assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages));
-               assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages));
-       }
-
-       private static void assertRankingEqualsRankingFromLastMessage(TestUser user, Ranking ranking)
-       {
-               TestRanking testRanking = TestRanking.of(testEntriesOf(ranking.getEntries()));
-               assertRankingEqualsRankingFromLastMessage(user, testRanking);
-       }
-
-       private static TestEntry[] testEntriesOf(Entry... entries)
-       {
-               return Arrays
-                               .stream(entries)
-                               .map(entry -> TestEntry.of(
-                                               entry.getWord(),
-                                               entry.getCounter() == null
-                                                               ? -1l
-                                                               : entry.getCounter()))
-                               .toArray(size -> new TestEntry[size]);
-       }
-
-       private static void assertRankingEqualsRankingFromLastMessage(TestUser user, TestRanking ranking)
-       {
-               assertThat(ranking).isEqualTo(getLastMessageFor(user));
-       }
-
-       private static TestRanking getLastMessageFor(TestUser user)
-       {
-               return getLastMessageFor(user, expectedMessages());
-       }
-
-       private static TestRanking getLastMessageFor(TestUser user, MultiValueMap<TestUser, TestRanking> messagesForUsers)
-       {
-               return messagesForUsers
-                               .get(user)
-                               .stream()
-                               .reduce(null, (left, right) -> right);
-       }
-
-       private static KeyValue<TestUser, TestRanking>[] EXPECTED_MESSAGES = new KeyValue[]
-       {
-                       KeyValue.pair( // 0
-                                       PETER,
-                                       TestRanking.of(
-                                                       TestEntry.of("Hallo", 1l))),
-                       KeyValue.pair( // 1
-                                       KLAUS,
-                                       TestRanking.of(
-                                                       TestEntry.of("Müsch", 1l))),
-                       KeyValue.pair( // 2
-                                       PETER,
-                                       TestRanking.of(
-                                                       TestEntry.of("Hallo", 1l),
-                                                       TestEntry.of("Welt", 1l))),
-                       KeyValue.pair( // 3
-                                       KLAUS,
-                                       TestRanking.of(
-                                                       TestEntry.of("Müsch", 2l))),
-                       KeyValue.pair( // 4
-                                       KLAUS,
-                                       TestRanking.of(
-                                                       TestEntry.of("Müsch", 2l),
-                                                       TestEntry.of("s", 1l))),
-                       KeyValue.pair( // 5
-                                       PETER,
-                                       TestRanking.of(
-                                                       TestEntry.of("Hallo", 1l),
-                                                       TestEntry.of("Welt", 1l),
-                                                       TestEntry.of("Boäh", 1l))),
-                       KeyValue.pair( // 6
-                                       PETER,
-                                       TestRanking.of(
-                                                       TestEntry.of("Welt", 2l),
-                                                       TestEntry.of("Hallo", 1l),
-                                                       TestEntry.of("Boäh", 1l))),
-                       KeyValue.pair( // 7
-                                       PETER,
-                                       TestRanking.of(
-                                                       TestEntry.of("Welt", 2l),
-                                                       TestEntry.of("Boäh", 2l),
-                                                       TestEntry.of("Hallo", 1l))),
-                       KeyValue.pair( // 8
-                                       KLAUS,
-                                       TestRanking.of(
-                                                       TestEntry.of("Müsch", 2l),
-                                                       TestEntry.of("s", 2l))),
-                       KeyValue.pair( // 9
-                                       PETER,
-                                       TestRanking.of(
-                                                       TestEntry.of("Boäh", 3l),
-                                                       TestEntry.of("Welt", 2l),
-                                                       TestEntry.of("Hallo", 1l))),
-                       KeyValue.pair( // 10
-                                       KLAUS,
-                                       TestRanking.of(
-                                                       TestEntry.of("s", 3l),
-                                                       TestEntry.of("Müsch", 2l))),
-       };
-
-       private static MultiValueMap<TestUser, TestRanking> expectedMessages()
-       {
-               MultiValueMap<TestUser, TestRanking> expectedMessages = new LinkedMultiValueMap<>();
-               Stream
-                               .of(EXPECTED_MESSAGES)
-                               .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
-               return expectedMessages;
-       }
-}
diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java
deleted file mode 100644 (file)
index f5ef236..0000000
+++ /dev/null
@@ -1,168 +0,0 @@
-package de.juplo.kafka.wordcount.top10;
-
-import de.juplo.kafka.wordcount.counter.TestCounter;
-import de.juplo.kafka.wordcount.counter.TestWord;
-import de.juplo.kafka.wordcount.query.TestRanking;
-import de.juplo.kafka.wordcount.query.TestUser;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.Stores;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Primary;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.support.KafkaHeaders;
-import org.springframework.kafka.support.SendResult;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.messaging.handler.annotation.Header;
-import org.springframework.messaging.handler.annotation.Payload;
-import org.springframework.util.LinkedMultiValueMap;
-import org.springframework.util.MultiValueMap;
-
-import java.time.Duration;
-
-import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME;
-import static org.awaitility.Awaitility.await;
-
-
-@SpringBootTest(
-               properties = {
-                               "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
-                               "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
-                               "spring.kafka.producer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.counter.TestWord,counter:de.juplo.kafka.wordcount.counter.TestCounter",
-                               "spring.kafka.consumer.auto-offset-reset=earliest",
-                               "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
-                               "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
-                               "spring.kafka.consumer.properties.spring.json.type.mapping=user:de.juplo.kafka.wordcount.query.TestUser,ranking:de.juplo.kafka.wordcount.query.TestRanking",
-                               "logging.level.root=WARN",
-                               "logging.level.de.juplo=DEBUG",
-                               "logging.level.org.apache.kafka.clients=INFO",
-                               "logging.level.org.apache.kafka.streams=INFO",
-                               "juplo.wordcount.top10.bootstrap-server=${spring.embedded.kafka.brokers}",
-                               "juplo.wordcount.top10.commit-interval=100",
-                               "juplo.wordcount.top10.cacheMaxBytes=0",
-                               "juplo.wordcount.top10.input-topic=" + Top10ApplicationIT.TOPIC_IN,
-                               "juplo.wordcount.top10.output-topic=" + Top10ApplicationIT.TOPIC_OUT })
-@EmbeddedKafka(topics = { Top10ApplicationIT.TOPIC_IN, Top10ApplicationIT.TOPIC_OUT })
-@Slf4j
-public class Top10ApplicationIT
-{
-       public static final String TOPIC_IN = "in";
-       public static final String TOPIC_OUT = "out";
-
-       @Autowired
-       Consumer consumer;
-       @Autowired
-       Top10StreamProcessor streamProcessor;
-
-
-       @BeforeAll
-       public static void testSendMessage(
-                       @Autowired KafkaTemplate<TestWord, TestCounter> kafkaTemplate)
-       {
-               TestData
-                               .getInputMessages()
-                               .forEach(kv ->
-                               {
-                                       try
-                                       {
-                                               SendResult<TestWord, TestCounter> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
-                                               log.info(
-                                                               "Sent: {}={}, partition={}, offset={}",
-                                                               result.getProducerRecord().key(),
-                                                               result.getProducerRecord().value(),
-                                                               result.getRecordMetadata().partition(),
-                                                               result.getRecordMetadata().offset());
-                                       }
-                                       catch (Exception e)
-                                       {
-                                               throw new RuntimeException(e);
-                                       }
-                               });
-       }
-
-       @DisplayName("Await the expected state in the state-store")
-       @Test
-       public void testAwaitExpectedState()
-       {
-               await("Expected state")
-                               .atMost(Duration.ofSeconds(5))
-                               .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
-       }
-
-       @DisplayName("Await the expected output messages")
-       @Test
-       @Disabled
-       public void testAwaitExpectedMessages()
-       {
-               await("Expected messages")
-                               .atMost(Duration.ofSeconds(5))
-                               .untilAsserted(() -> consumer.enforceAssertion(
-                                               receivedMessages -> TestData.assertExpectedMessages(receivedMessages)));
-       }
-
-       @DisplayName("Await the expected number of messages")
-       @Test
-       public void testAwaitExpectedNumberOfMessagesForUsers()
-       {
-               await("Expected number of messages")
-                               .atMost(Duration.ofSeconds(5))
-                               .untilAsserted(() -> consumer.enforceAssertion(
-                                               receivedMessages -> TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages)));
-       }
-
-       @DisplayName("Await the expected final output messages")
-       @Test
-       public void testAwaitExpectedLastMessagesForUsers()
-       {
-               await("Expected final output messages")
-                               .atMost(Duration.ofSeconds(5))
-                               .untilAsserted(() -> consumer.enforceAssertion(
-                                               receivedMessages -> TestData.assertExpectedLastMessagesForUsers(receivedMessages)));
-       }
-
-
-       static class Consumer
-       {
-               private final MultiValueMap<TestUser, TestRanking> received = new LinkedMultiValueMap<>();
-
-               @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
-               public synchronized void receive(
-                               @Header(KafkaHeaders.RECEIVED_KEY) TestUser user,
-                               @Payload TestRanking ranking)
-               {
-                       log.debug("Received message: {} -> {}", user, ranking);
-                       received.add(user, ranking);
-               }
-
-               synchronized void enforceAssertion(
-                               java.util.function.Consumer<MultiValueMap<TestUser, TestRanking>> assertion)
-               {
-                       assertion.accept(received);
-               }
-       }
-
-       @TestConfiguration
-       static class Configuration
-       {
-               @Bean
-               Consumer consumer()
-               {
-                       return new Consumer();
-               }
-
-               @Primary
-               @Bean
-               KeyValueBytesStoreSupplier inMemoryStoreSupplier()
-               {
-                       return Stores.inMemoryKeyValueStore(STORE_NAME);
-               }
-       }
-}
diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java
deleted file mode 100644 (file)
index 90d8e4c..0000000
+++ /dev/null
@@ -1,105 +0,0 @@
-package de.juplo.kafka.wordcount.top10;
-
-import de.juplo.kafka.wordcount.counter.TestCounter;
-import de.juplo.kafka.wordcount.counter.TestWord;
-import de.juplo.kafka.wordcount.query.TestRanking;
-import de.juplo.kafka.wordcount.query.TestUser;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.streams.TestOutputTopic;
-import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.Stores;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.springframework.kafka.support.serializer.JsonDeserializer;
-import org.springframework.kafka.support.serializer.JsonSerializer;
-import org.springframework.util.LinkedMultiValueMap;
-import org.springframework.util.MultiValueMap;
-
-import java.util.Map;
-
-import static de.juplo.kafka.wordcount.top10.Top10ApplicationConfiguration.serializationConfig;
-
-
-@Slf4j
-public class Top10StreamProcessorTopologyTest
-{
-  public static final String IN = "TEST-IN";
-  public static final String OUT = "TEST-OUT";
-  public static final String STORE_NAME = "TOPOLOGY-TEST";
-
-
-  TopologyTestDriver testDriver;
-  TestInputTopic<TestWord, TestCounter> in;
-  TestOutputTopic<TestUser, TestRanking> out;
-
-
-  @BeforeEach
-  public void setUp()
-  {
-    Topology topology = Top10StreamProcessor.buildTopology(
-        IN,
-        OUT,
-        Stores.inMemoryKeyValueStore(STORE_NAME));
-
-    testDriver = new TopologyTestDriver(topology, serializationConfig());
-
-    in = testDriver.createInputTopic(
-        IN,
-        jsonSerializer(TestWord.class, true),
-        jsonSerializer(TestCounter.class,false));
-
-    out = testDriver.createOutputTopic(
-        OUT,
-        new JsonDeserializer()
-            .copyWithType(TestUser.class)
-            .ignoreTypeHeaders(),
-        new JsonDeserializer()
-            .copyWithType(TestRanking.class)
-            .ignoreTypeHeaders());
-
-  }
-
-
-  @Test
-  public void test()
-  {
-    TestData
-        .getInputMessages()
-        .forEach(kv -> in.pipeInput(kv.key, kv.value));
-
-    MultiValueMap<TestUser, TestRanking> receivedMessages = new LinkedMultiValueMap<>();
-    out
-        .readRecordsToList()
-        .forEach(record -> receivedMessages.add(record.key(), record.value()));
-
-    TestData.assertExpectedMessages(receivedMessages);
-
-    TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages);
-    TestData.assertExpectedLastMessagesForUsers(receivedMessages);
-
-    KeyValueStore<User, Ranking> store = testDriver.getKeyValueStore(STORE_NAME);
-    TestData.assertExpectedState(store);
-  }
-
-  @AfterEach
-  public void tearDown()
-  {
-    testDriver.close();
-  }
-
-  private <T> JsonSerializer<T> jsonSerializer(Class<T> type, boolean isKey)
-  {
-    JsonSerializer<T> jsonSerializer = new JsonSerializer<>();
-    jsonSerializer.configure(
-        Map.of(
-            JsonSerializer.TYPE_MAPPINGS,
-            "word:" + TestWord.class.getName() + "," +
-            "counter:" + TestCounter.class.getName()),
-        isKey);
-    return jsonSerializer;
-  }
-}