From: Kai Moritz <kai@juplo.de>
Date: Sat, 15 Jun 2024 21:19:02 +0000 (+0200)
Subject: popular: 1.0.0 - Renamed packages and classes -- MOVE
X-Git-Tag: popular-on-counter~2
X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=220bc2a369a7edbafd2473194b2ce4ed2b6a0b69;p=demos%2Fkafka%2Fwordcount

popular: 1.0.0 - Renamed packages and classes -- MOVE
---

diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java
deleted file mode 100644
index e6d3b1f..0000000
--- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-
-
-@SpringBootApplication
-public class CounterApplication
-{
-	public static void main(String[] args)
-	{
-		SpringApplication.run(CounterApplication.class, args);
-	}
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java
deleted file mode 100644
index 484b8de..0000000
--- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java
+++ /dev/null
@@ -1,97 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.Stores;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.support.serializer.JsonDeserializer;
-import org.springframework.kafka.support.serializer.JsonSerde;
-
-import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
-
-import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
-import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
-
-
-@Configuration
-@EnableConfigurationProperties(CounterApplicationProperties.class)
-@Slf4j
-public class CounterApplicationConfiguriation
-{
-	@Bean
-	public Properties streamProcessorProperties(
-			CounterApplicationProperties counterProperties)
-	{
-		Properties propertyMap = serializationConfig();
-
-		propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, counterProperties.getApplicationId());
-
-		propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, counterProperties.getBootstrapServer());
-		if (counterProperties.getCommitInterval() != null)
-			propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, counterProperties.getCommitInterval());
-		if (counterProperties.getCacheMaxBytes() != null)
-			propertyMap.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, counterProperties.getCacheMaxBytes());
-
-		propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-		return propertyMap;
-	}
-
-	static Properties serializationConfig()
-	{
-		Properties propertyMap = new Properties();
-
-		propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
-		propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
-		propertyMap.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName());
-		propertyMap.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Word.class.getName());
-		propertyMap.put(
-				JsonDeserializer.TYPE_MAPPINGS,
-				"user:" + User.class.getName() + "," +
-				"word:" + Word.class.getName() + "," +
-				"counter:" + WordCounter.class.getName());
-
-		return propertyMap;
-	}
-
-	@Bean(initMethod = "start", destroyMethod = "stop")
-	public CounterStreamProcessor streamProcessor(
-			CounterApplicationProperties applicationProperties,
-			Properties streamProcessorProperties,
-			KeyValueBytesStoreSupplier storeSupplier,
-			ConfigurableApplicationContext context)
-	{
-		CounterStreamProcessor streamProcessor = new CounterStreamProcessor(
-				applicationProperties.getInputTopic(),
-				applicationProperties.getOutputTopic(),
-				streamProcessorProperties,
-				storeSupplier);
-
-		streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
-		{
-			log.error("Unexpected error!", e);
-			CompletableFuture.runAsync(() ->
-			{
-				log.info("Stopping application...");
-				SpringApplication.exit(context, () -> 1);
-			});
-			return SHUTDOWN_CLIENT;
-		});
-
-
-		return streamProcessor;
-	}
-
-	@Bean
-	public KeyValueBytesStoreSupplier storeSupplier()
-	{
-		return Stores.persistentKeyValueStore(STORE_NAME);
-	}
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java
deleted file mode 100644
index c3ada17..0000000
--- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-
-
-@ConfigurationProperties("juplo.wordcount.counter")
-@Getter
-@Setter
-@ToString
-public class CounterApplicationProperties
-{
-  private String bootstrapServer = "localhost:9092";
-  private String applicationId = "counter";
-  private String inputTopic = "words";
-  private String outputTopic = "countings";
-  private Integer commitInterval;
-  private Integer cacheMaxBytes;
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
deleted file mode 100644
index 64bd619..0000000
--- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
+++ /dev/null
@@ -1,80 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.QueryableStoreTypes;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
-import org.springframework.kafka.support.serializer.JsonSerde;
-
-import java.util.Properties;
-
-
-@Slf4j
-public class CounterStreamProcessor
-{
-	public static final String STORE_NAME = "counter";
-
-
-	public final KafkaStreams streams;
-
-
-	public CounterStreamProcessor(
-			String inputTopic,
-			String outputTopic,
-			Properties properties,
-			KeyValueBytesStoreSupplier storeSupplier)
-	{
-		Topology topology = CounterStreamProcessor.buildTopology(
-				inputTopic,
-				outputTopic,
-				storeSupplier);
-
-		streams = new KafkaStreams(topology, properties);
-	}
-
-	static Topology buildTopology(
-			String inputTopic,
-			String outputTopic,
-			KeyValueBytesStoreSupplier storeSupplier)
-	{
-		StreamsBuilder builder = new StreamsBuilder();
-
-		KStream<User, Word> source = builder.stream(inputTopic);
-
-		source
-				.map((key, word) -> new KeyValue<>(word, word))
-				.groupByKey()
-				.count(
-						Materialized
-								.<Word, Long>as(storeSupplier)
-								.withKeySerde(new JsonSerde<>().copyWithType(Word.class).forKeys()))
-				.toStream()
-				.map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter)))
-				.to(outputTopic);
-
-		Topology topology = builder.build();
-		log.info("\n\n{}", topology.describe());
-
-		return topology;
-	}
-
-	ReadOnlyKeyValueStore<Word, Long> getStore()
-	{
-		return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
-	}
-
-	public void start()
-	{
-		log.info("Starting Stream-Processor");
-		streams.start();
-	}
-
-	public void stop()
-	{
-		log.info("Stopping Stream-Processor");
-		streams.close();
-	}
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/User.java b/src/main/java/de/juplo/kafka/wordcount/counter/User.java
deleted file mode 100644
index e38bcba..0000000
--- a/src/main/java/de/juplo/kafka/wordcount/counter/User.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import lombok.Data;
-
-
-@Data
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class User
-{
-  String user;
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/Word.java b/src/main/java/de/juplo/kafka/wordcount/counter/Word.java
deleted file mode 100644
index 77287d5..0000000
--- a/src/main/java/de/juplo/kafka/wordcount/counter/Word.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import lombok.Data;
-
-
-@Data
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class Word
-{
-  private String user;
-  private String word;
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java b/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java
deleted file mode 100644
index f1fce71..0000000
--- a/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import lombok.AccessLevel;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor(access = AccessLevel.PRIVATE)
-public class WordCounter
-{
-  String user;
-  String word;
-  long counter;
-
-  public static WordCounter of(Word word, long counter)
-  {
-    return new WordCounter(word.getUser(), word.getWord(), counter);
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplication.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplication.java
new file mode 100644
index 0000000..e6d3b1f
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplication.java
@@ -0,0 +1,14 @@
+package de.juplo.kafka.wordcount.counter;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+
+@SpringBootApplication
+public class CounterApplication
+{
+	public static void main(String[] args)
+	{
+		SpringApplication.run(CounterApplication.class, args);
+	}
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguriation.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguriation.java
new file mode 100644
index 0000000..484b8de
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguriation.java
@@ -0,0 +1,97 @@
+package de.juplo.kafka.wordcount.counter;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerde;
+
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+
+import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
+import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+
+
+@Configuration
+@EnableConfigurationProperties(CounterApplicationProperties.class)
+@Slf4j
+public class CounterApplicationConfiguriation
+{
+	@Bean
+	public Properties streamProcessorProperties(
+			CounterApplicationProperties counterProperties)
+	{
+		Properties propertyMap = serializationConfig();
+
+		propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, counterProperties.getApplicationId());
+
+		propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, counterProperties.getBootstrapServer());
+		if (counterProperties.getCommitInterval() != null)
+			propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, counterProperties.getCommitInterval());
+		if (counterProperties.getCacheMaxBytes() != null)
+			propertyMap.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, counterProperties.getCacheMaxBytes());
+
+		propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+		return propertyMap;
+	}
+
+	static Properties serializationConfig()
+	{
+		Properties propertyMap = new Properties();
+
+		propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
+		propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
+		propertyMap.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName());
+		propertyMap.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Word.class.getName());
+		propertyMap.put(
+				JsonDeserializer.TYPE_MAPPINGS,
+				"user:" + User.class.getName() + "," +
+				"word:" + Word.class.getName() + "," +
+				"counter:" + WordCounter.class.getName());
+
+		return propertyMap;
+	}
+
+	@Bean(initMethod = "start", destroyMethod = "stop")
+	public CounterStreamProcessor streamProcessor(
+			CounterApplicationProperties applicationProperties,
+			Properties streamProcessorProperties,
+			KeyValueBytesStoreSupplier storeSupplier,
+			ConfigurableApplicationContext context)
+	{
+		CounterStreamProcessor streamProcessor = new CounterStreamProcessor(
+				applicationProperties.getInputTopic(),
+				applicationProperties.getOutputTopic(),
+				streamProcessorProperties,
+				storeSupplier);
+
+		streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
+		{
+			log.error("Unexpected error!", e);
+			CompletableFuture.runAsync(() ->
+			{
+				log.info("Stopping application...");
+				SpringApplication.exit(context, () -> 1);
+			});
+			return SHUTDOWN_CLIENT;
+		});
+
+
+		return streamProcessor;
+	}
+
+	@Bean
+	public KeyValueBytesStoreSupplier storeSupplier()
+	{
+		return Stores.persistentKeyValueStore(STORE_NAME);
+	}
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationProperties.java
new file mode 100644
index 0000000..c3ada17
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationProperties.java
@@ -0,0 +1,22 @@
+package de.juplo.kafka.wordcount.counter;
+
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+
+@ConfigurationProperties("juplo.wordcount.counter")
+@Getter
+@Setter
+@ToString
+public class CounterApplicationProperties
+{
+  private String bootstrapServer = "localhost:9092";
+  private String applicationId = "counter";
+  private String inputTopic = "words";
+  private String outputTopic = "countings";
+  private Integer commitInterval;
+  private Integer cacheMaxBytes;
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java
new file mode 100644
index 0000000..64bd619
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java
@@ -0,0 +1,80 @@
+package de.juplo.kafka.wordcount.counter;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.springframework.kafka.support.serializer.JsonSerde;
+
+import java.util.Properties;
+
+
+@Slf4j
+public class CounterStreamProcessor
+{
+	public static final String STORE_NAME = "counter";
+
+
+	public final KafkaStreams streams;
+
+
+	public CounterStreamProcessor(
+			String inputTopic,
+			String outputTopic,
+			Properties properties,
+			KeyValueBytesStoreSupplier storeSupplier)
+	{
+		Topology topology = CounterStreamProcessor.buildTopology(
+				inputTopic,
+				outputTopic,
+				storeSupplier);
+
+		streams = new KafkaStreams(topology, properties);
+	}
+
+	static Topology buildTopology(
+			String inputTopic,
+			String outputTopic,
+			KeyValueBytesStoreSupplier storeSupplier)
+	{
+		StreamsBuilder builder = new StreamsBuilder();
+
+		KStream<User, Word> source = builder.stream(inputTopic);
+
+		source
+				.map((key, word) -> new KeyValue<>(word, word))
+				.groupByKey()
+				.count(
+						Materialized
+								.<Word, Long>as(storeSupplier)
+								.withKeySerde(new JsonSerde<>().copyWithType(Word.class).forKeys()))
+				.toStream()
+				.map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter)))
+				.to(outputTopic);
+
+		Topology topology = builder.build();
+		log.info("\n\n{}", topology.describe());
+
+		return topology;
+	}
+
+	ReadOnlyKeyValueStore<Word, Long> getStore()
+	{
+		return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
+	}
+
+	public void start()
+	{
+		log.info("Starting Stream-Processor");
+		streams.start();
+	}
+
+	public void stop()
+	{
+		log.info("Stopping Stream-Processor");
+		streams.close();
+	}
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/User.java b/src/main/java/de/juplo/kafka/wordcount/popular/User.java
new file mode 100644
index 0000000..e38bcba
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/wordcount/popular/User.java
@@ -0,0 +1,12 @@
+package de.juplo.kafka.wordcount.counter;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class User
+{
+  String user;
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/Word.java b/src/main/java/de/juplo/kafka/wordcount/popular/Word.java
new file mode 100644
index 0000000..77287d5
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/wordcount/popular/Word.java
@@ -0,0 +1,13 @@
+package de.juplo.kafka.wordcount.counter;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Word
+{
+  private String user;
+  private String word;
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/WordCounter.java b/src/main/java/de/juplo/kafka/wordcount/popular/WordCounter.java
new file mode 100644
index 0000000..f1fce71
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/wordcount/popular/WordCounter.java
@@ -0,0 +1,22 @@
+package de.juplo.kafka.wordcount.counter;
+
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(access = AccessLevel.PRIVATE)
+public class WordCounter
+{
+  String user;
+  String word;
+  long counter;
+
+  public static WordCounter of(Word word, long counter)
+  {
+    return new WordCounter(word.getUser(), word.getWord(), counter);
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
deleted file mode 100644
index 334cd05..0000000
--- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
+++ /dev/null
@@ -1,165 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import de.juplo.kafka.wordcount.splitter.TestInputUser;
-import de.juplo.kafka.wordcount.splitter.TestInputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.Stores;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Primary;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.support.KafkaHeaders;
-import org.springframework.kafka.support.SendResult;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.messaging.handler.annotation.Header;
-import org.springframework.messaging.handler.annotation.Payload;
-import org.springframework.util.LinkedMultiValueMap;
-import org.springframework.util.MultiValueMap;
-
-import java.time.Duration;
-
-import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN;
-import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT;
-import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
-import static org.awaitility.Awaitility.await;
-
-
-@SpringBootTest(
-		properties = {
-				"spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
-				"spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
-				"spring.kafka.producer.properties.spring.json.add.type.headers=false",
-				"spring.kafka.consumer.auto-offset-reset=earliest",
-				"spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
-				"spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
-				"spring.kafka.consumer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.top10.TestOutputWord,counter:de.juplo.kafka.wordcount.top10.TestOutputWordCounter",
-				"logging.level.root=WARN",
-				"logging.level.de.juplo=DEBUG",
-				"juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}",
-				"juplo.wordcount.counter.commit-interval=0",
-				"juplo.wordcount.counter.input-topic=" + TOPIC_IN,
-				"juplo.wordcount.counter.output-topic=" + TOPIC_OUT })
-@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT })
-@Slf4j
-public class CounterApplicationIT
-{
-	public static final String TOPIC_IN = "in";
-	public static final String TOPIC_OUT = "out";
-
-	@Autowired
-	Consumer consumer;
-	@Autowired
-	CounterStreamProcessor streamProcessor;
-
-
-	@BeforeAll
-	public static void testSendMessage(
-			@Autowired KafkaTemplate<TestInputUser, TestInputWord> kafkaTemplate)
-	{
-		TestData
-				.getInputMessages()
-				.forEach(kv ->
-				{
-					try
-					{
-						SendResult<TestInputUser, TestInputWord> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
-						log.info(
-								"Sent: {}={}, partition={}, offset={}",
-								result.getProducerRecord().key(),
-								result.getProducerRecord().value(),
-								result.getRecordMetadata().partition(),
-								result.getRecordMetadata().offset());
-					}
-					catch (Exception e)
-					{
-						throw new RuntimeException(e);
-					}
-				});
-	}
-
-	@DisplayName("Await the expected number of messages")
-	@Test
-	public void testAwaitExpectedNumberOfMessagesForUsers()
-	{
-		await("Expected number of messages")
-				.atMost(Duration.ofSeconds(5))
-				.untilAsserted(() -> consumer.enforceAssertion(
-						receivedMessages -> TestData.assertExpectedNumberOfMessagesForWord(receivedMessages)));
-	}
-
-	@DisplayName("Await the expected output messages")
-	@Test
-	void testSendMessage()
-	{
-		await("Expected messages")
-				.atMost(Duration.ofSeconds(10))
-				.untilAsserted(() -> consumer.enforceAssertion(
-						receivedMessages -> TestData.assertExpectedMessages(receivedMessages)));
-	}
-
-	@DisplayName("Await the expected final output messages")
-	@Test
-	public void testAwaitExpectedLastMessagesForUsers()
-	{
-		await("Expected final output messages")
-				.atMost(Duration.ofSeconds(5))
-				.untilAsserted(() -> consumer.enforceAssertion(
-						receivedMessages -> TestData.assertExpectedLastMessagesForWord(receivedMessages)));
-	}
-
-	@DisplayName("Await the expected state in the state-store")
-	@Test
-	public void testAwaitExpectedState()
-	{
-		await("Expected state")
-				.atMost(Duration.ofSeconds(5))
-				.untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
-	}
-
-
-	static class Consumer
-	{
-		private final MultiValueMap<TestOutputWord, TestOutputWordCounter> received = new LinkedMultiValueMap<>();
-
-		@KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
-		public synchronized void receive(
-				@Header(KafkaHeaders.RECEIVED_KEY) TestOutputWord word,
-				@Payload TestOutputWordCounter counter)
-		{
-			log.debug("Received message: {} -> {}", word, counter);
-			received.add(word, counter);
-		}
-
-		synchronized void enforceAssertion(
-				java.util.function.Consumer<MultiValueMap<TestOutputWord, TestOutputWordCounter>> assertion)
-		{
-			assertion.accept(received);
-		}
-	}
-
-	@TestConfiguration
-	static class Configuration
-	{
-		@Bean
-		Consumer consumer()
-		{
-			return new Consumer();
-		}
-
-		@Primary
-		@Bean
-		KeyValueBytesStoreSupplier inMemoryStoreSupplier()
-		{
-			return Stores.inMemoryKeyValueStore(STORE_NAME);
-		}
-	}
-}
diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
deleted file mode 100644
index 0ffd516..0000000
--- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import de.juplo.kafka.wordcount.splitter.TestInputUser;
-import de.juplo.kafka.wordcount.splitter.TestInputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.streams.TestOutputTopic;
-import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.Stores;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.springframework.kafka.support.serializer.JsonDeserializer;
-import org.springframework.kafka.support.serializer.JsonSerializer;
-import org.springframework.util.LinkedMultiValueMap;
-import org.springframework.util.MultiValueMap;
-
-import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig;
-
-
-@Slf4j
-public class CounterStreamProcessorTopologyTest
-{
-  public static final String IN = "TEST-IN";
-  public static final String OUT = "TEST-OUT";
-  public static final String STORE_NAME = "TOPOLOGY-TEST";
-
-
-  TopologyTestDriver testDriver;
-  TestInputTopic<TestInputUser, TestInputWord> in;
-  TestOutputTopic<TestOutputWord, TestOutputWordCounter> out;
-
-
-  @BeforeEach
-  public void setUpTestDriver()
-  {
-    Topology topology = CounterStreamProcessor.buildTopology(
-        IN,
-        OUT,
-        Stores.inMemoryKeyValueStore(STORE_NAME));
-
-    testDriver = new TopologyTestDriver(topology, serializationConfig());
-
-    in = testDriver.createInputTopic(
-        IN,
-        new JsonSerializer().noTypeInfo(),
-        new JsonSerializer().noTypeInfo());
-
-    out = testDriver.createOutputTopic(
-        OUT,
-        new JsonDeserializer()
-            .copyWithType(TestOutputWord.class)
-            .ignoreTypeHeaders(),
-        new JsonDeserializer()
-            .copyWithType(TestOutputWordCounter.class)
-            .ignoreTypeHeaders());
-  }
-
-
-  @Test
-  public void test()
-  {
-    TestData
-        .getInputMessages()
-        .forEach(kv -> in.pipeInput(kv.key, kv.value));
-
-    MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages = new LinkedMultiValueMap<>();
-    out
-        .readRecordsToList()
-        .forEach(record -> receivedMessages.add(record.key(), record.value()));
-
-    TestData.assertExpectedMessages(receivedMessages);
-
-    TestData.assertExpectedNumberOfMessagesForWord(receivedMessages);
-    TestData.assertExpectedLastMessagesForWord(receivedMessages);
-
-    KeyValueStore<Word, Long> store = testDriver.getKeyValueStore(STORE_NAME);
-    TestData.assertExpectedState(store);
-  }
-
-  @AfterEach
-  public void tearDown()
-  {
-    testDriver.close();
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java
deleted file mode 100644
index 1ecfdbd..0000000
--- a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java
+++ /dev/null
@@ -1,206 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import de.juplo.kafka.wordcount.splitter.TestInputUser;
-import de.juplo.kafka.wordcount.splitter.TestInputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
-import org.springframework.util.LinkedMultiValueMap;
-import org.springframework.util.MultiValueMap;
-
-import java.util.stream.Stream;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-
-class TestData
-{
-	static final String PETER = "peter";
-	static final String KLAUS = "klaus";
-
-	static final String WORD_HALLO = "Hallo";
-	static final String WORD_MÜSCH = "Müsch";
-	static final String WORD_WELT = "Welt";
-	static final String WORD_S = "s";
-	static final String WORD_BOÄH = "Boäh";
-
-	static final TestOutputWord PETER_HALLO = TestOutputWord.of(PETER, WORD_HALLO);
-	static final TestOutputWord PETER_WELT = TestOutputWord.of(PETER, WORD_WELT);
-	static final TestOutputWord PETER_BOÄH = TestOutputWord.of(PETER, WORD_BOÄH);
-	static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(KLAUS, WORD_MÜSCH);
-	static final TestOutputWord KLAUS_S = TestOutputWord.of(KLAUS, WORD_S);
-
-	private static final KeyValue<TestInputUser, TestInputWord>[] INPUT_MESSAGES = new KeyValue[]
-	{
-			new KeyValue<>(
-					TestInputUser.of(PETER),
-					TestInputWord.of(PETER, WORD_HALLO)),
-			new KeyValue<>(
-					TestInputUser.of(KLAUS),
-					TestInputWord.of(KLAUS, WORD_MÜSCH)),
-			new KeyValue<>(
-					TestInputUser.of(PETER),
-					TestInputWord.of(PETER, WORD_WELT)),
-			new KeyValue<>(
-					TestInputUser.of(KLAUS),
-					TestInputWord.of(KLAUS, WORD_MÜSCH)),
-			new KeyValue<>(
-					TestInputUser.of(KLAUS),
-					TestInputWord.of(KLAUS, WORD_S)),
-			new KeyValue<>(
-					TestInputUser.of(PETER),
-					TestInputWord.of(PETER, WORD_BOÄH)),
-			new KeyValue<>(
-					TestInputUser.of(PETER),
-					TestInputWord.of(PETER, WORD_WELT)),
-			new KeyValue<>(
-					TestInputUser.of(PETER),
-					TestInputWord.of(PETER, WORD_BOÄH)),
-			new KeyValue<>(
-					TestInputUser.of(KLAUS),
-					TestInputWord.of(KLAUS, WORD_S)),
-			new KeyValue<>(
-					TestInputUser.of(PETER),
-					TestInputWord.of(PETER, WORD_BOÄH)),
-			new KeyValue<>(
-					TestInputUser.of(KLAUS),
-					TestInputWord.of(KLAUS, WORD_S)),
-	};
-
-	static Stream<KeyValue<TestInputUser, TestInputWord>> getInputMessages()
-	{
-		return Stream.of(TestData.INPUT_MESSAGES);
-	}
-
-	static void assertExpectedMessages(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
-	{
-		expectedMessages().forEach(
-				(word, counter) ->
-						assertThat(receivedMessages.get(word))
-								.containsExactlyElementsOf(counter));
-	}
-
-	static void assertExpectedNumberOfMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
-	{
-		assertThat(countMessagesForWord(PETER_HALLO, receivedMessages));
-		assertThat(countMessagesForWord(PETER_WELT, receivedMessages));
-		assertThat(countMessagesForWord(PETER_BOÄH, receivedMessages));
-		assertThat(countMessagesForWord(KLAUS_MÜSCH, receivedMessages));
-		assertThat(countMessagesForWord(KLAUS_S, receivedMessages));
-	}
-
-	private static int countMessagesForWord(TestOutputWord word, MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForUsers)
-	{
-		return messagesForUsers.get(word) == null
-				? 0
-				: messagesForUsers.get(word).size();
-	}
-
-	static void assertExpectedState(ReadOnlyKeyValueStore<Word, Long> store)
-	{
-		assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, store.get(wordOf(PETER_HALLO)));
-		assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, store.get(wordOf(PETER_WELT)));
-		assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, store.get(wordOf(PETER_BOÄH)));
-		assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, store.get(wordOf(KLAUS_MÜSCH)));
-		assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S)));
-	}
-
-	private static Word wordOf(TestOutputWord testOutputWord)
-	{
-		Word word = new Word();
-
-		word.setUser(testOutputWord.getUser());
-		word.setWord(testOutputWord.getWord());
-
-		return word;
-	}
-
-	static void assertExpectedLastMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
-	{
-		assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages));
-		assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, getLastMessageFor(PETER_WELT, receivedMessages));
-		assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, getLastMessageFor(PETER_BOÄH, receivedMessages));
-		assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, getLastMessageFor(KLAUS_MÜSCH, receivedMessages));
-		assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages));
-	}
-
-	private static void assertWordCountEqualsWordCountFromLastMessage(
-			TestOutputWord word,
-			Long counter)
-	{
-		TestOutputWordCounter testOutputWordCounter = TestOutputWordCounter.of(
-				word.getUser(),
-				word.getWord(),
-				counter);
-		assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter);
-	}
-
-	private static void assertWordCountEqualsWordCountFromLastMessage(
-			TestOutputWord word,
-			TestOutputWordCounter counter)
-	{
-		assertThat(counter).isEqualTo(getLastMessageFor(word));
-	}
-
-	private static TestOutputWordCounter getLastMessageFor(TestOutputWord word)
-	{
-		return getLastMessageFor(word, expectedMessages());
-	}
-
-	private static TestOutputWordCounter getLastMessageFor(
-			TestOutputWord user,
-			MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForWord)
-	{
-		return messagesForWord
-				.get(user)
-				.stream()
-				.reduce(null, (left, right) -> right);
-	}
-
-	private static final KeyValue<TestOutputWord, TestOutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
-	{
-			KeyValue.pair(
-					PETER_HALLO,
-					TestOutputWordCounter.of(PETER, WORD_HALLO,1)),
-			KeyValue.pair(
-					KLAUS_MÜSCH,
-					TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,1)),
-			KeyValue.pair(
-					PETER_WELT,
-					TestOutputWordCounter.of(PETER, WORD_WELT,1)),
-			KeyValue.pair(
-					KLAUS_MÜSCH,
-					TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,2)),
-			KeyValue.pair(
-					KLAUS_S,
-					TestOutputWordCounter.of(KLAUS, WORD_S,1)),
-			KeyValue.pair(
-					PETER_BOÄH,
-					TestOutputWordCounter.of(PETER, WORD_BOÄH,1)),
-			KeyValue.pair(
-					PETER_WELT,
-					TestOutputWordCounter.of(PETER, WORD_WELT,2)),
-			KeyValue.pair(
-					PETER_BOÄH,
-					TestOutputWordCounter.of(PETER, WORD_BOÄH,2)),
-			KeyValue.pair(
-					KLAUS_S,
-					TestOutputWordCounter.of(KLAUS, WORD_S,2)),
-			KeyValue.pair(
-					PETER_BOÄH,
-					TestOutputWordCounter.of(PETER, WORD_BOÄH,3)),
-			KeyValue.pair(
-					KLAUS_S,
-					TestOutputWordCounter.of(KLAUS, WORD_S,3)),
-	};
-
-	static MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages()
-	{
-		MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages = new LinkedMultiValueMap<>();
-		Stream
-				.of(EXPECTED_MESSAGES)
-				.forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
-		return expectedMessages;
-	}
-}
diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java
new file mode 100644
index 0000000..334cd05
--- /dev/null
+++ b/src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java
@@ -0,0 +1,165 @@
+package de.juplo.kafka.wordcount.counter;
+
+import de.juplo.kafka.wordcount.splitter.TestInputUser;
+import de.juplo.kafka.wordcount.splitter.TestInputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Primary;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import java.time.Duration;
+
+import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN;
+import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT;
+import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
+import static org.awaitility.Awaitility.await;
+
+
+@SpringBootTest(
+		properties = {
+				"spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
+				"spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
+				"spring.kafka.producer.properties.spring.json.add.type.headers=false",
+				"spring.kafka.consumer.auto-offset-reset=earliest",
+				"spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
+				"spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
+				"spring.kafka.consumer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.top10.TestOutputWord,counter:de.juplo.kafka.wordcount.top10.TestOutputWordCounter",
+				"logging.level.root=WARN",
+				"logging.level.de.juplo=DEBUG",
+				"juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}",
+				"juplo.wordcount.counter.commit-interval=0",
+				"juplo.wordcount.counter.input-topic=" + TOPIC_IN,
+				"juplo.wordcount.counter.output-topic=" + TOPIC_OUT })
+@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT })
+@Slf4j
+public class CounterApplicationIT
+{
+	public static final String TOPIC_IN = "in";
+	public static final String TOPIC_OUT = "out";
+
+	@Autowired
+	Consumer consumer;
+	@Autowired
+	CounterStreamProcessor streamProcessor;
+
+
+	@BeforeAll
+	public static void testSendMessage(
+			@Autowired KafkaTemplate<TestInputUser, TestInputWord> kafkaTemplate)
+	{
+		TestData
+				.getInputMessages()
+				.forEach(kv ->
+				{
+					try
+					{
+						SendResult<TestInputUser, TestInputWord> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
+						log.info(
+								"Sent: {}={}, partition={}, offset={}",
+								result.getProducerRecord().key(),
+								result.getProducerRecord().value(),
+								result.getRecordMetadata().partition(),
+								result.getRecordMetadata().offset());
+					}
+					catch (Exception e)
+					{
+						throw new RuntimeException(e);
+					}
+				});
+	}
+
+	@DisplayName("Await the expected number of messages")
+	@Test
+	public void testAwaitExpectedNumberOfMessagesForUsers()
+	{
+		await("Expected number of messages")
+				.atMost(Duration.ofSeconds(5))
+				.untilAsserted(() -> consumer.enforceAssertion(
+						receivedMessages -> TestData.assertExpectedNumberOfMessagesForWord(receivedMessages)));
+	}
+
+	@DisplayName("Await the expected output messages")
+	@Test
+	void testSendMessage()
+	{
+		await("Expected messages")
+				.atMost(Duration.ofSeconds(10))
+				.untilAsserted(() -> consumer.enforceAssertion(
+						receivedMessages -> TestData.assertExpectedMessages(receivedMessages)));
+	}
+
+	@DisplayName("Await the expected final output messages")
+	@Test
+	public void testAwaitExpectedLastMessagesForUsers()
+	{
+		await("Expected final output messages")
+				.atMost(Duration.ofSeconds(5))
+				.untilAsserted(() -> consumer.enforceAssertion(
+						receivedMessages -> TestData.assertExpectedLastMessagesForWord(receivedMessages)));
+	}
+
+	@DisplayName("Await the expected state in the state-store")
+	@Test
+	public void testAwaitExpectedState()
+	{
+		await("Expected state")
+				.atMost(Duration.ofSeconds(5))
+				.untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
+	}
+
+
+	static class Consumer
+	{
+		private final MultiValueMap<TestOutputWord, TestOutputWordCounter> received = new LinkedMultiValueMap<>();
+
+		@KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
+		public synchronized void receive(
+				@Header(KafkaHeaders.RECEIVED_KEY) TestOutputWord word,
+				@Payload TestOutputWordCounter counter)
+		{
+			log.debug("Received message: {} -> {}", word, counter);
+			received.add(word, counter);
+		}
+
+		synchronized void enforceAssertion(
+				java.util.function.Consumer<MultiValueMap<TestOutputWord, TestOutputWordCounter>> assertion)
+		{
+			assertion.accept(received);
+		}
+	}
+
+	@TestConfiguration
+	static class Configuration
+	{
+		@Bean
+		Consumer consumer()
+		{
+			return new Consumer();
+		}
+
+		@Primary
+		@Bean
+		KeyValueBytesStoreSupplier inMemoryStoreSupplier()
+		{
+			return Stores.inMemoryKeyValueStore(STORE_NAME);
+		}
+	}
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java
new file mode 100644
index 0000000..0ffd516
--- /dev/null
+++ b/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java
@@ -0,0 +1,90 @@
+package de.juplo.kafka.wordcount.counter;
+
+import de.juplo.kafka.wordcount.splitter.TestInputUser;
+import de.juplo.kafka.wordcount.splitter.TestInputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig;
+
+
+@Slf4j
+public class CounterStreamProcessorTopologyTest
+{
+  public static final String IN = "TEST-IN";
+  public static final String OUT = "TEST-OUT";
+  public static final String STORE_NAME = "TOPOLOGY-TEST";
+
+
+  TopologyTestDriver testDriver;
+  TestInputTopic<TestInputUser, TestInputWord> in;
+  TestOutputTopic<TestOutputWord, TestOutputWordCounter> out;
+
+
+  @BeforeEach
+  public void setUpTestDriver()
+  {
+    Topology topology = CounterStreamProcessor.buildTopology(
+        IN,
+        OUT,
+        Stores.inMemoryKeyValueStore(STORE_NAME));
+
+    testDriver = new TopologyTestDriver(topology, serializationConfig());
+
+    in = testDriver.createInputTopic(
+        IN,
+        new JsonSerializer().noTypeInfo(),
+        new JsonSerializer().noTypeInfo());
+
+    out = testDriver.createOutputTopic(
+        OUT,
+        new JsonDeserializer()
+            .copyWithType(TestOutputWord.class)
+            .ignoreTypeHeaders(),
+        new JsonDeserializer()
+            .copyWithType(TestOutputWordCounter.class)
+            .ignoreTypeHeaders());
+  }
+
+
+  @Test
+  public void test()
+  {
+    TestData
+        .getInputMessages()
+        .forEach(kv -> in.pipeInput(kv.key, kv.value));
+
+    MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages = new LinkedMultiValueMap<>();
+    out
+        .readRecordsToList()
+        .forEach(record -> receivedMessages.add(record.key(), record.value()));
+
+    TestData.assertExpectedMessages(receivedMessages);
+
+    TestData.assertExpectedNumberOfMessagesForWord(receivedMessages);
+    TestData.assertExpectedLastMessagesForWord(receivedMessages);
+
+    KeyValueStore<Word, Long> store = testDriver.getKeyValueStore(STORE_NAME);
+    TestData.assertExpectedState(store);
+  }
+
+  @AfterEach
+  public void tearDown()
+  {
+    testDriver.close();
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java b/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java
new file mode 100644
index 0000000..1ecfdbd
--- /dev/null
+++ b/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java
@@ -0,0 +1,206 @@
+package de.juplo.kafka.wordcount.counter;
+
+import de.juplo.kafka.wordcount.splitter.TestInputUser;
+import de.juplo.kafka.wordcount.splitter.TestInputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+class TestData
+{
+	static final String PETER = "peter";
+	static final String KLAUS = "klaus";
+
+	static final String WORD_HALLO = "Hallo";
+	static final String WORD_MÜSCH = "Müsch";
+	static final String WORD_WELT = "Welt";
+	static final String WORD_S = "s";
+	static final String WORD_BOÄH = "Boäh";
+
+	static final TestOutputWord PETER_HALLO = TestOutputWord.of(PETER, WORD_HALLO);
+	static final TestOutputWord PETER_WELT = TestOutputWord.of(PETER, WORD_WELT);
+	static final TestOutputWord PETER_BOÄH = TestOutputWord.of(PETER, WORD_BOÄH);
+	static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(KLAUS, WORD_MÜSCH);
+	static final TestOutputWord KLAUS_S = TestOutputWord.of(KLAUS, WORD_S);
+
+	private static final KeyValue<TestInputUser, TestInputWord>[] INPUT_MESSAGES = new KeyValue[]
+	{
+			new KeyValue<>(
+					TestInputUser.of(PETER),
+					TestInputWord.of(PETER, WORD_HALLO)),
+			new KeyValue<>(
+					TestInputUser.of(KLAUS),
+					TestInputWord.of(KLAUS, WORD_MÜSCH)),
+			new KeyValue<>(
+					TestInputUser.of(PETER),
+					TestInputWord.of(PETER, WORD_WELT)),
+			new KeyValue<>(
+					TestInputUser.of(KLAUS),
+					TestInputWord.of(KLAUS, WORD_MÜSCH)),
+			new KeyValue<>(
+					TestInputUser.of(KLAUS),
+					TestInputWord.of(KLAUS, WORD_S)),
+			new KeyValue<>(
+					TestInputUser.of(PETER),
+					TestInputWord.of(PETER, WORD_BOÄH)),
+			new KeyValue<>(
+					TestInputUser.of(PETER),
+					TestInputWord.of(PETER, WORD_WELT)),
+			new KeyValue<>(
+					TestInputUser.of(PETER),
+					TestInputWord.of(PETER, WORD_BOÄH)),
+			new KeyValue<>(
+					TestInputUser.of(KLAUS),
+					TestInputWord.of(KLAUS, WORD_S)),
+			new KeyValue<>(
+					TestInputUser.of(PETER),
+					TestInputWord.of(PETER, WORD_BOÄH)),
+			new KeyValue<>(
+					TestInputUser.of(KLAUS),
+					TestInputWord.of(KLAUS, WORD_S)),
+	};
+
+	static Stream<KeyValue<TestInputUser, TestInputWord>> getInputMessages()
+	{
+		return Stream.of(TestData.INPUT_MESSAGES);
+	}
+
+	static void assertExpectedMessages(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
+	{
+		expectedMessages().forEach(
+				(word, counter) ->
+						assertThat(receivedMessages.get(word))
+								.containsExactlyElementsOf(counter));
+	}
+
+	static void assertExpectedNumberOfMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
+	{
+		assertThat(countMessagesForWord(PETER_HALLO, receivedMessages));
+		assertThat(countMessagesForWord(PETER_WELT, receivedMessages));
+		assertThat(countMessagesForWord(PETER_BOÄH, receivedMessages));
+		assertThat(countMessagesForWord(KLAUS_MÜSCH, receivedMessages));
+		assertThat(countMessagesForWord(KLAUS_S, receivedMessages));
+	}
+
+	private static int countMessagesForWord(TestOutputWord word, MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForUsers)
+	{
+		return messagesForUsers.get(word) == null
+				? 0
+				: messagesForUsers.get(word).size();
+	}
+
+	static void assertExpectedState(ReadOnlyKeyValueStore<Word, Long> store)
+	{
+		assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, store.get(wordOf(PETER_HALLO)));
+		assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, store.get(wordOf(PETER_WELT)));
+		assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, store.get(wordOf(PETER_BOÄH)));
+		assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, store.get(wordOf(KLAUS_MÜSCH)));
+		assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S)));
+	}
+
+	private static Word wordOf(TestOutputWord testOutputWord)
+	{
+		Word word = new Word();
+
+		word.setUser(testOutputWord.getUser());
+		word.setWord(testOutputWord.getWord());
+
+		return word;
+	}
+
+	static void assertExpectedLastMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
+	{
+		assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages));
+		assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, getLastMessageFor(PETER_WELT, receivedMessages));
+		assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, getLastMessageFor(PETER_BOÄH, receivedMessages));
+		assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, getLastMessageFor(KLAUS_MÜSCH, receivedMessages));
+		assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages));
+	}
+
+	private static void assertWordCountEqualsWordCountFromLastMessage(
+			TestOutputWord word,
+			Long counter)
+	{
+		TestOutputWordCounter testOutputWordCounter = TestOutputWordCounter.of(
+				word.getUser(),
+				word.getWord(),
+				counter);
+		assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter);
+	}
+
+	private static void assertWordCountEqualsWordCountFromLastMessage(
+			TestOutputWord word,
+			TestOutputWordCounter counter)
+	{
+		assertThat(counter).isEqualTo(getLastMessageFor(word));
+	}
+
+	private static TestOutputWordCounter getLastMessageFor(TestOutputWord word)
+	{
+		return getLastMessageFor(word, expectedMessages());
+	}
+
+	private static TestOutputWordCounter getLastMessageFor(
+			TestOutputWord user,
+			MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForWord)
+	{
+		return messagesForWord
+				.get(user)
+				.stream()
+				.reduce(null, (left, right) -> right);
+	}
+
+	private static final KeyValue<TestOutputWord, TestOutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
+	{
+			KeyValue.pair(
+					PETER_HALLO,
+					TestOutputWordCounter.of(PETER, WORD_HALLO,1)),
+			KeyValue.pair(
+					KLAUS_MÜSCH,
+					TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,1)),
+			KeyValue.pair(
+					PETER_WELT,
+					TestOutputWordCounter.of(PETER, WORD_WELT,1)),
+			KeyValue.pair(
+					KLAUS_MÜSCH,
+					TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,2)),
+			KeyValue.pair(
+					KLAUS_S,
+					TestOutputWordCounter.of(KLAUS, WORD_S,1)),
+			KeyValue.pair(
+					PETER_BOÄH,
+					TestOutputWordCounter.of(PETER, WORD_BOÄH,1)),
+			KeyValue.pair(
+					PETER_WELT,
+					TestOutputWordCounter.of(PETER, WORD_WELT,2)),
+			KeyValue.pair(
+					PETER_BOÄH,
+					TestOutputWordCounter.of(PETER, WORD_BOÄH,2)),
+			KeyValue.pair(
+					KLAUS_S,
+					TestOutputWordCounter.of(KLAUS, WORD_S,2)),
+			KeyValue.pair(
+					PETER_BOÄH,
+					TestOutputWordCounter.of(PETER, WORD_BOÄH,3)),
+			KeyValue.pair(
+					KLAUS_S,
+					TestOutputWordCounter.of(KLAUS, WORD_S,3)),
+	};
+
+	static MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages()
+	{
+		MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages = new LinkedMultiValueMap<>();
+		Stream
+				.of(EXPECTED_MESSAGES)
+				.forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
+		return expectedMessages;
+	}
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java
deleted file mode 100644
index cfc2cae..0000000
--- a/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package de.juplo.kafka.wordcount.top10;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor(staticName = "of")
-public class TestOutputWord
-{
-  String user;
-  String word;
-}
diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java
deleted file mode 100644
index 1b59387..0000000
--- a/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package de.juplo.kafka.wordcount.top10;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor(staticName = "of")
-public class TestOutputWordCounter
-{
-  String user;
-  String word;
-  long counter;
-}
diff --git a/src/test/java/de/juplo/kafka/wordcount/topwords/TestOutputWord.java b/src/test/java/de/juplo/kafka/wordcount/topwords/TestOutputWord.java
new file mode 100644
index 0000000..cfc2cae
--- /dev/null
+++ b/src/test/java/de/juplo/kafka/wordcount/topwords/TestOutputWord.java
@@ -0,0 +1,15 @@
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestOutputWord
+{
+  String user;
+  String word;
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/topwords/TestOutputWordCounter.java b/src/test/java/de/juplo/kafka/wordcount/topwords/TestOutputWordCounter.java
new file mode 100644
index 0000000..1b59387
--- /dev/null
+++ b/src/test/java/de/juplo/kafka/wordcount/topwords/TestOutputWordCounter.java
@@ -0,0 +1,16 @@
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestOutputWordCounter
+{
+  String user;
+  String word;
+  long counter;
+}