TEST-STORE:GREEN top10
authorKai Moritz <kai@juplo.de>
Thu, 30 May 2024 09:01:27 +0000 (11:01 +0200)
committerKai Moritz <kai@juplo.de>
Thu, 30 May 2024 10:18:15 +0000 (12:18 +0200)
18 files changed:
Dockerfile
pom.xml
src/main/java/de/juplo/kafka/wordcount/top10/Entry.java
src/main/java/de/juplo/kafka/wordcount/top10/Key.java
src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java
src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java
src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java
src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java
src/main/java/de/juplo/kafka/wordcount/top10/User.java [new file with mode: 0644]
src/main/resources/application.properties
src/test/java/de/juplo/kafka/wordcount/counter/TestCounter.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/query/RankingData.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/top10/RankingTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/top10/TestData.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java [new file with mode: 0644]

index bbd15ef..16a12e3 100644 (file)
@@ -1,5 +1,5 @@
-FROM openjdk:11-jre-slim
+FROM eclipse-temurin:17-jre
 COPY target/*.jar /opt/app.jar
 COPY target/*.jar /opt/app.jar
-EXPOSE 8080
+EXPOSE 8084
 ENTRYPOINT ["java", "-jar", "/opt/app.jar"]
 CMD []
 ENTRYPOINT ["java", "-jar", "/opt/app.jar"]
 CMD []
diff --git a/pom.xml b/pom.xml
index 9bda638..fd71ccd 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -5,18 +5,16 @@
        <parent>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-parent</artifactId>
        <parent>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-parent</artifactId>
-               <version>2.5.4</version>
+               <version>3.2.5</version>
                <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>de.juplo.kafka.wordcount</groupId>
        <artifactId>top10</artifactId>
                <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>de.juplo.kafka.wordcount</groupId>
        <artifactId>top10</artifactId>
-       <version>1.0.0</version>
+       <version>1.2.0</version>
        <name>Wordcount-Top-10</name>
        <description>Top-10 stream-processor of the multi-user wordcount-example</description>
        <properties>
                <docker-maven-plugin.version>0.33.0</docker-maven-plugin.version>
        <name>Wordcount-Top-10</name>
        <description>Top-10 stream-processor of the multi-user wordcount-example</description>
        <properties>
                <docker-maven-plugin.version>0.33.0</docker-maven-plugin.version>
-               <java.version>11</java.version>
-               <kafka.version>2.8.0</kafka.version>
        </properties>
        <dependencies>
                <dependency>
        </properties>
        <dependencies>
                <dependency>
                        <groupId>org.apache.kafka</groupId>
                        <artifactId>kafka-streams</artifactId>
                </dependency>
                        <groupId>org.apache.kafka</groupId>
                        <artifactId>kafka-streams</artifactId>
                </dependency>
+               <dependency>
+                       <groupId>org.springframework.kafka</groupId>
+                       <artifactId>spring-kafka</artifactId>
+               </dependency>
 
                <dependency>
                        <groupId>org.springframework.boot</groupId>
 
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>lombok</artifactId>
                        <optional>true</optional>
                </dependency>
                        <artifactId>lombok</artifactId>
                        <optional>true</optional>
                </dependency>
+
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-test</artifactId>
                        <scope>test</scope>
                </dependency>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-test</artifactId>
                        <scope>test</scope>
                </dependency>
+               <dependency>
+                       <groupId>org.springframework.kafka</groupId>
+                       <artifactId>spring-kafka-test</artifactId>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.awaitility</groupId>
+                       <artifactId>awaitility</artifactId>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.assertj</groupId>
+                       <artifactId>assertj-core</artifactId>
+                       <scope>test</scope>
+               </dependency>
        </dependencies>
 
        <build>
                <plugins>
        </dependencies>
 
        <build>
                <plugins>
+                       <plugin>
+                               <artifactId>maven-failsafe-plugin</artifactId>
+                       </plugin>
                        <plugin>
                                <groupId>org.springframework.boot</groupId>
                                <artifactId>spring-boot-maven-plugin</artifactId>
                        <plugin>
                                <groupId>org.springframework.boot</groupId>
                                <artifactId>spring-boot-maven-plugin</artifactId>
index 67f45f2..b25fc07 100644 (file)
@@ -1,11 +1,20 @@
 package de.juplo.kafka.wordcount.top10;
 
 package de.juplo.kafka.wordcount.top10;
 
-import lombok.Value;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
 
 
 
 
-@Value(staticConstructor = "of")
+@NoArgsConstructor
+@AllArgsConstructor(
+    staticName = "of",
+    access = AccessLevel.PACKAGE)
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class Entry
 {
 public class Entry
 {
-  private final String word;
-  private final Long count;
+  private String word;
+  private Long counter;
 }
 }
index d09dbcc..ffac8ea 100644 (file)
@@ -1,13 +1,17 @@
 package de.juplo.kafka.wordcount.top10;
 
 package de.juplo.kafka.wordcount.top10;
 
-import lombok.Getter;
-import lombok.Setter;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.*;
 
 
 
 
-@Getter
-@Setter
+@NoArgsConstructor
+@AllArgsConstructor(
+    staticName = "of",
+    access = AccessLevel.PACKAGE)
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class Key
 {
 public class Key
 {
-  private String username;
+  private String user;
   private String word;
 }
   private String word;
 }
index b748fe5..4f56c18 100644 (file)
@@ -1,36 +1,50 @@
 package de.juplo.kafka.wordcount.top10;
 
 package de.juplo.kafka.wordcount.top10;
 
-import lombok.Getter;
-import lombok.Setter;
+import lombok.*;
 
 
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.*;
 
 
 
 
-@Getter
-@Setter
+@AllArgsConstructor(access = AccessLevel.PRIVATE)
+@NoArgsConstructor
+@Data
 public class Ranking
 {
 public class Ranking
 {
+  public final static int MAX_ENTRIES = 10;
+
+
   private Entry[] entries = new Entry[0];
 
   private Entry[] entries = new Entry[0];
 
-  public void add(Entry newEntry)
+  public Ranking add(Entry newEntry)
   {
     if (entries.length == 0)
     {
       entries = new Entry[1];
       entries[0] = newEntry;
   {
     if (entries.length == 0)
     {
       entries = new Entry[1];
       entries[0] = newEntry;
-      return;
+      return this;
     }
 
     List<Entry> list = new LinkedList<>(Arrays.asList(entries));
     }
 
     List<Entry> list = new LinkedList<>(Arrays.asList(entries));
+    int oldPosition = -1;
     for (int i = 0; i < list.size(); i++)
     {
     for (int i = 0; i < list.size(); i++)
     {
-      Entry entry;
+      Entry entry = list.get(i);
 
 
-      entry = list.get(i);
-      if (entry.getCount() <= newEntry.getCount())
+      if (entry.getCounter() < newEntry.getCounter())
       {
       {
+        if (oldPosition > -1)
+        {
+          if (list.get(oldPosition).getCounter() > newEntry.getCounter())
+          {
+            throw new IllegalArgumentException("The ranking already contains an entry with a higher counting for " + newEntry);
+          }
+          else
+          {
+            // Entry for word already exists with the same counting! Nothing changed...
+            return this;
+          }
+        }
+
         list.add(i, newEntry);
         for (int j = i+1; j < list.size(); j++)
         {
         list.add(i, newEntry);
         for (int j = i+1; j < list.size(); j++)
         {
@@ -41,13 +55,105 @@ public class Ranking
             break;
           }
         }
             break;
           }
         }
-        if (list.size() > 10)
+        if (list.size() > MAX_ENTRIES)
         {
         {
-          list = list.subList(0,10);
+          list = list.subList(0, MAX_ENTRIES);
         }
         entries = list.toArray(num -> new Entry[num]);
         }
         entries = list.toArray(num -> new Entry[num]);
-        return;
+        return this;
+      }
+
+      if (entry.getWord().equals(newEntry.getWord()))
+        oldPosition = i;
+    }
+
+    if (oldPosition > -1 && list.get(oldPosition).getCounter() > newEntry.getCounter())
+    {
+      throw new IllegalArgumentException("The ranking already contains an entry with a higher counting for " + newEntry);
+    }
+
+    if (list.size() < MAX_ENTRIES)
+    {
+      list.add(newEntry);
+      entries = list.toArray(num -> new Entry[num]);
+    }
+
+    return this;
+  }
+
+  public Ranking validate() throws IllegalArgumentException
+  {
+    if (this.entries.length > MAX_ENTRIES)
+      throw new IllegalArgumentException("Invalid Ranking: a valid ranking cannot have more entries than " + MAX_ENTRIES );
+
+    Set<String> seenWords = new HashSet<>();
+    long lowesCounting = Long.MAX_VALUE;
+
+    for (int i=0; i<this.entries.length; i++)
+    {
+      Entry entry = this.entries[i];
+
+      if (seenWords.contains(entry.getWord()))
+        throw new IllegalArgumentException("Invalid Ranking: Multiple occurrences of word -> " + entry.getWord());
+      if (entry.getCounter() > lowesCounting)
+        throw new IllegalArgumentException("Invalid Ranking: Entries are not sorted correctly");
+
+      seenWords.add(entry.getWord());
+      lowesCounting = entry.getCounter();
+    }
+
+    return this;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o)
+      return true;
+    if (o == null)
+      return false;
+    if (!(o instanceof Ranking))
+      return false;
+
+    Ranking other = (Ranking)o;
+
+    if (other.entries.length != entries.length)
+      return false;
+
+    if (entries.length == 0)
+      return true;
+
+    int i = 0;
+    Set<String> myWordsWithCurrentCount = new HashSet<>();
+    Set<String> otherWordsWithCurrentCount = new HashSet<>();
+    Entry myEntry = entries[i];
+    long currentCount = myEntry.getCounter();
+    myWordsWithCurrentCount.add(myEntry.getWord());
+    while (true)
+    {
+      Entry otherEntry = other.entries[i];
+      if (otherEntry.getCounter() != currentCount)
+        return false;
+      otherWordsWithCurrentCount.add(otherEntry.getWord());
+      if (++i >= entries.length)
+        return myWordsWithCurrentCount.equals(otherWordsWithCurrentCount);
+      myEntry = entries[i];
+      if (myEntry.getCounter() != currentCount)
+      {
+        if (!myWordsWithCurrentCount.equals(otherWordsWithCurrentCount))
+          return false;
+        currentCount = myEntry.getCounter();
+        myWordsWithCurrentCount.clear();
+        otherWordsWithCurrentCount.clear();
       }
       }
+      myWordsWithCurrentCount.add(myEntry.getWord());
     }
   }
     }
   }
+
+  public static Ranking of(Entry... entries)
+  {
+    Ranking ranking = new Ranking(entries);
+    ranking.validate();
+    return ranking;
+  }
 }
 }
index 27dca95..5c14ae7 100644 (file)
@@ -2,11 +2,9 @@ package de.juplo.kafka.wordcount.top10;
 
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
 
 
 @SpringBootApplication
 
 
 @SpringBootApplication
-@EnableConfigurationProperties(Top10ApplicationProperties.class)
 public class Top10Application
 {
        public static void main(String[] args)
 public class Top10Application
 {
        public static void main(String[] args)
diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java
new file mode 100644 (file)
index 0000000..03497e4
--- /dev/null
@@ -0,0 +1,88 @@
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerde;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+
+
+@Configuration
+@EnableConfigurationProperties(Top10ApplicationProperties.class)
+@Slf4j
+public class Top10ApplicationConfiguration
+{
+       @Bean
+       public Properties streamProcessorProperties(Top10ApplicationProperties properties)
+       {
+               Properties props = new Properties();
+
+               props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
+               props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+               props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
+               props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
+               props.put(JsonDeserializer.TRUSTED_PACKAGES, Top10Application.class.getPackageName());
+               props.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName());
+               props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName());
+               props.put(
+                               JsonDeserializer.TYPE_MAPPINGS,
+                               "word:" + Key.class.getName() + "," +
+                               "counter:" + Entry.class.getName() + "," +
+                               "user:" + User.class.getName() + "," +
+                               "ranking:" + Ranking.class.getName());
+               props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+               if (properties.getCommitInterval() != null)
+                       props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval());
+               if (properties.getCacheMaxBytes() != null)
+                       props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, properties.getCacheMaxBytes());
+               props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+               return props;
+       }
+
+       @Bean(initMethod = "start", destroyMethod = "stop")
+       public Top10StreamProcessor streamProcessor(
+                       Top10ApplicationProperties applicationProperties,
+                       Properties streamProcessorProperties,
+                       KeyValueBytesStoreSupplier storeSupplier,
+                       ConfigurableApplicationContext context)
+       {
+               Top10StreamProcessor streamProcessor = new Top10StreamProcessor(
+                               applicationProperties.getInputTopic(),
+                               applicationProperties.getOutputTopic(),
+                               streamProcessorProperties,
+                               storeSupplier);
+
+               streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
+               {
+                       log.error("Unexpected error!", e);
+                       CompletableFuture.runAsync(() ->
+                       {
+                               log.info("Stopping application...");
+                               SpringApplication.exit(context, () -> 1);
+                       });
+                       return SHUTDOWN_CLIENT;
+               });
+
+               return streamProcessor;
+       }
+
+       @Bean
+       public KeyValueBytesStoreSupplier storeSupplier()
+       {
+               return Stores.persistentKeyValueStore("top10");
+       }
+}
index 93b78ec..d3bb236 100644 (file)
@@ -17,4 +17,6 @@ public class Top10ApplicationProperties
   private String applicationId = "top10";
   private String inputTopic = "countings";
   private String outputTopic = "top10";
   private String applicationId = "top10";
   private String inputTopic = "countings";
   private String outputTopic = "top10";
+  private Integer commitInterval;
+  private Integer cacheMaxBytes;
 }
 }
index 862913a..343ab4d 100644 (file)
 package de.juplo.kafka.wordcount.top10;
 
 package de.juplo.kafka.wordcount.top10;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.springframework.boot.SpringApplication;
-import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.stereotype.Component;
+import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 
 
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
 import java.util.Properties;
 import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
-import java.util.regex.Pattern;
-
-import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
 
 
 @Slf4j
 
 
 @Slf4j
-@Component
 public class Top10StreamProcessor
 {
 public class Top10StreamProcessor
 {
-       final static Pattern PATTERN = Pattern.compile("\\W+");
-
        public final KafkaStreams streams;
 
 
        public Top10StreamProcessor(
        public final KafkaStreams streams;
 
 
        public Top10StreamProcessor(
-                       Top10ApplicationProperties properties,
-                       ObjectMapper mapper,
-                       ConfigurableApplicationContext context)
+                       String inputTopic,
+                       String outputTopic,
+                       Properties props,
+                       KeyValueBytesStoreSupplier storeSupplier)
+       {
+               Topology topology = Top10StreamProcessor.buildTopology(
+                               inputTopic,
+                               outputTopic,
+                               storeSupplier);
+
+               streams = new KafkaStreams(topology, props);
+       }
+
+       static Topology buildTopology(
+                       String inputTopic,
+                       String outputTopic,
+                       KeyValueBytesStoreSupplier storeSupplier)
        {
                StreamsBuilder builder = new StreamsBuilder();
 
                builder
        {
                StreamsBuilder builder = new StreamsBuilder();
 
                builder
-                               .<String, String>stream(properties.getInputTopic())
-                               .map((keyJson, countStr) ->
-                               {
-                                       try
-                                       {
-                                               Key key = mapper.readValue(keyJson, Key.class);
-                                               Long count = Long.parseLong(countStr);
-                                               Entry entry = Entry.of(key.getWord(), count);
-                                               String entryJson = mapper.writeValueAsString(entry);
-                                               return new KeyValue<>(key.getUsername(), entryJson);
-                                       }
-                                       catch (JsonProcessingException e)
-                                       {
-                                               throw new RuntimeException(e);
-                                       }
-                               })
+                               .<Key, Entry>stream(inputTopic)
+                               .map((key, entry) -> new KeyValue<>(User.of(key.getUser()), entry))
                                .groupByKey()
                                .aggregate(
                                .groupByKey()
                                .aggregate(
-                                               () -> "{\"entries\"     : []}",
-                                               (username, entryJson, rankingJson) ->
-                                               {
-                                                       try
-                                                       {
-                                                               Ranking ranking = mapper.readValue(rankingJson, Ranking.class);
-                                                               ranking.add(mapper.readValue(entryJson, Entry.class));
-                                                               return mapper.writeValueAsString(ranking);
-                                                       }
-                                                       catch (JsonProcessingException e)
-                                                       {
-                                                               throw new RuntimeException(e);
-                                                       }
-                                               }
-                               )
+                                               () -> new Ranking(),
+                                               (user, entry, ranking) -> ranking.add(entry),
+                                               Materialized.as(storeSupplier))
                                .toStream()
                                .toStream()
-                               .to(properties.getOutputTopic());
+                               .to(outputTopic);
+
+               Topology topology = builder.build();
+               log.info("\n\n{}", topology.describe());
 
 
-               Properties props = new Properties();
-               props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
-               props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
-               props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-               props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-               props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+               return topology;
+       }
 
 
-               streams = new KafkaStreams(builder.build(), props);
-               streams.setUncaughtExceptionHandler((Throwable e) ->
-               {
-                       log.error("Unexpected error!", e);
-                       CompletableFuture.runAsync(() ->
-                       {
-                               log.info("Stopping application...");
-                               SpringApplication.exit(context, () -> 1);
-                       });
-                       return SHUTDOWN_CLIENT;
-               });
+       ReadOnlyKeyValueStore<User, Ranking> getStore(String name)
+       {
+               return streams.store(StoreQueryParameters.fromNameAndType(name, QueryableStoreTypes.keyValueStore()));
        }
 
        }
 
-       @PostConstruct
        public void start()
        {
                log.info("Starting Stream-Processor");
                streams.start();
        }
 
        public void start()
        {
                log.info("Starting Stream-Processor");
                streams.start();
        }
 
-       @PreDestroy
        public void stop()
        {
                log.info("Stopping Stream-Processor");
        public void stop()
        {
                log.info("Stopping Stream-Processor");
diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/User.java b/src/main/java/de/juplo/kafka/wordcount/top10/User.java
new file mode 100644 (file)
index 0000000..53c258d
--- /dev/null
@@ -0,0 +1,14 @@
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@AllArgsConstructor(staticName = "of")
+@NoArgsConstructor
+@Data
+public class User
+{
+  String user;
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestCounter.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestCounter.java
new file mode 100644 (file)
index 0000000..d98ae64
--- /dev/null
@@ -0,0 +1,21 @@
+package de.juplo.kafka.wordcount.counter;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestCounter
+{
+  String user;
+  String word;
+  long counter;
+
+  public static TestCounter of(TestWord word, long counter)
+  {
+    return new TestCounter(word.getUser(), word.getWord(), counter);
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java
new file mode 100644 (file)
index 0000000..8008e12
--- /dev/null
@@ -0,0 +1,17 @@
+package de.juplo.kafka.wordcount.counter;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@AllArgsConstructor(staticName = "of")
+@NoArgsConstructor
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class TestWord
+{
+  private String user;
+  private String word;
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/query/RankingData.java b/src/test/java/de/juplo/kafka/wordcount/query/RankingData.java
new file mode 100644 (file)
index 0000000..1bbd3ba
--- /dev/null
@@ -0,0 +1,11 @@
+package de.juplo.kafka.wordcount.query;
+
+import de.juplo.kafka.wordcount.top10.Entry;
+import lombok.Data;
+
+
+@Data
+public class RankingData
+{
+  private Entry[] entries;
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/RankingTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/RankingTest.java
new file mode 100644 (file)
index 0000000..26749e9
--- /dev/null
@@ -0,0 +1,276 @@
+package de.juplo.kafka.wordcount.top10;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
+
+
+public class RankingTest
+{
+  @DisplayName("A newly created instance is empty")
+  @Test
+  public void testNewRankingIsEmpty()
+  {
+    Ranking ranking = new Ranking();
+    assertThat(ranking.getEntries()).isEmpty();
+  }
+
+  @DisplayName("An instance that was build from an empty ranking is empty")
+  @Test
+  public void testRankingOfYieldsExpectedResultForEmptyList()
+  {
+    Ranking ranking = new Ranking();
+    assertThat(ranking.getEntries()).isEmpty();
+  }
+
+  @DisplayName("An instance that was build from a valid ranking contains the expected entries")
+  @ParameterizedTest
+  @MethodSource("validRankingsProvider")
+  public void testRankingOfYieldsExpectedResultsForValidRankings(List<Entry> entryList)
+  {
+    Ranking ranking = Ranking.of(toArray(entryList));
+    assertThat(ranking.getEntries()).containsExactlyElementsOf(entryList);
+  }
+
+  @DisplayName("The builder fails for invalid rankings")
+  @ParameterizedTest
+  @MethodSource("invalidRankingsProvider")
+  public void testRankingOfThrowsExceptionForInvalidRankings(List<Entry> entryList)
+  {
+    assertThatExceptionOfType(IllegalArgumentException.class)
+        .isThrownBy(() -> Ranking.of(toArray(entryList)));
+  }
+
+  @DisplayName("Adding a new word with highest ranking, pushes all other words down")
+  @ParameterizedTest
+  @MethodSource("validRankingsProvider")
+  public void testAddingNewWordWithHighestRanking(List<Entry> entryList)
+  {
+    Ranking ranking = Ranking.of(toArray(entryList));
+    Entry newEntry = Entry.of("NEW!", rankingForPosition(-1));
+    ranking.add(newEntry);
+    assertThat(ranking.getEntries()[0]).isEqualTo(newEntry);
+    for (int i = 0; i < entryList.size() && i < Ranking.MAX_ENTRIES - 1; i++)
+    {
+      assertThat(ranking.getEntries()[i + 1]).isEqualTo(entryList.get(i));
+    }
+  }
+
+  @DisplayName("Adding a new word with an existent ranking, pushes all words with lower ranking down")
+  @ParameterizedTest
+  @MethodSource("validRankingsProvider")
+  public void testAddingNewWordWithExistingRanking(List<Entry> entryList)
+  {
+    for (int position = 0; position < entryList.size(); position++ )
+    {
+      Ranking ranking = Ranking.of(toArray(entryList));
+      Entry newEntry = Entry.of("NEW!", rankingForPosition(position));
+      ranking.add(newEntry);
+      for (int i = 0; i < entryList.size() && i < Ranking.MAX_ENTRIES - 1; i++)
+      {
+        if (i < position)
+        {
+          assertThat(ranking.getEntries()[i]).isEqualTo(entryList.get(i));
+        }
+        if (i == position)
+        {
+          assertThat(ranking.getEntries()[i]).isEqualTo(entryList.get(i));
+          assertThat(ranking.getEntries()[i + 1]).isEqualTo(newEntry);
+        }
+        if (i > position)
+        {
+          assertThat(ranking.getEntries()[i + 1]).isEqualTo(entryList.get(i));
+        }
+      }
+    }
+  }
+
+  @DisplayName("Adding a highest ranking for an existing word shifts it to the first place")
+  @ParameterizedTest
+  @ValueSource(ints = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 })
+  public void testAddingExistingWordWithHighestRanking(int position)
+  {
+    Ranking ranking = Ranking.of(toArray(VALID_RANKINGS[0]));
+    String word = wordForPosition(position);
+    Entry highestEntry = Entry.of(word, 100l);
+    ranking.add(highestEntry);
+    List<Entry> expectedEntries = Stream
+        .concat(
+            Stream.of(highestEntry),
+            VALID_RANKINGS[0]
+                .stream()
+                .filter(entry -> !entry.getWord().equals(word)))
+        .toList();
+    assertThat(ranking.getEntries()).containsExactlyElementsOf(expectedEntries);
+  }
+
+  @DisplayName("Adding an existing word with unchanged ranking changes nothing")
+  @ParameterizedTest
+  @ValueSource(ints = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 })
+  public void testAddingExistingWordWithUnchangedRanking(int position)
+  {
+    Ranking ranking = Ranking.of(toArray(VALID_RANKINGS[0]));
+    Entry unchangedEntry = Entry.of(
+        wordForPosition(position),
+        rankingForPosition(position));
+    ranking.add(unchangedEntry);
+    assertThat(ranking.getEntries()).containsExactlyElementsOf(VALID_RANKINGS[0]);
+  }
+
+  @DisplayName("Adding an existing word with a lower ranking fails")
+  @ParameterizedTest
+  @MethodSource("validRankingsProvider")
+  public void testAddingExistingWordWithLowerRankingFails(List<Entry> entryList)
+  {
+    Ranking ranking = Ranking.of(toArray(entryList));
+    entryList.forEach(entry ->
+      assertThatExceptionOfType(IllegalArgumentException.class)
+          .isThrownBy(() -> ranking.add(Entry.of(entry.getWord(), entry.getCounter() - 1))));
+  }
+
+  @DisplayName("Identical rankings are considered equal")
+  @ParameterizedTest
+  @MethodSource("validRankingsProvider")
+  public void testIdenticalRankingsAreConsideredEaqual(List<Entry> entryList)
+  {
+    assertThat(Ranking.of(toArray(entryList))).isEqualTo(Ranking.of(toArray(entryList)));
+  }
+
+  @DisplayName("Two empty rankings are considered equal")
+  @Test
+  public void testTwoEmptyRankingsAreConsideredEaqual()
+  {
+    assertThat(Ranking.of()).isEqualTo(Ranking.of());
+  }
+
+  @DisplayName("A changed ranking is not considered equal to its unchanged counter-part")
+  @ParameterizedTest
+  @MethodSource("validRankingsProvider")
+  public void testChangedRankingsDoNotEaqualUnchangedOne(List<Entry> entryList)
+  {
+    Ranking changed = Ranking.of(toArray(entryList));
+    changed.add(Entry.of("devilish", 666l));
+    assertThat(changed).isNotEqualTo(Ranking.of(toArray(entryList)));
+  }
+
+  @DisplayName("Rankigs are considered equal, if only the order of words with the same count differ")
+  @Test
+  public void testRankingWithDifferentOrderForSameCountAreEqual()
+  {
+    assertThat(
+        Ranking.of(
+            Entry.of("a1",10l),
+            Entry.of("a2",10l),
+            Entry.of("b1", 9l),
+            Entry.of("b2",9l),
+            Entry.of("c1", 8l),
+            Entry.of("c2", 8l)))
+        .isEqualTo(Ranking.of(
+            Entry.of("a2",10l),
+            Entry.of("a1",10l),
+            Entry.of("b2", 9l),
+            Entry.of("b1",9l),
+            Entry.of("c2", 8l),
+            Entry.of("c1", 8l)));
+  }
+
+
+  Entry[] toArray(List<Entry> entryList)
+  {
+    return entryList.toArray(size -> new Entry[size]);
+  }
+
+  static String wordForPosition(int position)
+  {
+    return Integer.toString(position+1);
+  }
+
+  static long rankingForPosition(int position)
+  {
+    return (long)Ranking.MAX_ENTRIES * 2 - position;
+  }
+
+  static Stream<List<Entry>> validRankingsProvider()
+  {
+    return Stream.of(VALID_RANKINGS);
+  }
+
+  static Stream<List<Entry>> invalidRankingsProvider()
+  {
+    return Stream.of(INVALID_RANKINGS);
+  }
+
+  static String[] WORDS = new String[Ranking.MAX_ENTRIES];
+  static List<Entry>[] VALID_RANKINGS = new List[Ranking.MAX_ENTRIES];
+
+  static
+  {
+    for (int i = 0; i < Ranking.MAX_ENTRIES; i++)
+    {
+      List<Entry> ranking = new LinkedList<>();
+      String word = null;
+      for (int position = 0; position <= i; position++)
+      {
+        word = wordForPosition(position);
+        Entry entry = Entry.of(word, rankingForPosition(position));
+        ranking.add(entry);
+      }
+      WORDS[i] = word;
+      VALID_RANKINGS[Ranking.MAX_ENTRIES - (i + 1)] = ranking;
+    }
+  }
+
+  static List<Entry>[] INVALID_RANKINGS = new List[] {
+      List.of(
+          Entry.of("Platz eins", 1l),
+          Entry.of("Platz zwei", 2l)),
+      List.of(
+          Entry.of("Platz eins", 1111111111l),
+          Entry.of("Platz zwei", 222222222l),
+          Entry.of("Platz eins", 1l)),
+      List.of(
+          Entry.of("Platz eins", 11l),
+          Entry.of("Platz eins", 1l)),
+      List.of(
+          Entry.of("Platz eins", 1111111111l),
+          Entry.of("Platz zwei", 222222222l),
+          Entry.of("Platz eins", 11111111l),
+          Entry.of("Platz zwei", 2222222l),
+          Entry.of("Platz fünf", 555555l)),
+      List.of(
+          Entry.of("Platz eins", 1111111111l),
+          Entry.of("Platz zwei", 222222222l),
+          Entry.of("Platz drei", 33333333l),
+          Entry.of("Platz vier", 4444444l),
+          Entry.of("Platz eins", 111111l),
+          Entry.of("Platz sechs", 66666l)),
+      List.of(
+          Entry.of("Platz eins", 1111111111l),
+          Entry.of("Platz zwei", 222222222l),
+          Entry.of("Platz drei", 33333333l),
+          Entry.of("Platz vier", 4444444l),
+          Entry.of("Platz fünf", 555555l),
+          Entry.of("Platz sechs", 66666l),
+          Entry.of("Platz eins", 1l)),
+      List.of(
+          Entry.of("Platz eins", 1111111111l),
+          Entry.of("Platz zwei", 222222222l),
+          Entry.of("Platz drei", 33333333l),
+          Entry.of("Platz vier", 4444444l),
+          Entry.of("Platz fünf", 555555l),
+          Entry.of("Platz sechs", 66666l),
+          Entry.of("Platz sieben", 7777l),
+          Entry.of("Platz acht", 888l),
+          Entry.of("Platz neun", 99l),
+          Entry.of("Platz 10", 6l),
+          Entry.of("Platz 11", 3l))};
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java
new file mode 100644 (file)
index 0000000..f6d7ccd
--- /dev/null
@@ -0,0 +1,166 @@
+package de.juplo.kafka.wordcount.top10;
+
+import de.juplo.kafka.wordcount.counter.TestCounter;
+import de.juplo.kafka.wordcount.counter.TestWord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+class TestData
+{
+       static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
+       {
+                       new KeyValue<>(
+                                       TestWord.of("peter","Hallo"),
+                                       TestCounter.of("peter","Hallo",1)),
+                       new KeyValue<>(
+                                       TestWord.of("klaus","Müsch"),
+                                       TestCounter.of("klaus","Müsch",1)),
+                       new KeyValue<>(
+                                       TestWord.of("peter","Welt"),
+                                       TestCounter.of("peter","Welt",1)),
+                       new KeyValue<>(
+                                       TestWord.of("klaus","Müsch"),
+                                       TestCounter.of("klaus","Müsch",2)),
+                       new KeyValue<>(
+                                       TestWord.of("klaus","s"),
+                                       TestCounter.of("klaus","s",1)),
+                       new KeyValue<>(
+                                       TestWord.of("peter","Boäh"),
+                                       TestCounter.of("peter","Boäh",1)),
+                       new KeyValue<>(
+                                       TestWord.of("peter","Welt"),
+                                       TestCounter.of("peter","Welt",2)),
+                       new KeyValue<>(
+                                       TestWord.of("peter","Boäh"),
+                                       TestCounter.of("peter","Boäh",2)),
+                       new KeyValue<>(
+                                       TestWord.of("klaus","s"),
+                                       TestCounter.of("klaus","s",2)),
+                       new KeyValue<>(
+                                       TestWord.of("peter","Boäh"),
+                                       TestCounter.of("peter","Boäh",3)),
+                       new KeyValue<>(
+                                       TestWord.of("klaus","s"),
+                                       TestCounter.of("klaus","s",3)),
+       };
+
+       static void assertExpectedMessages(MultiValueMap<User, Ranking> receivedMessages)
+       {
+               expectedMessages().forEach(
+                               (user, rankings) ->
+                                               assertThat(receivedMessages.get(user))
+                                                               .containsExactlyElementsOf(rankings));
+       }
+
+       static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
+       {
+               assertThat(store.get(EXPECTED_MESSAGES[9].key)).isEqualTo(EXPECTED_MESSAGES[9].value);
+               assertThat(store.get(EXPECTED_MESSAGES[10].key)).isEqualTo(EXPECTED_MESSAGES[10].value);
+       }
+
+       static KeyValue<User, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
+       {
+                       KeyValue.pair( // 0
+                                       User.of("peter"),
+                                       Ranking.of(
+                                                       Entry.of("Hallo", 1l))),
+                       KeyValue.pair( // 1
+                                       User.of("klaus"),
+                                       Ranking.of(
+                                                       Entry.of("Müsch", 1l))),
+                       KeyValue.pair( // 2
+                                       User.of("peter"),
+                                       Ranking.of(
+                                                       Entry.of("Hallo", 1l),
+                                                       Entry.of("Welt", 1l))),
+                       KeyValue.pair( // 3
+                                       User.of("klaus"),
+                                       Ranking.of(
+                                                       Entry.of("Müsch", 2l))),
+                       KeyValue.pair( // 4
+                                       User.of("klaus"),
+                                       Ranking.of(
+                                                       Entry.of("Müsch", 2l),
+                                                       Entry.of("s", 1l))),
+                       KeyValue.pair( // 5
+                                       User.of("peter"),
+                                       Ranking.of(
+                                                       Entry.of("Hallo", 1l),
+                                                       Entry.of("Welt", 1l),
+                                                       Entry.of("Boäh", 1l))),
+                       KeyValue.pair( // 6
+                                       User.of("peter"),
+                                       Ranking.of(
+                                                       Entry.of("Welt", 2l),
+                                                       Entry.of("Hallo", 1l),
+                                                       Entry.of("Boäh", 1l))),
+                       KeyValue.pair( // 7
+                                       User.of("peter"),
+                                       Ranking.of(
+                                                       Entry.of("Welt", 2l),
+                                                       Entry.of("Boäh", 2l),
+                                                       Entry.of("Hallo", 1l))),
+                       KeyValue.pair( // 8
+                                       User.of("klaus"),
+                                       Ranking.of(
+                                                       Entry.of("Müsch", 2l),
+                                                       Entry.of("s", 2l))),
+                       KeyValue.pair( // 9
+                                       User.of("peter"),
+                                       Ranking.of(
+                                                       Entry.of("Boäh", 3l),
+                                                       Entry.of("Welt", 2l),
+                                                       Entry.of("Hallo", 1l))),
+                       KeyValue.pair( // 10
+                                       User.of("klaus"),
+                                       Ranking.of(
+                                                       Entry.of("s", 3l),
+                                                       Entry.of("Müsch", 2l))),
+       };
+
+       static MultiValueMap<User, Ranking> expectedMessages()
+       {
+               MultiValueMap<User, Ranking> expectedMessages = new LinkedMultiValueMap<>();
+               Stream
+                               .of(EXPECTED_MESSAGES)
+                               .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
+               return expectedMessages;
+       }
+
+       static Map<String, Object> convertToMap(Properties properties)
+       {
+               return properties
+                               .entrySet()
+                               .stream()
+                               .collect(
+                                               Collectors.toMap(
+                                                               entry -> (String)entry.getKey(),
+                                                               entry -> entry.getValue()
+                                               ));
+       }
+
+       static String parseHeader(Headers headers, String key)
+       {
+               Header header = headers.lastHeader(key);
+               if (header == null)
+               {
+                       return key + "=null";
+               }
+               else
+               {
+                       return key + "=" + new String(header.value());
+               }
+       }
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java
new file mode 100644 (file)
index 0000000..6707acc
--- /dev/null
@@ -0,0 +1,145 @@
+package de.juplo.kafka.wordcount.top10;
+
+import de.juplo.kafka.wordcount.counter.TestWord;
+import de.juplo.kafka.wordcount.counter.TestCounter;
+import de.juplo.kafka.wordcount.query.RankingData;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.*;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Primary;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import java.time.Duration;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.await;
+
+
+@SpringBootTest(
+               properties = {
+                               "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
+                               "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
+                               "spring.kafka.producer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.counter.TestWord,counter:de.juplo.kafka.wordcount.counter.TestCounter",
+                               "spring.kafka.consumer.auto-offset-reset=earliest",
+                               "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
+                               "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
+                               "spring.kafka.consumer.properties.spring.json.use.type.headers=false",
+                               "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.top10.User",
+                               "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.query.RankingData",
+                               "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.top10   ",
+                               "logging.level.root=WARN",
+                               "logging.level.de.juplo=DEBUG",
+                               "juplo.wordcount.top10.bootstrap-server=${spring.embedded.kafka.brokers}",
+                               "juplo.wordcount.top10.commit-interval=0",
+                               "juplo.wordcount.top10.cacheMaxBytes=0",
+                               "juplo.wordcount.top10.input-topic=" + Top10ApplicationIT.TOPIC_IN,
+                               "juplo.wordcount.top10.output-topic=" + Top10ApplicationIT.TOPIC_OUT })
+@EmbeddedKafka(topics = { Top10ApplicationIT.TOPIC_IN, Top10ApplicationIT.TOPIC_OUT })
+@Slf4j
+public class Top10ApplicationIT
+{
+       public final static String TOPIC_IN = "in";
+       public final static String TOPIC_OUT = "out";
+       public final static String STORE_NAME = "TEST-STORE";
+
+
+       @Autowired
+       Consumer consumer;
+       @Autowired
+       Top10StreamProcessor top10StreamProcessor;
+
+
+       @BeforeAll
+       public static void testSendMessage(
+                       @Autowired KafkaTemplate<TestWord, TestCounter> kafkaTemplate)
+       {
+               Stream
+                               .of(TestData.INPUT_MESSAGES)
+                               .forEach(kv ->
+                               {
+                                       try
+                                       {
+                                               SendResult<TestWord, TestCounter> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
+                                               log.info(
+                                                               "Sent: {}={}, partition={}, offset={}",
+                                                               result.getProducerRecord().key(),
+                                                               result.getProducerRecord().value(),
+                                                               result.getRecordMetadata().partition(),
+                                                               result.getRecordMetadata().offset());
+                                       }
+                                       catch (Exception e)
+                                       {
+                                               throw new RuntimeException(e);
+                                       }
+                               });
+       }
+
+       @DisplayName("Await the expected state in the state-store")
+       @Test
+       public void testAwaitExpectedState()
+       {
+               await("Expexted state")
+                               .atMost(Duration.ofSeconds(5))
+                               .untilAsserted(() -> TestData.assertExpectedState(top10StreamProcessor.getStore(STORE_NAME)));
+       }
+
+       @DisplayName("Await the expected output messages")
+       @Test
+       @Disabled
+       public void testAwaitExpectedMessages()
+       {
+               await("Expexted messages")
+                               .atMost(Duration.ofSeconds(5))
+                               .untilAsserted(() -> TestData.assertExpectedMessages(consumer.getReceivedMessages()));
+       }
+
+
+       static class Consumer
+       {
+               private final MultiValueMap<User, Ranking> received = new LinkedMultiValueMap<>();
+
+               @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
+               public synchronized void receive(
+                               @Header(KafkaHeaders.RECEIVED_KEY) User user,
+                               @Payload RankingData ranking)
+               {
+                       log.debug("Received message: {} -> {}", user, ranking);
+                       received.add(user, Ranking.of(ranking.getEntries()));
+               }
+
+               synchronized MultiValueMap<User, Ranking> getReceivedMessages()
+               {
+                       return received;
+               }
+       }
+
+       @TestConfiguration
+       static class Configuration
+       {
+               @Bean
+               Consumer consumer()
+               {
+                       return new Consumer();
+               }
+
+               @Primary
+               @Bean
+               KeyValueBytesStoreSupplier inMemoryStoreSupplier()
+               {
+                       return Stores.inMemoryKeyValueStore(STORE_NAME);
+               }
+       }
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java
new file mode 100644 (file)
index 0000000..3feeea3
--- /dev/null
@@ -0,0 +1,110 @@
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerde;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Stream;
+
+import static de.juplo.kafka.wordcount.top10.TestData.convertToMap;
+import static de.juplo.kafka.wordcount.top10.TestData.parseHeader;
+import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
+import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
+
+
+@Slf4j
+public class Top10StreamProcessorTopologyTest
+{
+  public final static String IN = "TEST-IN";
+  public final static String OUT = "TEST-OUT";
+  public final static String STORE_NAME = "TOPOLOGY-TEST";
+
+
+  TopologyTestDriver testDriver;
+  TestInputTopic<Key, Entry> in;
+  TestOutputTopic<User, Ranking> out;
+
+
+  @BeforeEach
+  public void setUp()
+  {
+    Topology topology = Top10StreamProcessor.buildTopology(
+        IN,
+        OUT,
+        Stores.inMemoryKeyValueStore(STORE_NAME));
+
+    Top10ApplicationConfiguration applicationConfiguriation =
+        new Top10ApplicationConfiguration();
+    Properties streamProcessorProperties =
+        applicationConfiguriation.streamProcessorProperties(new Top10ApplicationProperties());
+    Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
+
+    JsonSerde<?> keySerde = new JsonSerde<>();
+    keySerde.configure(propertyMap, true);
+    JsonSerde<?> valueSerde = new JsonSerde<>();
+    valueSerde.configure(propertyMap, false);
+
+    testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
+
+    in = testDriver.createInputTopic(
+        IN,
+        (JsonSerializer<Key>)keySerde.serializer(),
+        (JsonSerializer<Entry>)valueSerde.serializer());
+
+    out = testDriver.createOutputTopic(
+        OUT,
+        (JsonDeserializer<User>)keySerde.deserializer(),
+        (JsonDeserializer<Ranking>)valueSerde.deserializer());
+
+  }
+
+
+  @Test
+  public void test()
+  {
+    Stream
+        .of(TestData.INPUT_MESSAGES)
+        .forEach(kv -> in.pipeInput(
+            Key.of(kv.key.getUser(), kv.key.getWord()),
+            Entry.of(kv.value.getWord(), kv.value.getCounter())));
+
+    MultiValueMap<User, Ranking> receivedMessages = new LinkedMultiValueMap<>();
+    out
+        .readRecordsToList()
+        .forEach(record ->
+        {
+          log.debug(
+              "OUT: {} -> {}, {}, {}",
+              record.key(),
+              record.value(),
+              parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
+              parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
+          receivedMessages.add(record.key(), record.value());
+        });
+
+    TestData.assertExpectedMessages(receivedMessages);
+
+    KeyValueStore<User, Ranking> store = testDriver.getKeyValueStore(STORE_NAME);
+    TestData.assertExpectedState(store);
+  }
+
+  @AfterEach
+  public void tearDown()
+  {
+    testDriver.close();
+  }
+}