From: Kai Moritz Date: Sat, 22 Jun 2024 06:01:50 +0000 (+0200) Subject: stats: 1.0.0 - Renamed packages and classes -- MOVE X-Git-Tag: stats-on-top10~2 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=2eac018995c993857a825db1286dbe8ae2e8840e;p=demos%2Fkafka%2Fwordcount stats: 1.0.0 - Renamed packages and classes -- MOVE --- diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/Entry.java b/src/main/java/de/juplo/kafka/wordcount/stats/Entry.java new file mode 100644 index 0000000..b25fc07 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/stats/Entry.java @@ -0,0 +1,20 @@ +package de.juplo.kafka.wordcount.top10; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@NoArgsConstructor +@AllArgsConstructor( + staticName = "of", + access = AccessLevel.PACKAGE) +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class Entry +{ + private String word; + private Long counter; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/Key.java b/src/main/java/de/juplo/kafka/wordcount/stats/Key.java new file mode 100644 index 0000000..ffac8ea --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/stats/Key.java @@ -0,0 +1,17 @@ +package de.juplo.kafka.wordcount.top10; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.*; + + +@NoArgsConstructor +@AllArgsConstructor( + staticName = "of", + access = AccessLevel.PACKAGE) +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class Key +{ + private String user; + private String word; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/Ranking.java b/src/main/java/de/juplo/kafka/wordcount/stats/Ranking.java new file mode 100644 index 0000000..4f56c18 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/stats/Ranking.java @@ -0,0 +1,159 @@ +package de.juplo.kafka.wordcount.top10; + +import lombok.*; + +import java.util.*; + + +@AllArgsConstructor(access = AccessLevel.PRIVATE) +@NoArgsConstructor +@Data +public class Ranking +{ + public final static int MAX_ENTRIES = 10; + + + private Entry[] entries = new Entry[0]; + + public Ranking add(Entry newEntry) + { + if (entries.length == 0) + { + entries = new Entry[1]; + entries[0] = newEntry; + return this; + } + + List list = new LinkedList<>(Arrays.asList(entries)); + int oldPosition = -1; + for (int i = 0; i < list.size(); i++) + { + Entry entry = list.get(i); + + if (entry.getCounter() < newEntry.getCounter()) + { + if (oldPosition > -1) + { + if (list.get(oldPosition).getCounter() > newEntry.getCounter()) + { + throw new IllegalArgumentException("The ranking already contains an entry with a higher counting for " + newEntry); + } + else + { + // Entry for word already exists with the same counting! Nothing changed... + return this; + } + } + + list.add(i, newEntry); + for (int j = i+1; j < list.size(); j++) + { + entry = list.get(j); + if(entry.getWord().equals(newEntry.getWord())) + { + list.remove(j); + break; + } + } + if (list.size() > MAX_ENTRIES) + { + list = list.subList(0, MAX_ENTRIES); + } + entries = list.toArray(num -> new Entry[num]); + return this; + } + + if (entry.getWord().equals(newEntry.getWord())) + oldPosition = i; + } + + if (oldPosition > -1 && list.get(oldPosition).getCounter() > newEntry.getCounter()) + { + throw new IllegalArgumentException("The ranking already contains an entry with a higher counting for " + newEntry); + } + + if (list.size() < MAX_ENTRIES) + { + list.add(newEntry); + entries = list.toArray(num -> new Entry[num]); + } + + return this; + } + + public Ranking validate() throws IllegalArgumentException + { + if (this.entries.length > MAX_ENTRIES) + throw new IllegalArgumentException("Invalid Ranking: a valid ranking cannot have more entries than " + MAX_ENTRIES ); + + Set seenWords = new HashSet<>(); + long lowesCounting = Long.MAX_VALUE; + + for (int i=0; i " + entry.getWord()); + if (entry.getCounter() > lowesCounting) + throw new IllegalArgumentException("Invalid Ranking: Entries are not sorted correctly"); + + seenWords.add(entry.getWord()); + lowesCounting = entry.getCounter(); + } + + return this; + } + + @Override + public boolean equals(Object o) + { + if (this == o) + return true; + if (o == null) + return false; + if (!(o instanceof Ranking)) + return false; + + Ranking other = (Ranking)o; + + if (other.entries.length != entries.length) + return false; + + if (entries.length == 0) + return true; + + int i = 0; + Set myWordsWithCurrentCount = new HashSet<>(); + Set otherWordsWithCurrentCount = new HashSet<>(); + Entry myEntry = entries[i]; + long currentCount = myEntry.getCounter(); + myWordsWithCurrentCount.add(myEntry.getWord()); + while (true) + { + Entry otherEntry = other.entries[i]; + if (otherEntry.getCounter() != currentCount) + return false; + otherWordsWithCurrentCount.add(otherEntry.getWord()); + if (++i >= entries.length) + return myWordsWithCurrentCount.equals(otherWordsWithCurrentCount); + myEntry = entries[i]; + if (myEntry.getCounter() != currentCount) + { + if (!myWordsWithCurrentCount.equals(otherWordsWithCurrentCount)) + return false; + currentCount = myEntry.getCounter(); + myWordsWithCurrentCount.clear(); + otherWordsWithCurrentCount.clear(); + } + myWordsWithCurrentCount.add(myEntry.getWord()); + } + } + + public static Ranking of(Entry... entries) + { + Ranking ranking = new Ranking(entries); + ranking.validate(); + return ranking; + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplication.java b/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplication.java new file mode 100644 index 0000000..5c14ae7 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplication.java @@ -0,0 +1,14 @@ +package de.juplo.kafka.wordcount.top10; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + + +@SpringBootApplication +public class Top10Application +{ + public static void main(String[] args) + { + SpringApplication.run(Top10Application.class, args); + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationConfiguration.java new file mode 100644 index 0000000..255f0e4 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationConfiguration.java @@ -0,0 +1,98 @@ +package de.juplo.kafka.wordcount.top10; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerde; + +import java.util.Properties; +import java.util.concurrent.CompletableFuture; + +import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME; +import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; + + +@Configuration +@EnableConfigurationProperties(Top10ApplicationProperties.class) +@Slf4j +public class Top10ApplicationConfiguration +{ + @Bean + public Properties streamProcessorProperties(Top10ApplicationProperties properties) + { + Properties props = new Properties(); + + props.putAll(serializationConfig()); + + props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); + + if (properties.getCommitInterval() != null) + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval()); + if (properties.getCacheMaxBytes() != null) + props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, properties.getCacheMaxBytes()); + + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + return props; + } + + static Properties serializationConfig() + { + Properties props = new Properties(); + + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + props.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName()); + props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName()); + props.put( + JsonDeserializer.TYPE_MAPPINGS, + "word:" + Key.class.getName() + "," + + "counter:" + Entry.class.getName() + "," + + "user:" + User.class.getName() + "," + + "ranking:" + Ranking.class.getName()); + + return props; + } + + @Bean(initMethod = "start", destroyMethod = "stop") + public Top10StreamProcessor streamProcessor( + Top10ApplicationProperties applicationProperties, + Properties streamProcessorProperties, + KeyValueBytesStoreSupplier storeSupplier, + ConfigurableApplicationContext context) + { + Top10StreamProcessor streamProcessor = new Top10StreamProcessor( + applicationProperties.getInputTopic(), + applicationProperties.getOutputTopic(), + streamProcessorProperties, + storeSupplier); + + streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> + { + log.error("Unexpected error!", e); + CompletableFuture.runAsync(() -> + { + log.info("Stopping application..."); + SpringApplication.exit(context, () -> 1); + }); + return SHUTDOWN_CLIENT; + }); + + return streamProcessor; + } + + @Bean + public KeyValueBytesStoreSupplier storeSupplier() + { + return Stores.persistentKeyValueStore(STORE_NAME); + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationProperties.java new file mode 100644 index 0000000..d3bb236 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationProperties.java @@ -0,0 +1,22 @@ +package de.juplo.kafka.wordcount.top10; + + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import org.springframework.boot.context.properties.ConfigurationProperties; + + +@ConfigurationProperties("juplo.wordcount.top10") +@Getter +@Setter +@ToString +public class Top10ApplicationProperties +{ + private String bootstrapServer = "localhost:9092"; + private String applicationId = "top10"; + private String inputTopic = "countings"; + private String outputTopic = "top10"; + private Integer commitInterval; + private Integer cacheMaxBytes; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessor.java new file mode 100644 index 0000000..70ead87 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessor.java @@ -0,0 +1,75 @@ +package de.juplo.kafka.wordcount.top10; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.streams.*; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; + +import java.util.Properties; + + +@Slf4j +public class Top10StreamProcessor +{ + public static final String STORE_NAME= "top10"; + + public final KafkaStreams streams; + + + public Top10StreamProcessor( + String inputTopic, + String outputTopic, + Properties props, + KeyValueBytesStoreSupplier storeSupplier) + { + Topology topology = Top10StreamProcessor.buildTopology( + inputTopic, + outputTopic, + storeSupplier); + + streams = new KafkaStreams(topology, props); + } + + static Topology buildTopology( + String inputTopic, + String outputTopic, + KeyValueBytesStoreSupplier storeSupplier) + { + StreamsBuilder builder = new StreamsBuilder(); + + builder + .stream(inputTopic) + .map((key, entry) -> new KeyValue<>(User.of(key.getUser()), entry)) + .groupByKey() + .aggregate( + () -> new Ranking(), + (user, entry, ranking) -> ranking.add(entry), + Materialized.as(storeSupplier)) + .toStream() + .to(outputTopic); + + Topology topology = builder.build(); + log.info("\n\n{}", topology.describe()); + + return topology; + } + + ReadOnlyKeyValueStore getStore() + { + return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore())); + } + + public void start() + { + log.info("Starting Stream-Processor"); + streams.start(); + } + + public void stop() + { + log.info("Stopping Stream-Processor"); + streams.close(); + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/WindowedKey.java b/src/main/java/de/juplo/kafka/wordcount/stats/WindowedKey.java new file mode 100644 index 0000000..53c258d --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/stats/WindowedKey.java @@ -0,0 +1,14 @@ +package de.juplo.kafka.wordcount.top10; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@AllArgsConstructor(staticName = "of") +@NoArgsConstructor +@Data +public class User +{ + String user; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java b/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java deleted file mode 100644 index b25fc07..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java +++ /dev/null @@ -1,20 +0,0 @@ -package de.juplo.kafka.wordcount.top10; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import lombok.AccessLevel; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - - -@NoArgsConstructor -@AllArgsConstructor( - staticName = "of", - access = AccessLevel.PACKAGE) -@Data -@JsonIgnoreProperties(ignoreUnknown = true) -public class Entry -{ - private String word; - private Long counter; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Key.java b/src/main/java/de/juplo/kafka/wordcount/top10/Key.java deleted file mode 100644 index ffac8ea..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Key.java +++ /dev/null @@ -1,17 +0,0 @@ -package de.juplo.kafka.wordcount.top10; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import lombok.*; - - -@NoArgsConstructor -@AllArgsConstructor( - staticName = "of", - access = AccessLevel.PACKAGE) -@Data -@JsonIgnoreProperties(ignoreUnknown = true) -public class Key -{ - private String user; - private String word; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java b/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java deleted file mode 100644 index 4f56c18..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java +++ /dev/null @@ -1,159 +0,0 @@ -package de.juplo.kafka.wordcount.top10; - -import lombok.*; - -import java.util.*; - - -@AllArgsConstructor(access = AccessLevel.PRIVATE) -@NoArgsConstructor -@Data -public class Ranking -{ - public final static int MAX_ENTRIES = 10; - - - private Entry[] entries = new Entry[0]; - - public Ranking add(Entry newEntry) - { - if (entries.length == 0) - { - entries = new Entry[1]; - entries[0] = newEntry; - return this; - } - - List list = new LinkedList<>(Arrays.asList(entries)); - int oldPosition = -1; - for (int i = 0; i < list.size(); i++) - { - Entry entry = list.get(i); - - if (entry.getCounter() < newEntry.getCounter()) - { - if (oldPosition > -1) - { - if (list.get(oldPosition).getCounter() > newEntry.getCounter()) - { - throw new IllegalArgumentException("The ranking already contains an entry with a higher counting for " + newEntry); - } - else - { - // Entry for word already exists with the same counting! Nothing changed... - return this; - } - } - - list.add(i, newEntry); - for (int j = i+1; j < list.size(); j++) - { - entry = list.get(j); - if(entry.getWord().equals(newEntry.getWord())) - { - list.remove(j); - break; - } - } - if (list.size() > MAX_ENTRIES) - { - list = list.subList(0, MAX_ENTRIES); - } - entries = list.toArray(num -> new Entry[num]); - return this; - } - - if (entry.getWord().equals(newEntry.getWord())) - oldPosition = i; - } - - if (oldPosition > -1 && list.get(oldPosition).getCounter() > newEntry.getCounter()) - { - throw new IllegalArgumentException("The ranking already contains an entry with a higher counting for " + newEntry); - } - - if (list.size() < MAX_ENTRIES) - { - list.add(newEntry); - entries = list.toArray(num -> new Entry[num]); - } - - return this; - } - - public Ranking validate() throws IllegalArgumentException - { - if (this.entries.length > MAX_ENTRIES) - throw new IllegalArgumentException("Invalid Ranking: a valid ranking cannot have more entries than " + MAX_ENTRIES ); - - Set seenWords = new HashSet<>(); - long lowesCounting = Long.MAX_VALUE; - - for (int i=0; i " + entry.getWord()); - if (entry.getCounter() > lowesCounting) - throw new IllegalArgumentException("Invalid Ranking: Entries are not sorted correctly"); - - seenWords.add(entry.getWord()); - lowesCounting = entry.getCounter(); - } - - return this; - } - - @Override - public boolean equals(Object o) - { - if (this == o) - return true; - if (o == null) - return false; - if (!(o instanceof Ranking)) - return false; - - Ranking other = (Ranking)o; - - if (other.entries.length != entries.length) - return false; - - if (entries.length == 0) - return true; - - int i = 0; - Set myWordsWithCurrentCount = new HashSet<>(); - Set otherWordsWithCurrentCount = new HashSet<>(); - Entry myEntry = entries[i]; - long currentCount = myEntry.getCounter(); - myWordsWithCurrentCount.add(myEntry.getWord()); - while (true) - { - Entry otherEntry = other.entries[i]; - if (otherEntry.getCounter() != currentCount) - return false; - otherWordsWithCurrentCount.add(otherEntry.getWord()); - if (++i >= entries.length) - return myWordsWithCurrentCount.equals(otherWordsWithCurrentCount); - myEntry = entries[i]; - if (myEntry.getCounter() != currentCount) - { - if (!myWordsWithCurrentCount.equals(otherWordsWithCurrentCount)) - return false; - currentCount = myEntry.getCounter(); - myWordsWithCurrentCount.clear(); - otherWordsWithCurrentCount.clear(); - } - myWordsWithCurrentCount.add(myEntry.getWord()); - } - } - - public static Ranking of(Entry... entries) - { - Ranking ranking = new Ranking(entries); - ranking.validate(); - return ranking; - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java deleted file mode 100644 index 5c14ae7..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java +++ /dev/null @@ -1,14 +0,0 @@ -package de.juplo.kafka.wordcount.top10; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; - - -@SpringBootApplication -public class Top10Application -{ - public static void main(String[] args) - { - SpringApplication.run(Top10Application.class, args); - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java deleted file mode 100644 index 255f0e4..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java +++ /dev/null @@ -1,98 +0,0 @@ -package de.juplo.kafka.wordcount.top10; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; -import org.apache.kafka.streams.state.Stores; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.support.serializer.JsonDeserializer; -import org.springframework.kafka.support.serializer.JsonSerde; - -import java.util.Properties; -import java.util.concurrent.CompletableFuture; - -import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME; -import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; - - -@Configuration -@EnableConfigurationProperties(Top10ApplicationProperties.class) -@Slf4j -public class Top10ApplicationConfiguration -{ - @Bean - public Properties streamProcessorProperties(Top10ApplicationProperties properties) - { - Properties props = new Properties(); - - props.putAll(serializationConfig()); - - props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - - if (properties.getCommitInterval() != null) - props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval()); - if (properties.getCacheMaxBytes() != null) - props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, properties.getCacheMaxBytes()); - - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - - return props; - } - - static Properties serializationConfig() - { - Properties props = new Properties(); - - props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); - props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); - props.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName()); - props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName()); - props.put( - JsonDeserializer.TYPE_MAPPINGS, - "word:" + Key.class.getName() + "," + - "counter:" + Entry.class.getName() + "," + - "user:" + User.class.getName() + "," + - "ranking:" + Ranking.class.getName()); - - return props; - } - - @Bean(initMethod = "start", destroyMethod = "stop") - public Top10StreamProcessor streamProcessor( - Top10ApplicationProperties applicationProperties, - Properties streamProcessorProperties, - KeyValueBytesStoreSupplier storeSupplier, - ConfigurableApplicationContext context) - { - Top10StreamProcessor streamProcessor = new Top10StreamProcessor( - applicationProperties.getInputTopic(), - applicationProperties.getOutputTopic(), - streamProcessorProperties, - storeSupplier); - - streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> - { - log.error("Unexpected error!", e); - CompletableFuture.runAsync(() -> - { - log.info("Stopping application..."); - SpringApplication.exit(context, () -> 1); - }); - return SHUTDOWN_CLIENT; - }); - - return streamProcessor; - } - - @Bean - public KeyValueBytesStoreSupplier storeSupplier() - { - return Stores.persistentKeyValueStore(STORE_NAME); - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java deleted file mode 100644 index d3bb236..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java +++ /dev/null @@ -1,22 +0,0 @@ -package de.juplo.kafka.wordcount.top10; - - -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; -import org.springframework.boot.context.properties.ConfigurationProperties; - - -@ConfigurationProperties("juplo.wordcount.top10") -@Getter -@Setter -@ToString -public class Top10ApplicationProperties -{ - private String bootstrapServer = "localhost:9092"; - private String applicationId = "top10"; - private String inputTopic = "countings"; - private String outputTopic = "top10"; - private Integer commitInterval; - private Integer cacheMaxBytes; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java deleted file mode 100644 index 70ead87..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java +++ /dev/null @@ -1,75 +0,0 @@ -package de.juplo.kafka.wordcount.top10; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; -import org.apache.kafka.streams.state.QueryableStoreTypes; -import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; - -import java.util.Properties; - - -@Slf4j -public class Top10StreamProcessor -{ - public static final String STORE_NAME= "top10"; - - public final KafkaStreams streams; - - - public Top10StreamProcessor( - String inputTopic, - String outputTopic, - Properties props, - KeyValueBytesStoreSupplier storeSupplier) - { - Topology topology = Top10StreamProcessor.buildTopology( - inputTopic, - outputTopic, - storeSupplier); - - streams = new KafkaStreams(topology, props); - } - - static Topology buildTopology( - String inputTopic, - String outputTopic, - KeyValueBytesStoreSupplier storeSupplier) - { - StreamsBuilder builder = new StreamsBuilder(); - - builder - .stream(inputTopic) - .map((key, entry) -> new KeyValue<>(User.of(key.getUser()), entry)) - .groupByKey() - .aggregate( - () -> new Ranking(), - (user, entry, ranking) -> ranking.add(entry), - Materialized.as(storeSupplier)) - .toStream() - .to(outputTopic); - - Topology topology = builder.build(); - log.info("\n\n{}", topology.describe()); - - return topology; - } - - ReadOnlyKeyValueStore getStore() - { - return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore())); - } - - public void start() - { - log.info("Starting Stream-Processor"); - streams.start(); - } - - public void stop() - { - log.info("Stopping Stream-Processor"); - streams.close(); - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/User.java b/src/main/java/de/juplo/kafka/wordcount/top10/User.java deleted file mode 100644 index 53c258d..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/top10/User.java +++ /dev/null @@ -1,14 +0,0 @@ -package de.juplo.kafka.wordcount.top10; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - - -@AllArgsConstructor(staticName = "of") -@NoArgsConstructor -@Data -public class User -{ - String user; -} diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestCounter.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestCounter.java deleted file mode 100644 index d98ae64..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/counter/TestCounter.java +++ /dev/null @@ -1,21 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - - -@Data -@NoArgsConstructor -@AllArgsConstructor(staticName = "of") -public class TestCounter -{ - String user; - String word; - long counter; - - public static TestCounter of(TestWord word, long counter) - { - return new TestCounter(word.getUser(), word.getWord(), counter); - } -} diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java deleted file mode 100644 index 8008e12..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java +++ /dev/null @@ -1,17 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - - -@AllArgsConstructor(staticName = "of") -@NoArgsConstructor -@Data -@JsonIgnoreProperties(ignoreUnknown = true) -public class TestWord -{ - private String user; - private String word; -} diff --git a/src/test/java/de/juplo/kafka/wordcount/in/InputCounter.java b/src/test/java/de/juplo/kafka/wordcount/in/InputCounter.java new file mode 100644 index 0000000..d98ae64 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/in/InputCounter.java @@ -0,0 +1,21 @@ +package de.juplo.kafka.wordcount.counter; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@Data +@NoArgsConstructor +@AllArgsConstructor(staticName = "of") +public class TestCounter +{ + String user; + String word; + long counter; + + public static TestCounter of(TestWord word, long counter) + { + return new TestCounter(word.getUser(), word.getWord(), counter); + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/in/InputWindowedKey.java b/src/test/java/de/juplo/kafka/wordcount/in/InputWindowedKey.java new file mode 100644 index 0000000..8008e12 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/in/InputWindowedKey.java @@ -0,0 +1,17 @@ +package de.juplo.kafka.wordcount.counter; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@AllArgsConstructor(staticName = "of") +@NoArgsConstructor +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class TestWord +{ + private String user; + private String word; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/out/TestEntry.java b/src/test/java/de/juplo/kafka/wordcount/out/TestEntry.java new file mode 100644 index 0000000..a5152e6 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/out/TestEntry.java @@ -0,0 +1,15 @@ +package de.juplo.kafka.wordcount.query; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@AllArgsConstructor(staticName = "of") +@NoArgsConstructor +@Data +public class TestEntry +{ + String word; + long counter; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/out/TestRanking.java b/src/test/java/de/juplo/kafka/wordcount/out/TestRanking.java new file mode 100644 index 0000000..efad48b --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/out/TestRanking.java @@ -0,0 +1,21 @@ +package de.juplo.kafka.wordcount.query; + +import de.juplo.kafka.wordcount.top10.Entry; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@AllArgsConstructor(access = AccessLevel.PRIVATE) +@NoArgsConstructor +@Data +public class TestRanking +{ + private TestEntry[] entries; + + public static TestRanking of(TestEntry... entries) + { + return new TestRanking(entries); + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/out/TestUser.java b/src/test/java/de/juplo/kafka/wordcount/out/TestUser.java new file mode 100644 index 0000000..53a5992 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/out/TestUser.java @@ -0,0 +1,14 @@ +package de.juplo.kafka.wordcount.query; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@AllArgsConstructor(staticName = "of") +@NoArgsConstructor +@Data +public class TestUser +{ + String user; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestEntry.java b/src/test/java/de/juplo/kafka/wordcount/query/TestEntry.java deleted file mode 100644 index a5152e6..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/query/TestEntry.java +++ /dev/null @@ -1,15 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - - -@AllArgsConstructor(staticName = "of") -@NoArgsConstructor -@Data -public class TestEntry -{ - String word; - long counter; -} diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestRanking.java b/src/test/java/de/juplo/kafka/wordcount/query/TestRanking.java deleted file mode 100644 index efad48b..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/query/TestRanking.java +++ /dev/null @@ -1,21 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import de.juplo.kafka.wordcount.top10.Entry; -import lombok.AccessLevel; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - - -@AllArgsConstructor(access = AccessLevel.PRIVATE) -@NoArgsConstructor -@Data -public class TestRanking -{ - private TestEntry[] entries; - - public static TestRanking of(TestEntry... entries) - { - return new TestRanking(entries); - } -} diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestUser.java b/src/test/java/de/juplo/kafka/wordcount/query/TestUser.java deleted file mode 100644 index 53a5992..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/query/TestUser.java +++ /dev/null @@ -1,14 +0,0 @@ -package de.juplo.kafka.wordcount.query; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - - -@AllArgsConstructor(staticName = "of") -@NoArgsConstructor -@Data -public class TestUser -{ - String user; -} diff --git a/src/test/java/de/juplo/kafka/wordcount/stats/RankingTest.java b/src/test/java/de/juplo/kafka/wordcount/stats/RankingTest.java new file mode 100644 index 0000000..26749e9 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/stats/RankingTest.java @@ -0,0 +1,276 @@ +package de.juplo.kafka.wordcount.top10; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.LinkedList; +import java.util.List; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType; + + +public class RankingTest +{ + @DisplayName("A newly created instance is empty") + @Test + public void testNewRankingIsEmpty() + { + Ranking ranking = new Ranking(); + assertThat(ranking.getEntries()).isEmpty(); + } + + @DisplayName("An instance that was build from an empty ranking is empty") + @Test + public void testRankingOfYieldsExpectedResultForEmptyList() + { + Ranking ranking = new Ranking(); + assertThat(ranking.getEntries()).isEmpty(); + } + + @DisplayName("An instance that was build from a valid ranking contains the expected entries") + @ParameterizedTest + @MethodSource("validRankingsProvider") + public void testRankingOfYieldsExpectedResultsForValidRankings(List entryList) + { + Ranking ranking = Ranking.of(toArray(entryList)); + assertThat(ranking.getEntries()).containsExactlyElementsOf(entryList); + } + + @DisplayName("The builder fails for invalid rankings") + @ParameterizedTest + @MethodSource("invalidRankingsProvider") + public void testRankingOfThrowsExceptionForInvalidRankings(List entryList) + { + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> Ranking.of(toArray(entryList))); + } + + @DisplayName("Adding a new word with highest ranking, pushes all other words down") + @ParameterizedTest + @MethodSource("validRankingsProvider") + public void testAddingNewWordWithHighestRanking(List entryList) + { + Ranking ranking = Ranking.of(toArray(entryList)); + Entry newEntry = Entry.of("NEW!", rankingForPosition(-1)); + ranking.add(newEntry); + assertThat(ranking.getEntries()[0]).isEqualTo(newEntry); + for (int i = 0; i < entryList.size() && i < Ranking.MAX_ENTRIES - 1; i++) + { + assertThat(ranking.getEntries()[i + 1]).isEqualTo(entryList.get(i)); + } + } + + @DisplayName("Adding a new word with an existent ranking, pushes all words with lower ranking down") + @ParameterizedTest + @MethodSource("validRankingsProvider") + public void testAddingNewWordWithExistingRanking(List entryList) + { + for (int position = 0; position < entryList.size(); position++ ) + { + Ranking ranking = Ranking.of(toArray(entryList)); + Entry newEntry = Entry.of("NEW!", rankingForPosition(position)); + ranking.add(newEntry); + for (int i = 0; i < entryList.size() && i < Ranking.MAX_ENTRIES - 1; i++) + { + if (i < position) + { + assertThat(ranking.getEntries()[i]).isEqualTo(entryList.get(i)); + } + if (i == position) + { + assertThat(ranking.getEntries()[i]).isEqualTo(entryList.get(i)); + assertThat(ranking.getEntries()[i + 1]).isEqualTo(newEntry); + } + if (i > position) + { + assertThat(ranking.getEntries()[i + 1]).isEqualTo(entryList.get(i)); + } + } + } + } + + @DisplayName("Adding a highest ranking for an existing word shifts it to the first place") + @ParameterizedTest + @ValueSource(ints = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }) + public void testAddingExistingWordWithHighestRanking(int position) + { + Ranking ranking = Ranking.of(toArray(VALID_RANKINGS[0])); + String word = wordForPosition(position); + Entry highestEntry = Entry.of(word, 100l); + ranking.add(highestEntry); + List expectedEntries = Stream + .concat( + Stream.of(highestEntry), + VALID_RANKINGS[0] + .stream() + .filter(entry -> !entry.getWord().equals(word))) + .toList(); + assertThat(ranking.getEntries()).containsExactlyElementsOf(expectedEntries); + } + + @DisplayName("Adding an existing word with unchanged ranking changes nothing") + @ParameterizedTest + @ValueSource(ints = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }) + public void testAddingExistingWordWithUnchangedRanking(int position) + { + Ranking ranking = Ranking.of(toArray(VALID_RANKINGS[0])); + Entry unchangedEntry = Entry.of( + wordForPosition(position), + rankingForPosition(position)); + ranking.add(unchangedEntry); + assertThat(ranking.getEntries()).containsExactlyElementsOf(VALID_RANKINGS[0]); + } + + @DisplayName("Adding an existing word with a lower ranking fails") + @ParameterizedTest + @MethodSource("validRankingsProvider") + public void testAddingExistingWordWithLowerRankingFails(List entryList) + { + Ranking ranking = Ranking.of(toArray(entryList)); + entryList.forEach(entry -> + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> ranking.add(Entry.of(entry.getWord(), entry.getCounter() - 1)))); + } + + @DisplayName("Identical rankings are considered equal") + @ParameterizedTest + @MethodSource("validRankingsProvider") + public void testIdenticalRankingsAreConsideredEaqual(List entryList) + { + assertThat(Ranking.of(toArray(entryList))).isEqualTo(Ranking.of(toArray(entryList))); + } + + @DisplayName("Two empty rankings are considered equal") + @Test + public void testTwoEmptyRankingsAreConsideredEaqual() + { + assertThat(Ranking.of()).isEqualTo(Ranking.of()); + } + + @DisplayName("A changed ranking is not considered equal to its unchanged counter-part") + @ParameterizedTest + @MethodSource("validRankingsProvider") + public void testChangedRankingsDoNotEaqualUnchangedOne(List entryList) + { + Ranking changed = Ranking.of(toArray(entryList)); + changed.add(Entry.of("devilish", 666l)); + assertThat(changed).isNotEqualTo(Ranking.of(toArray(entryList))); + } + + @DisplayName("Rankigs are considered equal, if only the order of words with the same count differ") + @Test + public void testRankingWithDifferentOrderForSameCountAreEqual() + { + assertThat( + Ranking.of( + Entry.of("a1",10l), + Entry.of("a2",10l), + Entry.of("b1", 9l), + Entry.of("b2",9l), + Entry.of("c1", 8l), + Entry.of("c2", 8l))) + .isEqualTo(Ranking.of( + Entry.of("a2",10l), + Entry.of("a1",10l), + Entry.of("b2", 9l), + Entry.of("b1",9l), + Entry.of("c2", 8l), + Entry.of("c1", 8l))); + } + + + Entry[] toArray(List entryList) + { + return entryList.toArray(size -> new Entry[size]); + } + + static String wordForPosition(int position) + { + return Integer.toString(position+1); + } + + static long rankingForPosition(int position) + { + return (long)Ranking.MAX_ENTRIES * 2 - position; + } + + static Stream> validRankingsProvider() + { + return Stream.of(VALID_RANKINGS); + } + + static Stream> invalidRankingsProvider() + { + return Stream.of(INVALID_RANKINGS); + } + + static String[] WORDS = new String[Ranking.MAX_ENTRIES]; + static List[] VALID_RANKINGS = new List[Ranking.MAX_ENTRIES]; + + static + { + for (int i = 0; i < Ranking.MAX_ENTRIES; i++) + { + List ranking = new LinkedList<>(); + String word = null; + for (int position = 0; position <= i; position++) + { + word = wordForPosition(position); + Entry entry = Entry.of(word, rankingForPosition(position)); + ranking.add(entry); + } + WORDS[i] = word; + VALID_RANKINGS[Ranking.MAX_ENTRIES - (i + 1)] = ranking; + } + } + + static List[] INVALID_RANKINGS = new List[] { + List.of( + Entry.of("Platz eins", 1l), + Entry.of("Platz zwei", 2l)), + List.of( + Entry.of("Platz eins", 1111111111l), + Entry.of("Platz zwei", 222222222l), + Entry.of("Platz eins", 1l)), + List.of( + Entry.of("Platz eins", 11l), + Entry.of("Platz eins", 1l)), + List.of( + Entry.of("Platz eins", 1111111111l), + Entry.of("Platz zwei", 222222222l), + Entry.of("Platz eins", 11111111l), + Entry.of("Platz zwei", 2222222l), + Entry.of("Platz fünf", 555555l)), + List.of( + Entry.of("Platz eins", 1111111111l), + Entry.of("Platz zwei", 222222222l), + Entry.of("Platz drei", 33333333l), + Entry.of("Platz vier", 4444444l), + Entry.of("Platz eins", 111111l), + Entry.of("Platz sechs", 66666l)), + List.of( + Entry.of("Platz eins", 1111111111l), + Entry.of("Platz zwei", 222222222l), + Entry.of("Platz drei", 33333333l), + Entry.of("Platz vier", 4444444l), + Entry.of("Platz fünf", 555555l), + Entry.of("Platz sechs", 66666l), + Entry.of("Platz eins", 1l)), + List.of( + Entry.of("Platz eins", 1111111111l), + Entry.of("Platz zwei", 222222222l), + Entry.of("Platz drei", 33333333l), + Entry.of("Platz vier", 4444444l), + Entry.of("Platz fünf", 555555l), + Entry.of("Platz sechs", 66666l), + Entry.of("Platz sieben", 7777l), + Entry.of("Platz acht", 888l), + Entry.of("Platz neun", 99l), + Entry.of("Platz 10", 6l), + Entry.of("Platz 11", 3l))}; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/stats/StatsApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/stats/StatsApplicationIT.java new file mode 100644 index 0000000..f5ef236 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/stats/StatsApplicationIT.java @@ -0,0 +1,168 @@ +package de.juplo.kafka.wordcount.top10; + +import de.juplo.kafka.wordcount.counter.TestCounter; +import de.juplo.kafka.wordcount.counter.TestWord; +import de.juplo.kafka.wordcount.query.TestRanking; +import de.juplo.kafka.wordcount.query.TestUser; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.SendResult; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import java.time.Duration; + +import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME; +import static org.awaitility.Awaitility.await; + + +@SpringBootTest( + properties = { + "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer", + "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", + "spring.kafka.producer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.counter.TestWord,counter:de.juplo.kafka.wordcount.counter.TestCounter", + "spring.kafka.consumer.auto-offset-reset=earliest", + "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", + "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", + "spring.kafka.consumer.properties.spring.json.type.mapping=user:de.juplo.kafka.wordcount.query.TestUser,ranking:de.juplo.kafka.wordcount.query.TestRanking", + "logging.level.root=WARN", + "logging.level.de.juplo=DEBUG", + "logging.level.org.apache.kafka.clients=INFO", + "logging.level.org.apache.kafka.streams=INFO", + "juplo.wordcount.top10.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.wordcount.top10.commit-interval=100", + "juplo.wordcount.top10.cacheMaxBytes=0", + "juplo.wordcount.top10.input-topic=" + Top10ApplicationIT.TOPIC_IN, + "juplo.wordcount.top10.output-topic=" + Top10ApplicationIT.TOPIC_OUT }) +@EmbeddedKafka(topics = { Top10ApplicationIT.TOPIC_IN, Top10ApplicationIT.TOPIC_OUT }) +@Slf4j +public class Top10ApplicationIT +{ + public static final String TOPIC_IN = "in"; + public static final String TOPIC_OUT = "out"; + + @Autowired + Consumer consumer; + @Autowired + Top10StreamProcessor streamProcessor; + + + @BeforeAll + public static void testSendMessage( + @Autowired KafkaTemplate kafkaTemplate) + { + TestData + .getInputMessages() + .forEach(kv -> + { + try + { + SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); + log.info( + "Sent: {}={}, partition={}, offset={}", + result.getProducerRecord().key(), + result.getProducerRecord().value(), + result.getRecordMetadata().partition(), + result.getRecordMetadata().offset()); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + }); + } + + @DisplayName("Await the expected state in the state-store") + @Test + public void testAwaitExpectedState() + { + await("Expected state") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore())); + } + + @DisplayName("Await the expected output messages") + @Test + @Disabled + public void testAwaitExpectedMessages() + { + await("Expected messages") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> consumer.enforceAssertion( + receivedMessages -> TestData.assertExpectedMessages(receivedMessages))); + } + + @DisplayName("Await the expected number of messages") + @Test + public void testAwaitExpectedNumberOfMessagesForUsers() + { + await("Expected number of messages") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> consumer.enforceAssertion( + receivedMessages -> TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages))); + } + + @DisplayName("Await the expected final output messages") + @Test + public void testAwaitExpectedLastMessagesForUsers() + { + await("Expected final output messages") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> consumer.enforceAssertion( + receivedMessages -> TestData.assertExpectedLastMessagesForUsers(receivedMessages))); + } + + + static class Consumer + { + private final MultiValueMap received = new LinkedMultiValueMap<>(); + + @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) + public synchronized void receive( + @Header(KafkaHeaders.RECEIVED_KEY) TestUser user, + @Payload TestRanking ranking) + { + log.debug("Received message: {} -> {}", user, ranking); + received.add(user, ranking); + } + + synchronized void enforceAssertion( + java.util.function.Consumer> assertion) + { + assertion.accept(received); + } + } + + @TestConfiguration + static class Configuration + { + @Bean + Consumer consumer() + { + return new Consumer(); + } + + @Primary + @Bean + KeyValueBytesStoreSupplier inMemoryStoreSupplier() + { + return Stores.inMemoryKeyValueStore(STORE_NAME); + } + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessorTopologyTest.java new file mode 100644 index 0000000..90d8e4c --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessorTopologyTest.java @@ -0,0 +1,105 @@ +package de.juplo.kafka.wordcount.top10; + +import de.juplo.kafka.wordcount.counter.TestCounter; +import de.juplo.kafka.wordcount.counter.TestWord; +import de.juplo.kafka.wordcount.query.TestRanking; +import de.juplo.kafka.wordcount.query.TestUser; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import java.util.Map; + +import static de.juplo.kafka.wordcount.top10.Top10ApplicationConfiguration.serializationConfig; + + +@Slf4j +public class Top10StreamProcessorTopologyTest +{ + public static final String IN = "TEST-IN"; + public static final String OUT = "TEST-OUT"; + public static final String STORE_NAME = "TOPOLOGY-TEST"; + + + TopologyTestDriver testDriver; + TestInputTopic in; + TestOutputTopic out; + + + @BeforeEach + public void setUp() + { + Topology topology = Top10StreamProcessor.buildTopology( + IN, + OUT, + Stores.inMemoryKeyValueStore(STORE_NAME)); + + testDriver = new TopologyTestDriver(topology, serializationConfig()); + + in = testDriver.createInputTopic( + IN, + jsonSerializer(TestWord.class, true), + jsonSerializer(TestCounter.class,false)); + + out = testDriver.createOutputTopic( + OUT, + new JsonDeserializer() + .copyWithType(TestUser.class) + .ignoreTypeHeaders(), + new JsonDeserializer() + .copyWithType(TestRanking.class) + .ignoreTypeHeaders()); + + } + + + @Test + public void test() + { + TestData + .getInputMessages() + .forEach(kv -> in.pipeInput(kv.key, kv.value)); + + MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); + out + .readRecordsToList() + .forEach(record -> receivedMessages.add(record.key(), record.value())); + + TestData.assertExpectedMessages(receivedMessages); + + TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages); + TestData.assertExpectedLastMessagesForUsers(receivedMessages); + + KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); + TestData.assertExpectedState(store); + } + + @AfterEach + public void tearDown() + { + testDriver.close(); + } + + private JsonSerializer jsonSerializer(Class type, boolean isKey) + { + JsonSerializer jsonSerializer = new JsonSerializer<>(); + jsonSerializer.configure( + Map.of( + JsonSerializer.TYPE_MAPPINGS, + "word:" + TestWord.class.getName() + "," + + "counter:" + TestCounter.class.getName()), + isKey); + return jsonSerializer; + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/stats/TestData.java b/src/test/java/de/juplo/kafka/wordcount/stats/TestData.java new file mode 100644 index 0000000..7a3a27e --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/stats/TestData.java @@ -0,0 +1,209 @@ +package de.juplo.kafka.wordcount.top10; + +import de.juplo.kafka.wordcount.counter.TestCounter; +import de.juplo.kafka.wordcount.counter.TestWord; +import de.juplo.kafka.wordcount.query.TestEntry; +import de.juplo.kafka.wordcount.query.TestRanking; +import de.juplo.kafka.wordcount.query.TestUser; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import java.util.Arrays; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + + +class TestData +{ + static final TestUser PETER = TestUser.of("peter"); + static final TestUser KLAUS = TestUser.of("klaus"); + + static final Stream> getInputMessages() + { + return Stream.of(INPUT_MESSAGES); + } + + private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] + { + new KeyValue<>( + TestWord.of(PETER.getUser(),"Hallo"), + TestCounter.of(PETER.getUser(),"Hallo",1)), + new KeyValue<>( + TestWord.of(KLAUS.getUser(),"Müsch"), + TestCounter.of(KLAUS.getUser(),"Müsch",1)), + new KeyValue<>( + TestWord.of(PETER.getUser(),"Welt"), + TestCounter.of(PETER.getUser(),"Welt",1)), + new KeyValue<>( + TestWord.of(KLAUS.getUser(),"Müsch"), + TestCounter.of(KLAUS.getUser(),"Müsch",2)), + new KeyValue<>( + TestWord.of(KLAUS.getUser(),"s"), + TestCounter.of(KLAUS.getUser(),"s",1)), + new KeyValue<>( + TestWord.of(PETER.getUser(),"Boäh"), + TestCounter.of(PETER.getUser(),"Boäh",1)), + new KeyValue<>( + TestWord.of(PETER.getUser(),"Welt"), + TestCounter.of(PETER.getUser(),"Welt",2)), + new KeyValue<>( + TestWord.of(PETER.getUser(),"Boäh"), + TestCounter.of(PETER.getUser(),"Boäh",2)), + new KeyValue<>( + TestWord.of(KLAUS.getUser(),"s"), + TestCounter.of(KLAUS.getUser(),"s",2)), + new KeyValue<>( + TestWord.of(PETER.getUser(),"Boäh"), + TestCounter.of(PETER.getUser(),"Boäh",3)), + new KeyValue<>( + TestWord.of(KLAUS.getUser(),"s"), + TestCounter.of(KLAUS.getUser(),"s",3)), + }; + + static void assertExpectedMessages(MultiValueMap receivedMessages) + { + expectedMessages().forEach( + (user, rankings) -> + assertThat(receivedMessages.get(user)) + .containsExactlyElementsOf(rankings)); + } + + static void assertExpectedState(ReadOnlyKeyValueStore store) + { + assertRankingEqualsRankingFromLastMessage(PETER, store.get(userOf(PETER))); + assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(userOf(KLAUS))); + } + + private static User userOf(TestUser user) + { + return User.of(user.getUser()); + } + + static void assertExpectedNumberOfMessagesForUsers(MultiValueMap receivedMessages) + { + assertThat(countMessagesForUser(PETER, receivedMessages)); + assertThat(countMessagesForUser(KLAUS, receivedMessages)); + } + + private static int countMessagesForUser(TestUser user, MultiValueMap messagesForUsers) + { + return messagesForUsers.get(user) == null + ? 0 + : messagesForUsers.get(user).size(); + } + + + static void assertExpectedLastMessagesForUsers(MultiValueMap receivedMessages) + { + assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages)); + assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages)); + } + + private static void assertRankingEqualsRankingFromLastMessage(TestUser user, Ranking ranking) + { + TestRanking testRanking = TestRanking.of(testEntriesOf(ranking.getEntries())); + assertRankingEqualsRankingFromLastMessage(user, testRanking); + } + + private static TestEntry[] testEntriesOf(Entry... entries) + { + return Arrays + .stream(entries) + .map(entry -> TestEntry.of( + entry.getWord(), + entry.getCounter() == null + ? -1l + : entry.getCounter())) + .toArray(size -> new TestEntry[size]); + } + + private static void assertRankingEqualsRankingFromLastMessage(TestUser user, TestRanking ranking) + { + assertThat(ranking).isEqualTo(getLastMessageFor(user)); + } + + private static TestRanking getLastMessageFor(TestUser user) + { + return getLastMessageFor(user, expectedMessages()); + } + + private static TestRanking getLastMessageFor(TestUser user, MultiValueMap messagesForUsers) + { + return messagesForUsers + .get(user) + .stream() + .reduce(null, (left, right) -> right); + } + + private static KeyValue[] EXPECTED_MESSAGES = new KeyValue[] + { + KeyValue.pair( // 0 + PETER, + TestRanking.of( + TestEntry.of("Hallo", 1l))), + KeyValue.pair( // 1 + KLAUS, + TestRanking.of( + TestEntry.of("Müsch", 1l))), + KeyValue.pair( // 2 + PETER, + TestRanking.of( + TestEntry.of("Hallo", 1l), + TestEntry.of("Welt", 1l))), + KeyValue.pair( // 3 + KLAUS, + TestRanking.of( + TestEntry.of("Müsch", 2l))), + KeyValue.pair( // 4 + KLAUS, + TestRanking.of( + TestEntry.of("Müsch", 2l), + TestEntry.of("s", 1l))), + KeyValue.pair( // 5 + PETER, + TestRanking.of( + TestEntry.of("Hallo", 1l), + TestEntry.of("Welt", 1l), + TestEntry.of("Boäh", 1l))), + KeyValue.pair( // 6 + PETER, + TestRanking.of( + TestEntry.of("Welt", 2l), + TestEntry.of("Hallo", 1l), + TestEntry.of("Boäh", 1l))), + KeyValue.pair( // 7 + PETER, + TestRanking.of( + TestEntry.of("Welt", 2l), + TestEntry.of("Boäh", 2l), + TestEntry.of("Hallo", 1l))), + KeyValue.pair( // 8 + KLAUS, + TestRanking.of( + TestEntry.of("Müsch", 2l), + TestEntry.of("s", 2l))), + KeyValue.pair( // 9 + PETER, + TestRanking.of( + TestEntry.of("Boäh", 3l), + TestEntry.of("Welt", 2l), + TestEntry.of("Hallo", 1l))), + KeyValue.pair( // 10 + KLAUS, + TestRanking.of( + TestEntry.of("s", 3l), + TestEntry.of("Müsch", 2l))), + }; + + private static MultiValueMap expectedMessages() + { + MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); + Stream + .of(EXPECTED_MESSAGES) + .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value)); + return expectedMessages; + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/RankingTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/RankingTest.java deleted file mode 100644 index 26749e9..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/top10/RankingTest.java +++ /dev/null @@ -1,276 +0,0 @@ -package de.juplo.kafka.wordcount.top10; - -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; - -import java.util.LinkedList; -import java.util.List; -import java.util.stream.Stream; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType; - - -public class RankingTest -{ - @DisplayName("A newly created instance is empty") - @Test - public void testNewRankingIsEmpty() - { - Ranking ranking = new Ranking(); - assertThat(ranking.getEntries()).isEmpty(); - } - - @DisplayName("An instance that was build from an empty ranking is empty") - @Test - public void testRankingOfYieldsExpectedResultForEmptyList() - { - Ranking ranking = new Ranking(); - assertThat(ranking.getEntries()).isEmpty(); - } - - @DisplayName("An instance that was build from a valid ranking contains the expected entries") - @ParameterizedTest - @MethodSource("validRankingsProvider") - public void testRankingOfYieldsExpectedResultsForValidRankings(List entryList) - { - Ranking ranking = Ranking.of(toArray(entryList)); - assertThat(ranking.getEntries()).containsExactlyElementsOf(entryList); - } - - @DisplayName("The builder fails for invalid rankings") - @ParameterizedTest - @MethodSource("invalidRankingsProvider") - public void testRankingOfThrowsExceptionForInvalidRankings(List entryList) - { - assertThatExceptionOfType(IllegalArgumentException.class) - .isThrownBy(() -> Ranking.of(toArray(entryList))); - } - - @DisplayName("Adding a new word with highest ranking, pushes all other words down") - @ParameterizedTest - @MethodSource("validRankingsProvider") - public void testAddingNewWordWithHighestRanking(List entryList) - { - Ranking ranking = Ranking.of(toArray(entryList)); - Entry newEntry = Entry.of("NEW!", rankingForPosition(-1)); - ranking.add(newEntry); - assertThat(ranking.getEntries()[0]).isEqualTo(newEntry); - for (int i = 0; i < entryList.size() && i < Ranking.MAX_ENTRIES - 1; i++) - { - assertThat(ranking.getEntries()[i + 1]).isEqualTo(entryList.get(i)); - } - } - - @DisplayName("Adding a new word with an existent ranking, pushes all words with lower ranking down") - @ParameterizedTest - @MethodSource("validRankingsProvider") - public void testAddingNewWordWithExistingRanking(List entryList) - { - for (int position = 0; position < entryList.size(); position++ ) - { - Ranking ranking = Ranking.of(toArray(entryList)); - Entry newEntry = Entry.of("NEW!", rankingForPosition(position)); - ranking.add(newEntry); - for (int i = 0; i < entryList.size() && i < Ranking.MAX_ENTRIES - 1; i++) - { - if (i < position) - { - assertThat(ranking.getEntries()[i]).isEqualTo(entryList.get(i)); - } - if (i == position) - { - assertThat(ranking.getEntries()[i]).isEqualTo(entryList.get(i)); - assertThat(ranking.getEntries()[i + 1]).isEqualTo(newEntry); - } - if (i > position) - { - assertThat(ranking.getEntries()[i + 1]).isEqualTo(entryList.get(i)); - } - } - } - } - - @DisplayName("Adding a highest ranking for an existing word shifts it to the first place") - @ParameterizedTest - @ValueSource(ints = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }) - public void testAddingExistingWordWithHighestRanking(int position) - { - Ranking ranking = Ranking.of(toArray(VALID_RANKINGS[0])); - String word = wordForPosition(position); - Entry highestEntry = Entry.of(word, 100l); - ranking.add(highestEntry); - List expectedEntries = Stream - .concat( - Stream.of(highestEntry), - VALID_RANKINGS[0] - .stream() - .filter(entry -> !entry.getWord().equals(word))) - .toList(); - assertThat(ranking.getEntries()).containsExactlyElementsOf(expectedEntries); - } - - @DisplayName("Adding an existing word with unchanged ranking changes nothing") - @ParameterizedTest - @ValueSource(ints = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }) - public void testAddingExistingWordWithUnchangedRanking(int position) - { - Ranking ranking = Ranking.of(toArray(VALID_RANKINGS[0])); - Entry unchangedEntry = Entry.of( - wordForPosition(position), - rankingForPosition(position)); - ranking.add(unchangedEntry); - assertThat(ranking.getEntries()).containsExactlyElementsOf(VALID_RANKINGS[0]); - } - - @DisplayName("Adding an existing word with a lower ranking fails") - @ParameterizedTest - @MethodSource("validRankingsProvider") - public void testAddingExistingWordWithLowerRankingFails(List entryList) - { - Ranking ranking = Ranking.of(toArray(entryList)); - entryList.forEach(entry -> - assertThatExceptionOfType(IllegalArgumentException.class) - .isThrownBy(() -> ranking.add(Entry.of(entry.getWord(), entry.getCounter() - 1)))); - } - - @DisplayName("Identical rankings are considered equal") - @ParameterizedTest - @MethodSource("validRankingsProvider") - public void testIdenticalRankingsAreConsideredEaqual(List entryList) - { - assertThat(Ranking.of(toArray(entryList))).isEqualTo(Ranking.of(toArray(entryList))); - } - - @DisplayName("Two empty rankings are considered equal") - @Test - public void testTwoEmptyRankingsAreConsideredEaqual() - { - assertThat(Ranking.of()).isEqualTo(Ranking.of()); - } - - @DisplayName("A changed ranking is not considered equal to its unchanged counter-part") - @ParameterizedTest - @MethodSource("validRankingsProvider") - public void testChangedRankingsDoNotEaqualUnchangedOne(List entryList) - { - Ranking changed = Ranking.of(toArray(entryList)); - changed.add(Entry.of("devilish", 666l)); - assertThat(changed).isNotEqualTo(Ranking.of(toArray(entryList))); - } - - @DisplayName("Rankigs are considered equal, if only the order of words with the same count differ") - @Test - public void testRankingWithDifferentOrderForSameCountAreEqual() - { - assertThat( - Ranking.of( - Entry.of("a1",10l), - Entry.of("a2",10l), - Entry.of("b1", 9l), - Entry.of("b2",9l), - Entry.of("c1", 8l), - Entry.of("c2", 8l))) - .isEqualTo(Ranking.of( - Entry.of("a2",10l), - Entry.of("a1",10l), - Entry.of("b2", 9l), - Entry.of("b1",9l), - Entry.of("c2", 8l), - Entry.of("c1", 8l))); - } - - - Entry[] toArray(List entryList) - { - return entryList.toArray(size -> new Entry[size]); - } - - static String wordForPosition(int position) - { - return Integer.toString(position+1); - } - - static long rankingForPosition(int position) - { - return (long)Ranking.MAX_ENTRIES * 2 - position; - } - - static Stream> validRankingsProvider() - { - return Stream.of(VALID_RANKINGS); - } - - static Stream> invalidRankingsProvider() - { - return Stream.of(INVALID_RANKINGS); - } - - static String[] WORDS = new String[Ranking.MAX_ENTRIES]; - static List[] VALID_RANKINGS = new List[Ranking.MAX_ENTRIES]; - - static - { - for (int i = 0; i < Ranking.MAX_ENTRIES; i++) - { - List ranking = new LinkedList<>(); - String word = null; - for (int position = 0; position <= i; position++) - { - word = wordForPosition(position); - Entry entry = Entry.of(word, rankingForPosition(position)); - ranking.add(entry); - } - WORDS[i] = word; - VALID_RANKINGS[Ranking.MAX_ENTRIES - (i + 1)] = ranking; - } - } - - static List[] INVALID_RANKINGS = new List[] { - List.of( - Entry.of("Platz eins", 1l), - Entry.of("Platz zwei", 2l)), - List.of( - Entry.of("Platz eins", 1111111111l), - Entry.of("Platz zwei", 222222222l), - Entry.of("Platz eins", 1l)), - List.of( - Entry.of("Platz eins", 11l), - Entry.of("Platz eins", 1l)), - List.of( - Entry.of("Platz eins", 1111111111l), - Entry.of("Platz zwei", 222222222l), - Entry.of("Platz eins", 11111111l), - Entry.of("Platz zwei", 2222222l), - Entry.of("Platz fünf", 555555l)), - List.of( - Entry.of("Platz eins", 1111111111l), - Entry.of("Platz zwei", 222222222l), - Entry.of("Platz drei", 33333333l), - Entry.of("Platz vier", 4444444l), - Entry.of("Platz eins", 111111l), - Entry.of("Platz sechs", 66666l)), - List.of( - Entry.of("Platz eins", 1111111111l), - Entry.of("Platz zwei", 222222222l), - Entry.of("Platz drei", 33333333l), - Entry.of("Platz vier", 4444444l), - Entry.of("Platz fünf", 555555l), - Entry.of("Platz sechs", 66666l), - Entry.of("Platz eins", 1l)), - List.of( - Entry.of("Platz eins", 1111111111l), - Entry.of("Platz zwei", 222222222l), - Entry.of("Platz drei", 33333333l), - Entry.of("Platz vier", 4444444l), - Entry.of("Platz fünf", 555555l), - Entry.of("Platz sechs", 66666l), - Entry.of("Platz sieben", 7777l), - Entry.of("Platz acht", 888l), - Entry.of("Platz neun", 99l), - Entry.of("Platz 10", 6l), - Entry.of("Platz 11", 3l))}; -} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java deleted file mode 100644 index 7a3a27e..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java +++ /dev/null @@ -1,209 +0,0 @@ -package de.juplo.kafka.wordcount.top10; - -import de.juplo.kafka.wordcount.counter.TestCounter; -import de.juplo.kafka.wordcount.counter.TestWord; -import de.juplo.kafka.wordcount.query.TestEntry; -import de.juplo.kafka.wordcount.query.TestRanking; -import de.juplo.kafka.wordcount.query.TestUser; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; -import org.springframework.util.LinkedMultiValueMap; -import org.springframework.util.MultiValueMap; - -import java.util.Arrays; -import java.util.stream.Stream; - -import static org.assertj.core.api.Assertions.assertThat; - - -class TestData -{ - static final TestUser PETER = TestUser.of("peter"); - static final TestUser KLAUS = TestUser.of("klaus"); - - static final Stream> getInputMessages() - { - return Stream.of(INPUT_MESSAGES); - } - - private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] - { - new KeyValue<>( - TestWord.of(PETER.getUser(),"Hallo"), - TestCounter.of(PETER.getUser(),"Hallo",1)), - new KeyValue<>( - TestWord.of(KLAUS.getUser(),"Müsch"), - TestCounter.of(KLAUS.getUser(),"Müsch",1)), - new KeyValue<>( - TestWord.of(PETER.getUser(),"Welt"), - TestCounter.of(PETER.getUser(),"Welt",1)), - new KeyValue<>( - TestWord.of(KLAUS.getUser(),"Müsch"), - TestCounter.of(KLAUS.getUser(),"Müsch",2)), - new KeyValue<>( - TestWord.of(KLAUS.getUser(),"s"), - TestCounter.of(KLAUS.getUser(),"s",1)), - new KeyValue<>( - TestWord.of(PETER.getUser(),"Boäh"), - TestCounter.of(PETER.getUser(),"Boäh",1)), - new KeyValue<>( - TestWord.of(PETER.getUser(),"Welt"), - TestCounter.of(PETER.getUser(),"Welt",2)), - new KeyValue<>( - TestWord.of(PETER.getUser(),"Boäh"), - TestCounter.of(PETER.getUser(),"Boäh",2)), - new KeyValue<>( - TestWord.of(KLAUS.getUser(),"s"), - TestCounter.of(KLAUS.getUser(),"s",2)), - new KeyValue<>( - TestWord.of(PETER.getUser(),"Boäh"), - TestCounter.of(PETER.getUser(),"Boäh",3)), - new KeyValue<>( - TestWord.of(KLAUS.getUser(),"s"), - TestCounter.of(KLAUS.getUser(),"s",3)), - }; - - static void assertExpectedMessages(MultiValueMap receivedMessages) - { - expectedMessages().forEach( - (user, rankings) -> - assertThat(receivedMessages.get(user)) - .containsExactlyElementsOf(rankings)); - } - - static void assertExpectedState(ReadOnlyKeyValueStore store) - { - assertRankingEqualsRankingFromLastMessage(PETER, store.get(userOf(PETER))); - assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(userOf(KLAUS))); - } - - private static User userOf(TestUser user) - { - return User.of(user.getUser()); - } - - static void assertExpectedNumberOfMessagesForUsers(MultiValueMap receivedMessages) - { - assertThat(countMessagesForUser(PETER, receivedMessages)); - assertThat(countMessagesForUser(KLAUS, receivedMessages)); - } - - private static int countMessagesForUser(TestUser user, MultiValueMap messagesForUsers) - { - return messagesForUsers.get(user) == null - ? 0 - : messagesForUsers.get(user).size(); - } - - - static void assertExpectedLastMessagesForUsers(MultiValueMap receivedMessages) - { - assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages)); - assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages)); - } - - private static void assertRankingEqualsRankingFromLastMessage(TestUser user, Ranking ranking) - { - TestRanking testRanking = TestRanking.of(testEntriesOf(ranking.getEntries())); - assertRankingEqualsRankingFromLastMessage(user, testRanking); - } - - private static TestEntry[] testEntriesOf(Entry... entries) - { - return Arrays - .stream(entries) - .map(entry -> TestEntry.of( - entry.getWord(), - entry.getCounter() == null - ? -1l - : entry.getCounter())) - .toArray(size -> new TestEntry[size]); - } - - private static void assertRankingEqualsRankingFromLastMessage(TestUser user, TestRanking ranking) - { - assertThat(ranking).isEqualTo(getLastMessageFor(user)); - } - - private static TestRanking getLastMessageFor(TestUser user) - { - return getLastMessageFor(user, expectedMessages()); - } - - private static TestRanking getLastMessageFor(TestUser user, MultiValueMap messagesForUsers) - { - return messagesForUsers - .get(user) - .stream() - .reduce(null, (left, right) -> right); - } - - private static KeyValue[] EXPECTED_MESSAGES = new KeyValue[] - { - KeyValue.pair( // 0 - PETER, - TestRanking.of( - TestEntry.of("Hallo", 1l))), - KeyValue.pair( // 1 - KLAUS, - TestRanking.of( - TestEntry.of("Müsch", 1l))), - KeyValue.pair( // 2 - PETER, - TestRanking.of( - TestEntry.of("Hallo", 1l), - TestEntry.of("Welt", 1l))), - KeyValue.pair( // 3 - KLAUS, - TestRanking.of( - TestEntry.of("Müsch", 2l))), - KeyValue.pair( // 4 - KLAUS, - TestRanking.of( - TestEntry.of("Müsch", 2l), - TestEntry.of("s", 1l))), - KeyValue.pair( // 5 - PETER, - TestRanking.of( - TestEntry.of("Hallo", 1l), - TestEntry.of("Welt", 1l), - TestEntry.of("Boäh", 1l))), - KeyValue.pair( // 6 - PETER, - TestRanking.of( - TestEntry.of("Welt", 2l), - TestEntry.of("Hallo", 1l), - TestEntry.of("Boäh", 1l))), - KeyValue.pair( // 7 - PETER, - TestRanking.of( - TestEntry.of("Welt", 2l), - TestEntry.of("Boäh", 2l), - TestEntry.of("Hallo", 1l))), - KeyValue.pair( // 8 - KLAUS, - TestRanking.of( - TestEntry.of("Müsch", 2l), - TestEntry.of("s", 2l))), - KeyValue.pair( // 9 - PETER, - TestRanking.of( - TestEntry.of("Boäh", 3l), - TestEntry.of("Welt", 2l), - TestEntry.of("Hallo", 1l))), - KeyValue.pair( // 10 - KLAUS, - TestRanking.of( - TestEntry.of("s", 3l), - TestEntry.of("Müsch", 2l))), - }; - - private static MultiValueMap expectedMessages() - { - MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); - Stream - .of(EXPECTED_MESSAGES) - .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value)); - return expectedMessages; - } -} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java deleted file mode 100644 index f5ef236..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java +++ /dev/null @@ -1,168 +0,0 @@ -package de.juplo.kafka.wordcount.top10; - -import de.juplo.kafka.wordcount.counter.TestCounter; -import de.juplo.kafka.wordcount.counter.TestWord; -import de.juplo.kafka.wordcount.query.TestRanking; -import de.juplo.kafka.wordcount.query.TestUser; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; -import org.apache.kafka.streams.state.Stores; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Primary; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.support.KafkaHeaders; -import org.springframework.kafka.support.SendResult; -import org.springframework.kafka.test.context.EmbeddedKafka; -import org.springframework.messaging.handler.annotation.Header; -import org.springframework.messaging.handler.annotation.Payload; -import org.springframework.util.LinkedMultiValueMap; -import org.springframework.util.MultiValueMap; - -import java.time.Duration; - -import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME; -import static org.awaitility.Awaitility.await; - - -@SpringBootTest( - properties = { - "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer", - "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", - "spring.kafka.producer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.counter.TestWord,counter:de.juplo.kafka.wordcount.counter.TestCounter", - "spring.kafka.consumer.auto-offset-reset=earliest", - "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", - "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", - "spring.kafka.consumer.properties.spring.json.type.mapping=user:de.juplo.kafka.wordcount.query.TestUser,ranking:de.juplo.kafka.wordcount.query.TestRanking", - "logging.level.root=WARN", - "logging.level.de.juplo=DEBUG", - "logging.level.org.apache.kafka.clients=INFO", - "logging.level.org.apache.kafka.streams=INFO", - "juplo.wordcount.top10.bootstrap-server=${spring.embedded.kafka.brokers}", - "juplo.wordcount.top10.commit-interval=100", - "juplo.wordcount.top10.cacheMaxBytes=0", - "juplo.wordcount.top10.input-topic=" + Top10ApplicationIT.TOPIC_IN, - "juplo.wordcount.top10.output-topic=" + Top10ApplicationIT.TOPIC_OUT }) -@EmbeddedKafka(topics = { Top10ApplicationIT.TOPIC_IN, Top10ApplicationIT.TOPIC_OUT }) -@Slf4j -public class Top10ApplicationIT -{ - public static final String TOPIC_IN = "in"; - public static final String TOPIC_OUT = "out"; - - @Autowired - Consumer consumer; - @Autowired - Top10StreamProcessor streamProcessor; - - - @BeforeAll - public static void testSendMessage( - @Autowired KafkaTemplate kafkaTemplate) - { - TestData - .getInputMessages() - .forEach(kv -> - { - try - { - SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); - log.info( - "Sent: {}={}, partition={}, offset={}", - result.getProducerRecord().key(), - result.getProducerRecord().value(), - result.getRecordMetadata().partition(), - result.getRecordMetadata().offset()); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - }); - } - - @DisplayName("Await the expected state in the state-store") - @Test - public void testAwaitExpectedState() - { - await("Expected state") - .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore())); - } - - @DisplayName("Await the expected output messages") - @Test - @Disabled - public void testAwaitExpectedMessages() - { - await("Expected messages") - .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> consumer.enforceAssertion( - receivedMessages -> TestData.assertExpectedMessages(receivedMessages))); - } - - @DisplayName("Await the expected number of messages") - @Test - public void testAwaitExpectedNumberOfMessagesForUsers() - { - await("Expected number of messages") - .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> consumer.enforceAssertion( - receivedMessages -> TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages))); - } - - @DisplayName("Await the expected final output messages") - @Test - public void testAwaitExpectedLastMessagesForUsers() - { - await("Expected final output messages") - .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> consumer.enforceAssertion( - receivedMessages -> TestData.assertExpectedLastMessagesForUsers(receivedMessages))); - } - - - static class Consumer - { - private final MultiValueMap received = new LinkedMultiValueMap<>(); - - @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) - public synchronized void receive( - @Header(KafkaHeaders.RECEIVED_KEY) TestUser user, - @Payload TestRanking ranking) - { - log.debug("Received message: {} -> {}", user, ranking); - received.add(user, ranking); - } - - synchronized void enforceAssertion( - java.util.function.Consumer> assertion) - { - assertion.accept(received); - } - } - - @TestConfiguration - static class Configuration - { - @Bean - Consumer consumer() - { - return new Consumer(); - } - - @Primary - @Bean - KeyValueBytesStoreSupplier inMemoryStoreSupplier() - { - return Stores.inMemoryKeyValueStore(STORE_NAME); - } - } -} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java deleted file mode 100644 index 90d8e4c..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java +++ /dev/null @@ -1,105 +0,0 @@ -package de.juplo.kafka.wordcount.top10; - -import de.juplo.kafka.wordcount.counter.TestCounter; -import de.juplo.kafka.wordcount.counter.TestWord; -import de.juplo.kafka.wordcount.query.TestRanking; -import de.juplo.kafka.wordcount.query.TestUser; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.streams.TestInputTopic; -import org.apache.kafka.streams.TestOutputTopic; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.TopologyTestDriver; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.Stores; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.springframework.kafka.support.serializer.JsonDeserializer; -import org.springframework.kafka.support.serializer.JsonSerializer; -import org.springframework.util.LinkedMultiValueMap; -import org.springframework.util.MultiValueMap; - -import java.util.Map; - -import static de.juplo.kafka.wordcount.top10.Top10ApplicationConfiguration.serializationConfig; - - -@Slf4j -public class Top10StreamProcessorTopologyTest -{ - public static final String IN = "TEST-IN"; - public static final String OUT = "TEST-OUT"; - public static final String STORE_NAME = "TOPOLOGY-TEST"; - - - TopologyTestDriver testDriver; - TestInputTopic in; - TestOutputTopic out; - - - @BeforeEach - public void setUp() - { - Topology topology = Top10StreamProcessor.buildTopology( - IN, - OUT, - Stores.inMemoryKeyValueStore(STORE_NAME)); - - testDriver = new TopologyTestDriver(topology, serializationConfig()); - - in = testDriver.createInputTopic( - IN, - jsonSerializer(TestWord.class, true), - jsonSerializer(TestCounter.class,false)); - - out = testDriver.createOutputTopic( - OUT, - new JsonDeserializer() - .copyWithType(TestUser.class) - .ignoreTypeHeaders(), - new JsonDeserializer() - .copyWithType(TestRanking.class) - .ignoreTypeHeaders()); - - } - - - @Test - public void test() - { - TestData - .getInputMessages() - .forEach(kv -> in.pipeInput(kv.key, kv.value)); - - MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); - out - .readRecordsToList() - .forEach(record -> receivedMessages.add(record.key(), record.value())); - - TestData.assertExpectedMessages(receivedMessages); - - TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages); - TestData.assertExpectedLastMessagesForUsers(receivedMessages); - - KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); - TestData.assertExpectedState(store); - } - - @AfterEach - public void tearDown() - { - testDriver.close(); - } - - private JsonSerializer jsonSerializer(Class type, boolean isKey) - { - JsonSerializer jsonSerializer = new JsonSerializer<>(); - jsonSerializer.configure( - Map.of( - JsonSerializer.TYPE_MAPPINGS, - "word:" + TestWord.class.getName() + "," + - "counter:" + TestCounter.class.getName()), - isKey); - return jsonSerializer; - } -}