popular: 1.0.0 - Renamed packages and classes -- MOVE
author     Kai Moritz <kai@juplo.de>
           Sat, 15 Jun 2024 21:19:02 +0000 (23:19 +0200)
committer  Kai Moritz <kai@juplo.de>
           Sat, 15 Jun 2024 21:46:14 +0000 (23:46 +0200)
24 files changed:
src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/counter/User.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/counter/Word.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/popular/PopularApplication.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguriation.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationProperties.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/popular/User.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/popular/Word.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/popular/WordCounter.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java [deleted file]
src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java [deleted file]
src/test/java/de/juplo/kafka/wordcount/counter/TestData.java [deleted file]
src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/popular/TestData.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java [deleted file]
src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java [deleted file]
src/test/java/de/juplo/kafka/wordcount/topwords/TestOutputWord.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/topwords/TestOutputWordCounter.java [new file with mode: 0644]

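Note: as the "-- MOVE" suffix in the subject indicates, this commit only moves the files; the blob hashes stay the same (e.g. e6d3b1f for CounterApplication.java and PopularApplication.java), so the files under the new popular package directory still declare the old package and class names. A follow-up commit presumably adjusts those declarations. As a rough sketch (assumed, not taken from this commit), src/main/java/de/juplo/kafka/wordcount/popular/PopularApplication.java would then read:

package de.juplo.kafka.wordcount.popular;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


// Renamed from CounterApplication; package now matches the new directory (assumed follow-up change).
@SpringBootApplication
public class PopularApplication
{
	public static void main(String[] args)
	{
		SpringApplication.run(PopularApplication.class, args);
	}
}
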
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java
deleted file mode 100644
index e6d3b1f..0000000
--- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-
-
-@SpringBootApplication
-public class CounterApplication
-{
-       public static void main(String[] args)
-       {
-               SpringApplication.run(CounterApplication.class, args);
-       }
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java
deleted file mode 100644
index 484b8de..0000000
--- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java
+++ /dev/null
@@ -1,97 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.Stores;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.support.serializer.JsonDeserializer;
-import org.springframework.kafka.support.serializer.JsonSerde;
-
-import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
-
-import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
-import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
-
-
-@Configuration
-@EnableConfigurationProperties(CounterApplicationProperties.class)
-@Slf4j
-public class CounterApplicationConfiguriation
-{
-       @Bean
-       public Properties streamProcessorProperties(
-                       CounterApplicationProperties counterProperties)
-       {
-               Properties propertyMap = serializationConfig();
-
-               propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, counterProperties.getApplicationId());
-
-               propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, counterProperties.getBootstrapServer());
-               if (counterProperties.getCommitInterval() != null)
-                       propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, counterProperties.getCommitInterval());
-               if (counterProperties.getCacheMaxBytes() != null)
-                       propertyMap.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, counterProperties.getCacheMaxBytes());
-
-               propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-               return propertyMap;
-       }
-
-       static Properties serializationConfig()
-       {
-               Properties propertyMap = new Properties();
-
-               propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
-               propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
-               propertyMap.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName());
-               propertyMap.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Word.class.getName());
-               propertyMap.put(
-                               JsonDeserializer.TYPE_MAPPINGS,
-                               "user:" + User.class.getName() + "," +
-                               "word:" + Word.class.getName() + "," +
-                               "counter:" + WordCounter.class.getName());
-
-               return propertyMap;
-       }
-
-       @Bean(initMethod = "start", destroyMethod = "stop")
-       public CounterStreamProcessor streamProcessor(
-                       CounterApplicationProperties applicationProperties,
-                       Properties streamProcessorProperties,
-                       KeyValueBytesStoreSupplier storeSupplier,
-                       ConfigurableApplicationContext context)
-       {
-               CounterStreamProcessor streamProcessor = new CounterStreamProcessor(
-                               applicationProperties.getInputTopic(),
-                               applicationProperties.getOutputTopic(),
-                               streamProcessorProperties,
-                               storeSupplier);
-
-               streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
-               {
-                       log.error("Unexpected error!", e);
-                       CompletableFuture.runAsync(() ->
-                       {
-                               log.info("Stopping application...");
-                               SpringApplication.exit(context, () -> 1);
-                       });
-                       return SHUTDOWN_CLIENT;
-               });
-
-
-               return streamProcessor;
-       }
-
-       @Bean
-       public KeyValueBytesStoreSupplier storeSupplier()
-       {
-               return Stores.persistentKeyValueStore(STORE_NAME);
-       }
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java
deleted file mode 100644
index c3ada17..0000000
--- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-
-
-@ConfigurationProperties("juplo.wordcount.counter")
-@Getter
-@Setter
-@ToString
-public class CounterApplicationProperties
-{
-  private String bootstrapServer = "localhost:9092";
-  private String applicationId = "counter";
-  private String inputTopic = "words";
-  private String outputTopic = "countings";
-  private Integer commitInterval;
-  private Integer cacheMaxBytes;
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
deleted file mode 100644
index 64bd619..0000000
--- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
+++ /dev/null
@@ -1,80 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.QueryableStoreTypes;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
-import org.springframework.kafka.support.serializer.JsonSerde;
-
-import java.util.Properties;
-
-
-@Slf4j
-public class CounterStreamProcessor
-{
-       public static final String STORE_NAME = "counter";
-
-
-       public final KafkaStreams streams;
-
-
-       public CounterStreamProcessor(
-                       String inputTopic,
-                       String outputTopic,
-                       Properties properties,
-                       KeyValueBytesStoreSupplier storeSupplier)
-       {
-               Topology topology = CounterStreamProcessor.buildTopology(
-                               inputTopic,
-                               outputTopic,
-                               storeSupplier);
-
-               streams = new KafkaStreams(topology, properties);
-       }
-
-       static Topology buildTopology(
-                       String inputTopic,
-                       String outputTopic,
-                       KeyValueBytesStoreSupplier storeSupplier)
-       {
-               StreamsBuilder builder = new StreamsBuilder();
-
-               KStream<User, Word> source = builder.stream(inputTopic);
-
-               source
-                               .map((key, word) -> new KeyValue<>(word, word))
-                               .groupByKey()
-                               .count(
-                                               Materialized
-                                                               .<Word, Long>as(storeSupplier)
-                                                               .withKeySerde(new JsonSerde<>().copyWithType(Word.class).forKeys()))
-                               .toStream()
-                               .map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter)))
-                               .to(outputTopic);
-
-               Topology topology = builder.build();
-               log.info("\n\n{}", topology.describe());
-
-               return topology;
-       }
-
-       ReadOnlyKeyValueStore<Word, Long> getStore()
-       {
-               return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
-       }
-
-       public void start()
-       {
-               log.info("Starting Stream-Processor");
-               streams.start();
-       }
-
-       public void stop()
-       {
-               log.info("Stopping Stream-Processor");
-               streams.close();
-       }
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/User.java b/src/main/java/de/juplo/kafka/wordcount/counter/User.java
deleted file mode 100644
index e38bcba..0000000
--- a/src/main/java/de/juplo/kafka/wordcount/counter/User.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import lombok.Data;
-
-
-@Data
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class User
-{
-  String user;
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/Word.java b/src/main/java/de/juplo/kafka/wordcount/counter/Word.java
deleted file mode 100644
index 77287d5..0000000
--- a/src/main/java/de/juplo/kafka/wordcount/counter/Word.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import lombok.Data;
-
-
-@Data
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class Word
-{
-  private String user;
-  private String word;
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java b/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java
deleted file mode 100644
index f1fce71..0000000
--- a/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import lombok.AccessLevel;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor(access = AccessLevel.PRIVATE)
-public class WordCounter
-{
-  String user;
-  String word;
-  long counter;
-
-  public static WordCounter of(Word word, long counter)
-  {
-    return new WordCounter(word.getUser(), word.getWord(), counter);
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplication.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplication.java
new file mode 100644
index 0000000..e6d3b1f
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplication.java
@@ -0,0 +1,14 @@
+package de.juplo.kafka.wordcount.counter;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+
+@SpringBootApplication
+public class CounterApplication
+{
+       public static void main(String[] args)
+       {
+               SpringApplication.run(CounterApplication.class, args);
+       }
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguriation.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguriation.java
new file mode 100644
index 0000000..484b8de
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguriation.java
@@ -0,0 +1,97 @@
+package de.juplo.kafka.wordcount.counter;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerde;
+
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+
+import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
+import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+
+
+@Configuration
+@EnableConfigurationProperties(CounterApplicationProperties.class)
+@Slf4j
+public class CounterApplicationConfiguriation
+{
+       @Bean
+       public Properties streamProcessorProperties(
+                       CounterApplicationProperties counterProperties)
+       {
+               Properties propertyMap = serializationConfig();
+
+               propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, counterProperties.getApplicationId());
+
+               propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, counterProperties.getBootstrapServer());
+               if (counterProperties.getCommitInterval() != null)
+                       propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, counterProperties.getCommitInterval());
+               if (counterProperties.getCacheMaxBytes() != null)
+                       propertyMap.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, counterProperties.getCacheMaxBytes());
+
+               propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+               return propertyMap;
+       }
+
+       static Properties serializationConfig()
+       {
+               Properties propertyMap = new Properties();
+
+               propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
+               propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
+               propertyMap.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName());
+               propertyMap.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Word.class.getName());
+               propertyMap.put(
+                               JsonDeserializer.TYPE_MAPPINGS,
+                               "user:" + User.class.getName() + "," +
+                               "word:" + Word.class.getName() + "," +
+                               "counter:" + WordCounter.class.getName());
+
+               return propertyMap;
+       }
+
+       @Bean(initMethod = "start", destroyMethod = "stop")
+       public CounterStreamProcessor streamProcessor(
+                       CounterApplicationProperties applicationProperties,
+                       Properties streamProcessorProperties,
+                       KeyValueBytesStoreSupplier storeSupplier,
+                       ConfigurableApplicationContext context)
+       {
+               CounterStreamProcessor streamProcessor = new CounterStreamProcessor(
+                               applicationProperties.getInputTopic(),
+                               applicationProperties.getOutputTopic(),
+                               streamProcessorProperties,
+                               storeSupplier);
+
+               streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
+               {
+                       log.error("Unexpected error!", e);
+                       CompletableFuture.runAsync(() ->
+                       {
+                               log.info("Stopping application...");
+                               SpringApplication.exit(context, () -> 1);
+                       });
+                       return SHUTDOWN_CLIENT;
+               });
+
+
+               return streamProcessor;
+       }
+
+       @Bean
+       public KeyValueBytesStoreSupplier storeSupplier()
+       {
+               return Stores.persistentKeyValueStore(STORE_NAME);
+       }
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationProperties.java
new file mode 100644
index 0000000..c3ada17
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationProperties.java
@@ -0,0 +1,22 @@
+package de.juplo.kafka.wordcount.counter;
+
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+
+@ConfigurationProperties("juplo.wordcount.counter")
+@Getter
+@Setter
+@ToString
+public class CounterApplicationProperties
+{
+  private String bootstrapServer = "localhost:9092";
+  private String applicationId = "counter";
+  private String inputTopic = "words";
+  private String outputTopic = "countings";
+  private Integer commitInterval;
+  private Integer cacheMaxBytes;
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java
new file mode 100644
index 0000000..64bd619
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java
@@ -0,0 +1,80 @@
+package de.juplo.kafka.wordcount.counter;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.springframework.kafka.support.serializer.JsonSerde;
+
+import java.util.Properties;
+
+
+@Slf4j
+public class CounterStreamProcessor
+{
+       public static final String STORE_NAME = "counter";
+
+
+       public final KafkaStreams streams;
+
+
+       public CounterStreamProcessor(
+                       String inputTopic,
+                       String outputTopic,
+                       Properties properties,
+                       KeyValueBytesStoreSupplier storeSupplier)
+       {
+               Topology topology = CounterStreamProcessor.buildTopology(
+                               inputTopic,
+                               outputTopic,
+                               storeSupplier);
+
+               streams = new KafkaStreams(topology, properties);
+       }
+
+       static Topology buildTopology(
+                       String inputTopic,
+                       String outputTopic,
+                       KeyValueBytesStoreSupplier storeSupplier)
+       {
+               StreamsBuilder builder = new StreamsBuilder();
+
+               KStream<User, Word> source = builder.stream(inputTopic);
+
+               source
+                               .map((key, word) -> new KeyValue<>(word, word))
+                               .groupByKey()
+                               .count(
+                                               Materialized
+                                                               .<Word, Long>as(storeSupplier)
+                                                               .withKeySerde(new JsonSerde<>().copyWithType(Word.class).forKeys()))
+                               .toStream()
+                               .map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter)))
+                               .to(outputTopic);
+
+               Topology topology = builder.build();
+               log.info("\n\n{}", topology.describe());
+
+               return topology;
+       }
+
+       ReadOnlyKeyValueStore<Word, Long> getStore()
+       {
+               return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
+       }
+
+       public void start()
+       {
+               log.info("Starting Stream-Processor");
+               streams.start();
+       }
+
+       public void stop()
+       {
+               log.info("Stopping Stream-Processor");
+               streams.close();
+       }
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/User.java b/src/main/java/de/juplo/kafka/wordcount/popular/User.java
new file mode 100644
index 0000000..e38bcba
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/wordcount/popular/User.java
@@ -0,0 +1,12 @@
+package de.juplo.kafka.wordcount.counter;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class User
+{
+  String user;
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/Word.java b/src/main/java/de/juplo/kafka/wordcount/popular/Word.java
new file mode 100644
index 0000000..77287d5
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/wordcount/popular/Word.java
@@ -0,0 +1,13 @@
+package de.juplo.kafka.wordcount.counter;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Word
+{
+  private String user;
+  private String word;
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/WordCounter.java b/src/main/java/de/juplo/kafka/wordcount/popular/WordCounter.java
new file mode 100644
index 0000000..f1fce71
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/wordcount/popular/WordCounter.java
@@ -0,0 +1,22 @@
+package de.juplo.kafka.wordcount.counter;
+
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(access = AccessLevel.PRIVATE)
+public class WordCounter
+{
+  String user;
+  String word;
+  long counter;
+
+  public static WordCounter of(Word word, long counter)
+  {
+    return new WordCounter(word.getUser(), word.getWord(), counter);
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
deleted file mode 100644
index 334cd05..0000000
--- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
+++ /dev/null
@@ -1,165 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import de.juplo.kafka.wordcount.splitter.TestInputUser;
-import de.juplo.kafka.wordcount.splitter.TestInputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.Stores;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Primary;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.support.KafkaHeaders;
-import org.springframework.kafka.support.SendResult;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.messaging.handler.annotation.Header;
-import org.springframework.messaging.handler.annotation.Payload;
-import org.springframework.util.LinkedMultiValueMap;
-import org.springframework.util.MultiValueMap;
-
-import java.time.Duration;
-
-import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN;
-import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT;
-import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
-import static org.awaitility.Awaitility.await;
-
-
-@SpringBootTest(
-               properties = {
-                               "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
-                               "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
-                               "spring.kafka.producer.properties.spring.json.add.type.headers=false",
-                               "spring.kafka.consumer.auto-offset-reset=earliest",
-                               "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
-                               "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
-                               "spring.kafka.consumer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.top10.TestOutputWord,counter:de.juplo.kafka.wordcount.top10.TestOutputWordCounter",
-                               "logging.level.root=WARN",
-                               "logging.level.de.juplo=DEBUG",
-                               "juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}",
-                               "juplo.wordcount.counter.commit-interval=0",
-                               "juplo.wordcount.counter.input-topic=" + TOPIC_IN,
-                               "juplo.wordcount.counter.output-topic=" + TOPIC_OUT })
-@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT })
-@Slf4j
-public class CounterApplicationIT
-{
-       public static final String TOPIC_IN = "in";
-       public static final String TOPIC_OUT = "out";
-
-       @Autowired
-       Consumer consumer;
-       @Autowired
-       CounterStreamProcessor streamProcessor;
-
-
-       @BeforeAll
-       public static void testSendMessage(
-                       @Autowired KafkaTemplate<TestInputUser, TestInputWord> kafkaTemplate)
-       {
-               TestData
-                               .getInputMessages()
-                               .forEach(kv ->
-                               {
-                                       try
-                                       {
-                                               SendResult<TestInputUser, TestInputWord> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
-                                               log.info(
-                                                               "Sent: {}={}, partition={}, offset={}",
-                                                               result.getProducerRecord().key(),
-                                                               result.getProducerRecord().value(),
-                                                               result.getRecordMetadata().partition(),
-                                                               result.getRecordMetadata().offset());
-                                       }
-                                       catch (Exception e)
-                                       {
-                                               throw new RuntimeException(e);
-                                       }
-                               });
-       }
-
-       @DisplayName("Await the expected number of messages")
-       @Test
-       public void testAwaitExpectedNumberOfMessagesForUsers()
-       {
-               await("Expected number of messages")
-                               .atMost(Duration.ofSeconds(5))
-                               .untilAsserted(() -> consumer.enforceAssertion(
-                                               receivedMessages -> TestData.assertExpectedNumberOfMessagesForWord(receivedMessages)));
-       }
-
-       @DisplayName("Await the expected output messages")
-       @Test
-       void testSendMessage()
-       {
-               await("Expected messages")
-                               .atMost(Duration.ofSeconds(10))
-                               .untilAsserted(() -> consumer.enforceAssertion(
-                                               receivedMessages -> TestData.assertExpectedMessages(receivedMessages)));
-       }
-
-       @DisplayName("Await the expected final output messages")
-       @Test
-       public void testAwaitExpectedLastMessagesForUsers()
-       {
-               await("Expected final output messages")
-                               .atMost(Duration.ofSeconds(5))
-                               .untilAsserted(() -> consumer.enforceAssertion(
-                                               receivedMessages -> TestData.assertExpectedLastMessagesForWord(receivedMessages)));
-       }
-
-       @DisplayName("Await the expected state in the state-store")
-       @Test
-       public void testAwaitExpectedState()
-       {
-               await("Expected state")
-                               .atMost(Duration.ofSeconds(5))
-                               .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
-       }
-
-
-       static class Consumer
-       {
-               private final MultiValueMap<TestOutputWord, TestOutputWordCounter> received = new LinkedMultiValueMap<>();
-
-               @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
-               public synchronized void receive(
-                               @Header(KafkaHeaders.RECEIVED_KEY) TestOutputWord word,
-                               @Payload TestOutputWordCounter counter)
-               {
-                       log.debug("Received message: {} -> {}", word, counter);
-                       received.add(word, counter);
-               }
-
-               synchronized void enforceAssertion(
-                               java.util.function.Consumer<MultiValueMap<TestOutputWord, TestOutputWordCounter>> assertion)
-               {
-                       assertion.accept(received);
-               }
-       }
-
-       @TestConfiguration
-       static class Configuration
-       {
-               @Bean
-               Consumer consumer()
-               {
-                       return new Consumer();
-               }
-
-               @Primary
-               @Bean
-               KeyValueBytesStoreSupplier inMemoryStoreSupplier()
-               {
-                       return Stores.inMemoryKeyValueStore(STORE_NAME);
-               }
-       }
-}
diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
deleted file mode 100644
index 0ffd516..0000000
--- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import de.juplo.kafka.wordcount.splitter.TestInputUser;
-import de.juplo.kafka.wordcount.splitter.TestInputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.streams.TestOutputTopic;
-import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.Stores;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.springframework.kafka.support.serializer.JsonDeserializer;
-import org.springframework.kafka.support.serializer.JsonSerializer;
-import org.springframework.util.LinkedMultiValueMap;
-import org.springframework.util.MultiValueMap;
-
-import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig;
-
-
-@Slf4j
-public class CounterStreamProcessorTopologyTest
-{
-  public static final String IN = "TEST-IN";
-  public static final String OUT = "TEST-OUT";
-  public static final String STORE_NAME = "TOPOLOGY-TEST";
-
-
-  TopologyTestDriver testDriver;
-  TestInputTopic<TestInputUser, TestInputWord> in;
-  TestOutputTopic<TestOutputWord, TestOutputWordCounter> out;
-
-
-  @BeforeEach
-  public void setUpTestDriver()
-  {
-    Topology topology = CounterStreamProcessor.buildTopology(
-        IN,
-        OUT,
-        Stores.inMemoryKeyValueStore(STORE_NAME));
-
-    testDriver = new TopologyTestDriver(topology, serializationConfig());
-
-    in = testDriver.createInputTopic(
-        IN,
-        new JsonSerializer().noTypeInfo(),
-        new JsonSerializer().noTypeInfo());
-
-    out = testDriver.createOutputTopic(
-        OUT,
-        new JsonDeserializer()
-            .copyWithType(TestOutputWord.class)
-            .ignoreTypeHeaders(),
-        new JsonDeserializer()
-            .copyWithType(TestOutputWordCounter.class)
-            .ignoreTypeHeaders());
-  }
-
-
-  @Test
-  public void test()
-  {
-    TestData
-        .getInputMessages()
-        .forEach(kv -> in.pipeInput(kv.key, kv.value));
-
-    MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages = new LinkedMultiValueMap<>();
-    out
-        .readRecordsToList()
-        .forEach(record -> receivedMessages.add(record.key(), record.value()));
-
-    TestData.assertExpectedMessages(receivedMessages);
-
-    TestData.assertExpectedNumberOfMessagesForWord(receivedMessages);
-    TestData.assertExpectedLastMessagesForWord(receivedMessages);
-
-    KeyValueStore<Word, Long> store = testDriver.getKeyValueStore(STORE_NAME);
-    TestData.assertExpectedState(store);
-  }
-
-  @AfterEach
-  public void tearDown()
-  {
-    testDriver.close();
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java
deleted file mode 100644
index 1ecfdbd..0000000
--- a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java
+++ /dev/null
@@ -1,206 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import de.juplo.kafka.wordcount.splitter.TestInputUser;
-import de.juplo.kafka.wordcount.splitter.TestInputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
-import org.springframework.util.LinkedMultiValueMap;
-import org.springframework.util.MultiValueMap;
-
-import java.util.stream.Stream;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-
-class TestData
-{
-       static final String PETER = "peter";
-       static final String KLAUS = "klaus";
-
-       static final String WORD_HALLO = "Hallo";
-       static final String WORD_MÜSCH = "Müsch";
-       static final String WORD_WELT = "Welt";
-       static final String WORD_S = "s";
-       static final String WORD_BOÄH = "Boäh";
-
-       static final TestOutputWord PETER_HALLO = TestOutputWord.of(PETER, WORD_HALLO);
-       static final TestOutputWord PETER_WELT = TestOutputWord.of(PETER, WORD_WELT);
-       static final TestOutputWord PETER_BOÄH = TestOutputWord.of(PETER, WORD_BOÄH);
-       static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(KLAUS, WORD_MÜSCH);
-       static final TestOutputWord KLAUS_S = TestOutputWord.of(KLAUS, WORD_S);
-
-       private static final KeyValue<TestInputUser, TestInputWord>[] INPUT_MESSAGES = new KeyValue[]
-       {
-                       new KeyValue<>(
-                                       TestInputUser.of(PETER),
-                                       TestInputWord.of(PETER, WORD_HALLO)),
-                       new KeyValue<>(
-                                       TestInputUser.of(KLAUS),
-                                       TestInputWord.of(KLAUS, WORD_MÜSCH)),
-                       new KeyValue<>(
-                                       TestInputUser.of(PETER),
-                                       TestInputWord.of(PETER, WORD_WELT)),
-                       new KeyValue<>(
-                                       TestInputUser.of(KLAUS),
-                                       TestInputWord.of(KLAUS, WORD_MÜSCH)),
-                       new KeyValue<>(
-                                       TestInputUser.of(KLAUS),
-                                       TestInputWord.of(KLAUS, WORD_S)),
-                       new KeyValue<>(
-                                       TestInputUser.of(PETER),
-                                       TestInputWord.of(PETER, WORD_BOÄH)),
-                       new KeyValue<>(
-                                       TestInputUser.of(PETER),
-                                       TestInputWord.of(PETER, WORD_WELT)),
-                       new KeyValue<>(
-                                       TestInputUser.of(PETER),
-                                       TestInputWord.of(PETER, WORD_BOÄH)),
-                       new KeyValue<>(
-                                       TestInputUser.of(KLAUS),
-                                       TestInputWord.of(KLAUS, WORD_S)),
-                       new KeyValue<>(
-                                       TestInputUser.of(PETER),
-                                       TestInputWord.of(PETER, WORD_BOÄH)),
-                       new KeyValue<>(
-                                       TestInputUser.of(KLAUS),
-                                       TestInputWord.of(KLAUS, WORD_S)),
-       };
-
-       static Stream<KeyValue<TestInputUser, TestInputWord>> getInputMessages()
-       {
-               return Stream.of(TestData.INPUT_MESSAGES);
-       }
-
-       static void assertExpectedMessages(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
-       {
-               expectedMessages().forEach(
-                               (word, counter) ->
-                                               assertThat(receivedMessages.get(word))
-                                                               .containsExactlyElementsOf(counter));
-       }
-
-       static void assertExpectedNumberOfMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
-       {
-               assertThat(countMessagesForWord(PETER_HALLO, receivedMessages));
-               assertThat(countMessagesForWord(PETER_WELT, receivedMessages));
-               assertThat(countMessagesForWord(PETER_BOÄH, receivedMessages));
-               assertThat(countMessagesForWord(KLAUS_MÜSCH, receivedMessages));
-               assertThat(countMessagesForWord(KLAUS_S, receivedMessages));
-       }
-
-       private static int countMessagesForWord(TestOutputWord word, MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForUsers)
-       {
-               return messagesForUsers.get(word) == null
-                               ? 0
-                               : messagesForUsers.get(word).size();
-       }
-
-       static void assertExpectedState(ReadOnlyKeyValueStore<Word, Long> store)
-       {
-               assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, store.get(wordOf(PETER_HALLO)));
-               assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, store.get(wordOf(PETER_WELT)));
-               assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, store.get(wordOf(PETER_BOÄH)));
-               assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, store.get(wordOf(KLAUS_MÜSCH)));
-               assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S)));
-       }
-
-       private static Word wordOf(TestOutputWord testOutputWord)
-       {
-               Word word = new Word();
-
-               word.setUser(testOutputWord.getUser());
-               word.setWord(testOutputWord.getWord());
-
-               return word;
-       }
-
-       static void assertExpectedLastMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
-       {
-               assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages));
-               assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, getLastMessageFor(PETER_WELT, receivedMessages));
-               assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, getLastMessageFor(PETER_BOÄH, receivedMessages));
-               assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, getLastMessageFor(KLAUS_MÜSCH, receivedMessages));
-               assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages));
-       }
-
-       private static void assertWordCountEqualsWordCountFromLastMessage(
-                       TestOutputWord word,
-                       Long counter)
-       {
-               TestOutputWordCounter testOutputWordCounter = TestOutputWordCounter.of(
-                               word.getUser(),
-                               word.getWord(),
-                               counter);
-               assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter);
-       }
-
-       private static void assertWordCountEqualsWordCountFromLastMessage(
-                       TestOutputWord word,
-                       TestOutputWordCounter counter)
-       {
-               assertThat(counter).isEqualTo(getLastMessageFor(word));
-       }
-
-       private static TestOutputWordCounter getLastMessageFor(TestOutputWord word)
-       {
-               return getLastMessageFor(word, expectedMessages());
-       }
-
-       private static TestOutputWordCounter getLastMessageFor(
-                       TestOutputWord user,
-                       MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForWord)
-       {
-               return messagesForWord
-                               .get(user)
-                               .stream()
-                               .reduce(null, (left, right) -> right);
-       }
-
-       private static final KeyValue<TestOutputWord, TestOutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
-       {
-                       KeyValue.pair(
-                                       PETER_HALLO,
-                                       TestOutputWordCounter.of(PETER, WORD_HALLO,1)),
-                       KeyValue.pair(
-                                       KLAUS_MÜSCH,
-                                       TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,1)),
-                       KeyValue.pair(
-                                       PETER_WELT,
-                                       TestOutputWordCounter.of(PETER, WORD_WELT,1)),
-                       KeyValue.pair(
-                                       KLAUS_MÜSCH,
-                                       TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,2)),
-                       KeyValue.pair(
-                                       KLAUS_S,
-                                       TestOutputWordCounter.of(KLAUS, WORD_S,1)),
-                       KeyValue.pair(
-                                       PETER_BOÄH,
-                                       TestOutputWordCounter.of(PETER, WORD_BOÄH,1)),
-                       KeyValue.pair(
-                                       PETER_WELT,
-                                       TestOutputWordCounter.of(PETER, WORD_WELT,2)),
-                       KeyValue.pair(
-                                       PETER_BOÄH,
-                                       TestOutputWordCounter.of(PETER, WORD_BOÄH,2)),
-                       KeyValue.pair(
-                                       KLAUS_S,
-                                       TestOutputWordCounter.of(KLAUS, WORD_S,2)),
-                       KeyValue.pair(
-                                       PETER_BOÄH,
-                                       TestOutputWordCounter.of(PETER, WORD_BOÄH,3)),
-                       KeyValue.pair(
-                                       KLAUS_S,
-                                       TestOutputWordCounter.of(KLAUS, WORD_S,3)),
-       };
-
-       static MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages()
-       {
-               MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages = new LinkedMultiValueMap<>();
-               Stream
-                               .of(EXPECTED_MESSAGES)
-                               .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
-               return expectedMessages;
-       }
-}
diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java
new file mode 100644
index 0000000..334cd05
--- /dev/null
+++ b/src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java
@@ -0,0 +1,165 @@
+package de.juplo.kafka.wordcount.counter;
+
+import de.juplo.kafka.wordcount.splitter.TestInputUser;
+import de.juplo.kafka.wordcount.splitter.TestInputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Primary;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import java.time.Duration;
+
+import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN;
+import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT;
+import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
+import static org.awaitility.Awaitility.await;
+
+
+@SpringBootTest(
+               properties = {
+                               "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
+                               "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
+                               "spring.kafka.producer.properties.spring.json.add.type.headers=false",
+                               "spring.kafka.consumer.auto-offset-reset=earliest",
+                               "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
+                               "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
+                               "spring.kafka.consumer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.top10.TestOutputWord,counter:de.juplo.kafka.wordcount.top10.TestOutputWordCounter",
+                               "logging.level.root=WARN",
+                               "logging.level.de.juplo=DEBUG",
+                               "juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}",
+                               "juplo.wordcount.counter.commit-interval=0",
+                               "juplo.wordcount.counter.input-topic=" + TOPIC_IN,
+                               "juplo.wordcount.counter.output-topic=" + TOPIC_OUT })
+@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT })
+@Slf4j
+public class CounterApplicationIT
+{
+       public static final String TOPIC_IN = "in";
+       public static final String TOPIC_OUT = "out";
+
+       @Autowired
+       Consumer consumer;
+       @Autowired
+       CounterStreamProcessor streamProcessor;
+
+
+       @BeforeAll
+       public static void testSendMessage(
+                       @Autowired KafkaTemplate<TestInputUser, TestInputWord> kafkaTemplate)
+       {
+               TestData
+                               .getInputMessages()
+                               .forEach(kv ->
+                               {
+                                       try
+                                       {
+                                               SendResult<TestInputUser, TestInputWord> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
+                                               log.info(
+                                                               "Sent: {}={}, partition={}, offset={}",
+                                                               result.getProducerRecord().key(),
+                                                               result.getProducerRecord().value(),
+                                                               result.getRecordMetadata().partition(),
+                                                               result.getRecordMetadata().offset());
+                                       }
+                                       catch (Exception e)
+                                       {
+                                               throw new RuntimeException(e);
+                                       }
+                               });
+       }
+
+       @DisplayName("Await the expected number of messages")
+       @Test
+       public void testAwaitExpectedNumberOfMessagesForUsers()
+       {
+               await("Expected number of messages")
+                               .atMost(Duration.ofSeconds(5))
+                               .untilAsserted(() -> consumer.enforceAssertion(
+                                               receivedMessages -> TestData.assertExpectedNumberOfMessagesForWord(receivedMessages)));
+       }
+
+       @DisplayName("Await the expected output messages")
+       @Test
+       void testSendMessage()
+       {
+               await("Expected messages")
+                               .atMost(Duration.ofSeconds(10))
+                               .untilAsserted(() -> consumer.enforceAssertion(
+                                               receivedMessages -> TestData.assertExpectedMessages(receivedMessages)));
+       }
+
+       @DisplayName("Await the expected final output messages")
+       @Test
+       public void testAwaitExpectedLastMessagesForUsers()
+       {
+               await("Expected final output messages")
+                               .atMost(Duration.ofSeconds(5))
+                               .untilAsserted(() -> consumer.enforceAssertion(
+                                               receivedMessages -> TestData.assertExpectedLastMessagesForWord(receivedMessages)));
+       }
+
+       @DisplayName("Await the expected state in the state-store")
+       @Test
+       public void testAwaitExpectedState()
+       {
+               await("Expected state")
+                               .atMost(Duration.ofSeconds(5))
+                               .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
+       }
+
+
+       static class Consumer
+       {
+               private final MultiValueMap<TestOutputWord, TestOutputWordCounter> received = new LinkedMultiValueMap<>();
+
+               @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
+               public synchronized void receive(
+                               @Header(KafkaHeaders.RECEIVED_KEY) TestOutputWord word,
+                               @Payload TestOutputWordCounter counter)
+               {
+                       log.debug("Received message: {} -> {}", word, counter);
+                       received.add(word, counter);
+               }
+
+               synchronized void enforceAssertion(
+                               java.util.function.Consumer<MultiValueMap<TestOutputWord, TestOutputWordCounter>> assertion)
+               {
+                       assertion.accept(received);
+               }
+       }
+
+       @TestConfiguration
+       static class Configuration
+       {
+               @Bean
+               Consumer consumer()
+               {
+                       return new Consumer();
+               }
+
+               @Primary
+               @Bean
+               KeyValueBytesStoreSupplier inMemoryStoreSupplier()
+               {
+                       return Stores.inMemoryKeyValueStore(STORE_NAME);
+               }
+       }
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java
new file mode 100644
index 0000000..0ffd516
--- /dev/null
+++ b/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java
@@ -0,0 +1,90 @@
+package de.juplo.kafka.wordcount.counter;
+
+import de.juplo.kafka.wordcount.splitter.TestInputUser;
+import de.juplo.kafka.wordcount.splitter.TestInputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig;
+
+
+@Slf4j
+public class CounterStreamProcessorTopologyTest
+{
+  public static final String IN = "TEST-IN";
+  public static final String OUT = "TEST-OUT";
+  public static final String STORE_NAME = "TOPOLOGY-TEST";
+
+
+  TopologyTestDriver testDriver;
+  TestInputTopic<TestInputUser, TestInputWord> in;
+  TestOutputTopic<TestOutputWord, TestOutputWordCounter> out;
+
+
+  @BeforeEach
+  public void setUpTestDriver()
+  {
+    Topology topology = CounterStreamProcessor.buildTopology(
+        IN,
+        OUT,
+        Stores.inMemoryKeyValueStore(STORE_NAME));
+
+    testDriver = new TopologyTestDriver(topology, serializationConfig());
+
+    in = testDriver.createInputTopic(
+        IN,
+        new JsonSerializer().noTypeInfo(),
+        new JsonSerializer().noTypeInfo());
+
+    out = testDriver.createOutputTopic(
+        OUT,
+        new JsonDeserializer()
+            .copyWithType(TestOutputWord.class)
+            .ignoreTypeHeaders(),
+        new JsonDeserializer()
+            .copyWithType(TestOutputWordCounter.class)
+            .ignoreTypeHeaders());
+  }
+
+
+  @Test
+  public void test()
+  {
+    TestData
+        .getInputMessages()
+        .forEach(kv -> in.pipeInput(kv.key, kv.value));
+
+    MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages = new LinkedMultiValueMap<>();
+    out
+        .readRecordsToList()
+        .forEach(record -> receivedMessages.add(record.key(), record.value()));
+
+    TestData.assertExpectedMessages(receivedMessages);
+
+    TestData.assertExpectedNumberOfMessagesForWord(receivedMessages);
+    TestData.assertExpectedLastMessagesForWord(receivedMessages);
+
+    KeyValueStore<Word, Long> store = testDriver.getKeyValueStore(STORE_NAME);
+    TestData.assertExpectedState(store);
+  }
+
+  @AfterEach
+  public void tearDown()
+  {
+    testDriver.close();
+  }
+}
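The TopologyTestDriver above is created with the serializationConfig() exposed by the application configuration class, whose body is not part of this diff. Under the assumption that Spring's JsonSerde is used for both keys and values, such a configuration is usually little more than the following sketch; the method name and the trusted-packages value are illustrative assumptions:

    // Hypothetical stand-in for serializationConfig(); the real method lives in
    // the application configuration class and may set additional properties.
    // Requires java.util.Properties, org.apache.kafka.streams.StreamsConfig and
    // org.springframework.kafka.support.serializer.{JsonSerde, JsonDeserializer}.
    static Properties serializationConfigSketch()
    {
        Properties props = new Properties();
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");  // test scope: trust all packages
        return props;
    }
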
diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java b/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java
new file mode 100644 (file)
index 0000000..1ecfdbd
--- /dev/null
@@ -0,0 +1,206 @@
+package de.juplo.kafka.wordcount.counter;
+
+import de.juplo.kafka.wordcount.splitter.TestInputUser;
+import de.juplo.kafka.wordcount.splitter.TestInputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+class TestData
+{
+       static final String PETER = "peter";
+       static final String KLAUS = "klaus";
+
+       static final String WORD_HALLO = "Hallo";
+       static final String WORD_MÜSCH = "Müsch";
+       static final String WORD_WELT = "Welt";
+       static final String WORD_S = "s";
+       static final String WORD_BOÄH = "Boäh";
+
+       static final TestOutputWord PETER_HALLO = TestOutputWord.of(PETER, WORD_HALLO);
+       static final TestOutputWord PETER_WELT = TestOutputWord.of(PETER, WORD_WELT);
+       static final TestOutputWord PETER_BOÄH = TestOutputWord.of(PETER, WORD_BOÄH);
+       static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(KLAUS, WORD_MÜSCH);
+       static final TestOutputWord KLAUS_S = TestOutputWord.of(KLAUS, WORD_S);
+
+       private static final KeyValue<TestInputUser, TestInputWord>[] INPUT_MESSAGES = new KeyValue[]
+       {
+                       new KeyValue<>(
+                                       TestInputUser.of(PETER),
+                                       TestInputWord.of(PETER, WORD_HALLO)),
+                       new KeyValue<>(
+                                       TestInputUser.of(KLAUS),
+                                       TestInputWord.of(KLAUS, WORD_MÜSCH)),
+                       new KeyValue<>(
+                                       TestInputUser.of(PETER),
+                                       TestInputWord.of(PETER, WORD_WELT)),
+                       new KeyValue<>(
+                                       TestInputUser.of(KLAUS),
+                                       TestInputWord.of(KLAUS, WORD_MÜSCH)),
+                       new KeyValue<>(
+                                       TestInputUser.of(KLAUS),
+                                       TestInputWord.of(KLAUS, WORD_S)),
+                       new KeyValue<>(
+                                       TestInputUser.of(PETER),
+                                       TestInputWord.of(PETER, WORD_BOÄH)),
+                       new KeyValue<>(
+                                       TestInputUser.of(PETER),
+                                       TestInputWord.of(PETER, WORD_WELT)),
+                       new KeyValue<>(
+                                       TestInputUser.of(PETER),
+                                       TestInputWord.of(PETER, WORD_BOÄH)),
+                       new KeyValue<>(
+                                       TestInputUser.of(KLAUS),
+                                       TestInputWord.of(KLAUS, WORD_S)),
+                       new KeyValue<>(
+                                       TestInputUser.of(PETER),
+                                       TestInputWord.of(PETER, WORD_BOÄH)),
+                       new KeyValue<>(
+                                       TestInputUser.of(KLAUS),
+                                       TestInputWord.of(KLAUS, WORD_S)),
+       };
+
+       static Stream<KeyValue<TestInputUser, TestInputWord>> getInputMessages()
+       {
+               return Stream.of(TestData.INPUT_MESSAGES);
+       }
+
+       static void assertExpectedMessages(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
+       {
+               expectedMessages().forEach(
+                               (word, counter) ->
+                                               assertThat(receivedMessages.get(word))
+                                                               .containsExactlyElementsOf(counter));
+       }
+
+       static void assertExpectedNumberOfMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
+       {
+		assertThat(countMessagesForWord(PETER_HALLO, receivedMessages)).isEqualTo(1);  // per-word totals derived from EXPECTED_MESSAGES
+		assertThat(countMessagesForWord(PETER_WELT, receivedMessages)).isEqualTo(2);
+		assertThat(countMessagesForWord(PETER_BOÄH, receivedMessages)).isEqualTo(3);
+		assertThat(countMessagesForWord(KLAUS_MÜSCH, receivedMessages)).isEqualTo(2);
+		assertThat(countMessagesForWord(KLAUS_S, receivedMessages)).isEqualTo(3);
+       }
+
+       private static int countMessagesForWord(TestOutputWord word, MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForUsers)
+       {
+               return messagesForUsers.get(word) == null
+                               ? 0
+                               : messagesForUsers.get(word).size();
+       }
+
+       static void assertExpectedState(ReadOnlyKeyValueStore<Word, Long> store)
+       {
+               assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, store.get(wordOf(PETER_HALLO)));
+               assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, store.get(wordOf(PETER_WELT)));
+               assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, store.get(wordOf(PETER_BOÄH)));
+               assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, store.get(wordOf(KLAUS_MÜSCH)));
+               assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S)));
+       }
+
+       private static Word wordOf(TestOutputWord testOutputWord)
+       {
+               Word word = new Word();
+
+               word.setUser(testOutputWord.getUser());
+               word.setWord(testOutputWord.getWord());
+
+               return word;
+       }
+
+       static void assertExpectedLastMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
+       {
+               assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages));
+               assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, getLastMessageFor(PETER_WELT, receivedMessages));
+               assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, getLastMessageFor(PETER_BOÄH, receivedMessages));
+               assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, getLastMessageFor(KLAUS_MÜSCH, receivedMessages));
+               assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages));
+       }
+
+       private static void assertWordCountEqualsWordCountFromLastMessage(
+                       TestOutputWord word,
+                       Long counter)
+       {
+               TestOutputWordCounter testOutputWordCounter = TestOutputWordCounter.of(
+                               word.getUser(),
+                               word.getWord(),
+                               counter);
+               assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter);
+       }
+
+       private static void assertWordCountEqualsWordCountFromLastMessage(
+                       TestOutputWord word,
+                       TestOutputWordCounter counter)
+       {
+               assertThat(counter).isEqualTo(getLastMessageFor(word));
+       }
+
+       private static TestOutputWordCounter getLastMessageFor(TestOutputWord word)
+       {
+               return getLastMessageFor(word, expectedMessages());
+       }
+
+       private static TestOutputWordCounter getLastMessageFor(
+                       TestOutputWord user,
+                       MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForWord)
+       {
+               return messagesForWord
+                               .get(user)
+                               .stream()
+                               .reduce(null, (left, right) -> right);
+       }
+
+       private static final KeyValue<TestOutputWord, TestOutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
+       {
+                       KeyValue.pair(
+                                       PETER_HALLO,
+                                       TestOutputWordCounter.of(PETER, WORD_HALLO,1)),
+                       KeyValue.pair(
+                                       KLAUS_MÜSCH,
+                                       TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,1)),
+                       KeyValue.pair(
+                                       PETER_WELT,
+                                       TestOutputWordCounter.of(PETER, WORD_WELT,1)),
+                       KeyValue.pair(
+                                       KLAUS_MÜSCH,
+                                       TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,2)),
+                       KeyValue.pair(
+                                       KLAUS_S,
+                                       TestOutputWordCounter.of(KLAUS, WORD_S,1)),
+                       KeyValue.pair(
+                                       PETER_BOÄH,
+                                       TestOutputWordCounter.of(PETER, WORD_BOÄH,1)),
+                       KeyValue.pair(
+                                       PETER_WELT,
+                                       TestOutputWordCounter.of(PETER, WORD_WELT,2)),
+                       KeyValue.pair(
+                                       PETER_BOÄH,
+                                       TestOutputWordCounter.of(PETER, WORD_BOÄH,2)),
+                       KeyValue.pair(
+                                       KLAUS_S,
+                                       TestOutputWordCounter.of(KLAUS, WORD_S,2)),
+                       KeyValue.pair(
+                                       PETER_BOÄH,
+                                       TestOutputWordCounter.of(PETER, WORD_BOÄH,3)),
+                       KeyValue.pair(
+                                       KLAUS_S,
+                                       TestOutputWordCounter.of(KLAUS, WORD_S,3)),
+       };
+
+       static MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages()
+       {
+               MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages = new LinkedMultiValueMap<>();
+               Stream
+                               .of(EXPECTED_MESSAGES)
+                               .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
+               return expectedMessages;
+       }
+}
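The per-word totals checked by assertExpectedNumberOfMessagesForWord follow directly from EXPECTED_MESSAGES (Hallo 1, Welt 2, Boäh 3, Müsch 2, s 3). If one prefers not to hard-code them, they can be derived with a plain stream grouping; a minimal sketch of a hypothetical helper, not part of the original test data:

    // Hypothetical helper: expected number of output records per word,
    // computed from EXPECTED_MESSAGES instead of hard-coded literals.
    // Requires java.util.Map and java.util.stream.Collectors.
    static Map<TestOutputWord, Long> expectedMessageCounts()
    {
        return Stream
            .of(EXPECTED_MESSAGES)
            .collect(Collectors.groupingBy(kv -> kv.key, Collectors.counting()));
    }
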
diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java
deleted file mode 100644 (file)
index cfc2cae..0000000
+++ /dev/null
@@ -1,15 +0,0 @@
-package de.juplo.kafka.wordcount.top10;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor(staticName = "of")
-public class TestOutputWord
-{
-  String user;
-  String word;
-}
diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java
deleted file mode 100644 (file)
index 1b59387..0000000
+++ /dev/null
@@ -1,16 +0,0 @@
-package de.juplo.kafka.wordcount.top10;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor(staticName = "of")
-public class TestOutputWordCounter
-{
-  String user;
-  String word;
-  long counter;
-}
diff --git a/src/test/java/de/juplo/kafka/wordcount/topwords/TestOutputWord.java b/src/test/java/de/juplo/kafka/wordcount/topwords/TestOutputWord.java
new file mode 100644 (file)
index 0000000..cfc2cae
--- /dev/null
@@ -0,0 +1,15 @@
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestOutputWord
+{
+  String user;
+  String word;
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/topwords/TestOutputWordCounter.java b/src/test/java/de/juplo/kafka/wordcount/topwords/TestOutputWordCounter.java
new file mode 100644 (file)
index 0000000..1b59387
--- /dev/null
@@ -0,0 +1,16 @@
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestOutputWordCounter
+{
+  String user;
+  String word;
+  long counter;
+}
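
For completeness: the moved test DTOs rely entirely on Lombok-generated boilerplate. Written out by hand, @Data together with @NoArgsConstructor and @AllArgsConstructor(staticName = "of") expands to roughly the following for TestOutputWordCounter; this is a sketch of the generated code, not part of the commit:

    // Roughly what Lombok generates; @AllArgsConstructor(staticName = "of")
    // makes the all-args constructor private and adds the static factory.
    public class TestOutputWordCounter
    {
        String user;
        String word;
        long counter;

        public TestOutputWordCounter() {}

        private TestOutputWordCounter(String user, String word, long counter)
        {
            this.user = user;
            this.word = word;
            this.counter = counter;
        }

        public static TestOutputWordCounter of(String user, String word, long counter)
        {
            return new TestOutputWordCounter(user, word, counter);
        }

        // ...plus the getters, setters, equals(), hashCode() and toString() from @Data
    }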