From: Kai Moritz Date: Sun, 23 Jun 2024 09:51:44 +0000 (+0200) Subject: top10: 1.4.2 - RocksDB does not work in Alpine-Linux X-Git-Tag: top10-1.4.2 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=refs%2Fheads%2Ftop10;hp=4b94d31fbd663cb277276def106be9873ec4a246;p=demos%2Fkafka%2Fwordcount top10: 1.4.2 - RocksDB does not work in Alpine-Linux --- diff --git a/Dockerfile b/Dockerfile index bbd15ef..ae0723d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ -FROM openjdk:11-jre-slim +FROM eclipse-temurin:21-jre COPY target/*.jar /opt/app.jar -EXPOSE 8080 +EXPOSE 8084 ENTRYPOINT ["java", "-jar", "/opt/app.jar"] CMD [] diff --git a/pom.xml b/pom.xml index 9bda638..cb79013 100644 --- a/pom.xml +++ b/pom.xml @@ -5,18 +5,17 @@ org.springframework.boot spring-boot-starter-parent - 2.5.4 + 3.2.7 de.juplo.kafka.wordcount top10 - 1.0.0 + 1.4.2 Wordcount-Top-10 Top-10 stream-processor of the multi-user wordcount-example - 0.33.0 - 11 - 2.8.0 + 21 + 0.44.0 @@ -31,6 +30,10 @@ org.apache.kafka kafka-streams + + org.springframework.kafka + spring-kafka + org.springframework.boot @@ -48,15 +51,34 @@ lombok true + org.springframework.boot spring-boot-starter-test test + + org.springframework.kafka + spring-kafka-test + test + + + org.awaitility + awaitility + test + + + org.assertj + assertj-core + test + + + maven-failsafe-plugin + org.springframework.boot spring-boot-maven-plugin diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java b/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java index 67f45f2..7d00500 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java @@ -1,11 +1,20 @@ package de.juplo.kafka.wordcount.top10; -import lombok.Value; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; -@Value(staticConstructor = "of") +@NoArgsConstructor +@AllArgsConstructor( + staticName = "of", + access = AccessLevel.PACKAGE) +@Data +@JsonIgnoreProperties(ignoreUnknown = true) public class Entry { - private final String word; - private final Long count; + private String key; + private Long counter; } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Key.java b/src/main/java/de/juplo/kafka/wordcount/top10/Key.java index d09dbcc..aaf016c 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Key.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Key.java @@ -1,13 +1,18 @@ package de.juplo.kafka.wordcount.top10; -import lombok.Getter; -import lombok.Setter; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.*; -@Getter -@Setter +@NoArgsConstructor +@AllArgsConstructor( + staticName = "of", + access = AccessLevel.PACKAGE) +@Data +@JsonIgnoreProperties(ignoreUnknown = true) public class Key { - private String username; - private String word; + private StatsType type; + private String channel; + private String key; } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java b/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java index b748fe5..279716a 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java @@ -1,53 +1,159 @@ package de.juplo.kafka.wordcount.top10; -import lombok.Getter; -import lombok.Setter; +import lombok.*; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; +import java.util.*; -@Getter -@Setter
+@AllArgsConstructor(access = AccessLevel.PRIVATE) +@NoArgsConstructor +@Data public class Ranking { + public final static int MAX_ENTRIES = 10; + + private Entry[] entries = new Entry[0]; - public void add(Entry newEntry) + public Ranking add(Entry newEntry) { if (entries.length == 0) { entries = new Entry[1]; entries[0] = newEntry; - return; + return this; } List<Entry> list = new LinkedList<>(Arrays.asList(entries)); + int oldPosition = -1; for (int i = 0; i < list.size(); i++) { - Entry entry; + Entry entry = list.get(i); - entry = list.get(i); - if (entry.getCount() <= newEntry.getCount()) + if (entry.getCounter() < newEntry.getCounter()) { + if (oldPosition > -1) + { + if (list.get(oldPosition).getCounter() > newEntry.getCounter()) + { + throw new IllegalArgumentException("The ranking already contains an entry with a higher counting for " + newEntry); + } + else + { + // Entry for word already exists with the same counting! Nothing changed... + return this; + } + } + list.add(i, newEntry); for (int j = i+1; j < list.size(); j++) { entry = list.get(j); - if(entry.getWord().equals(newEntry.getWord())) + if(entry.getKey().equals(newEntry.getKey())) { list.remove(j); break; } } - if (list.size() > 10) + if (list.size() > MAX_ENTRIES) { - list = list.subList(0,10); + list = list.subList(0, MAX_ENTRIES); } entries = list.toArray(num -> new Entry[num]); - return; + return this; + } + + if (entry.getKey().equals(newEntry.getKey())) + oldPosition = i; + } + + if (oldPosition > -1 && list.get(oldPosition).getCounter() > newEntry.getCounter()) + { + throw new IllegalArgumentException("The ranking already contains an entry with a higher counting for " + newEntry); + } + + if (list.size() < MAX_ENTRIES) + { + list.add(newEntry); + entries = list.toArray(num -> new Entry[num]); + } + + return this; + } + + public Ranking validate() throws IllegalArgumentException + { + if (this.entries.length > MAX_ENTRIES) + throw new IllegalArgumentException("Invalid Ranking: a valid ranking cannot have more entries than " + MAX_ENTRIES ); + + Set<String> seenWords = new HashSet<>(); + long lowestCounting = Long.MAX_VALUE; + + for (int i = 0; i < entries.length; i++) + { + Entry entry = entries[i]; + if (seenWords.contains(entry.getKey())) + throw new IllegalArgumentException("Invalid Ranking: Multiple occurrences of word -> " + entry.getKey()); + if (entry.getCounter() > lowestCounting) + throw new IllegalArgumentException("Invalid Ranking: Entries are not sorted correctly"); + + seenWords.add(entry.getKey()); + lowestCounting = entry.getCounter(); + } + + return this; + } + + @Override + public boolean equals(Object o) + { + if (this == o) + return true; + if (o == null) + return false; + if (!(o instanceof Ranking)) + return false; + + Ranking other = (Ranking)o; + + if (other.entries.length != entries.length) + return false; + + if (entries.length == 0) + return true; + + int i = 0; + Set<String> myWordsWithCurrentCount = new HashSet<>(); + Set<String> otherWordsWithCurrentCount = new HashSet<>(); + Entry myEntry = entries[i]; + long currentCount = myEntry.getCounter(); + myWordsWithCurrentCount.add(myEntry.getKey()); + while (true) + { + Entry otherEntry = other.entries[i]; + if (otherEntry.getCounter() != currentCount) + return false; + otherWordsWithCurrentCount.add(otherEntry.getKey()); + if (++i >= entries.length) + return myWordsWithCurrentCount.equals(otherWordsWithCurrentCount); + myEntry = entries[i]; + if (myEntry.getCounter() != currentCount) + { + if (!myWordsWithCurrentCount.equals(otherWordsWithCurrentCount)) + return false; + currentCount = myEntry.getCounter(); + myWordsWithCurrentCount.clear(); + otherWordsWithCurrentCount.clear(); } + myWordsWithCurrentCount.add(myEntry.getKey()); } } + + public static
Ranking of(Entry... entries) + { + Ranking ranking = new Ranking(entries); + ranking.validate(); + return ranking; + } } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Stats.java b/src/main/java/de/juplo/kafka/wordcount/top10/Stats.java new file mode 100644 index 0000000..05c2a91 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Stats.java @@ -0,0 +1,15 @@ +package de.juplo.kafka.wordcount.top10; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@AllArgsConstructor(staticName = "of") +@NoArgsConstructor +@Data +public class Stats +{ + StatsType type; + String channel; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/StatsType.java b/src/main/java/de/juplo/kafka/wordcount/top10/StatsType.java new file mode 100644 index 0000000..b1b8f9b --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/top10/StatsType.java @@ -0,0 +1,7 @@ +package de.juplo.kafka.wordcount.top10; + +enum StatsType +{ + COUNTER, + POPULAR +} diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java index 27dca95..5c14ae7 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java @@ -2,11 +2,9 @@ package de.juplo.kafka.wordcount.top10; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.context.properties.EnableConfigurationProperties; @SpringBootApplication -@EnableConfigurationProperties(Top10ApplicationProperties.class) public class Top10Application { public static void main(String[] args) diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java new file mode 100644 index 0000000..aecd260 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java @@ -0,0 +1,98 @@ +package de.juplo.kafka.wordcount.top10; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerde; + +import java.util.Properties; +import java.util.concurrent.CompletableFuture; + +import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME; +import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; + + +@Configuration +@EnableConfigurationProperties(Top10ApplicationProperties.class) +@Slf4j +public class Top10ApplicationConfiguration +{ + @Bean + public Properties streamProcessorProperties(Top10ApplicationProperties properties) + { + Properties props = new Properties(); + + props.putAll(serializationConfig()); + + props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); + 
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); + + if (properties.getCommitInterval() != null) + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval()); + if (properties.getCacheMaxBytes() != null) + props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, properties.getCacheMaxBytes()); + + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + return props; + } + + static Properties serializationConfig() + { + Properties props = new Properties(); + + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + props.put(JsonDeserializer.KEY_DEFAULT_TYPE, Stats.class.getName()); + props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName()); + props.put( + JsonDeserializer.TYPE_MAPPINGS, + "key:" + Key.class.getName() + "," + + "counter:" + Entry.class.getName() + "," + + "stats:" + Stats.class.getName() + "," + + "ranking:" + Ranking.class.getName()); + + return props; + } + + @Bean(initMethod = "start", destroyMethod = "stop") + public Top10StreamProcessor streamProcessor( + Top10ApplicationProperties applicationProperties, + Properties streamProcessorProperties, + KeyValueBytesStoreSupplier storeSupplier, + ConfigurableApplicationContext context) + { + Top10StreamProcessor streamProcessor = new Top10StreamProcessor( + applicationProperties.getInputTopic(), + applicationProperties.getOutputTopic(), + streamProcessorProperties, + storeSupplier); + + streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> + { + log.error("Unexpected error!", e); + CompletableFuture.runAsync(() -> + { + log.info("Stopping application..."); + SpringApplication.exit(context, () -> 1); + }); + return SHUTDOWN_CLIENT; + }); + + return streamProcessor; + } + + @Bean + public KeyValueBytesStoreSupplier storeSupplier() + { + return Stores.persistentKeyValueStore(STORE_NAME); + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java index 93b78ec..d3bb236 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java @@ -17,4 +17,6 @@ public class Top10ApplicationProperties private String applicationId = "top10"; private String inputTopic = "countings"; private String outputTopic = "top10"; + private Integer commitInterval; + private Integer cacheMaxBytes; } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java index 862913a..1235132 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java @@ -1,108 +1,74 @@ package de.juplo.kafka.wordcount.top10; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; -import org.springframework.boot.SpringApplication; -import 
org.springframework.context.ConfigurableApplicationContext; -import org.springframework.stereotype.Component; +import org.apache.kafka.streams.*; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; import java.util.Properties; -import java.util.concurrent.CompletableFuture; -import java.util.regex.Pattern; - -import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; @Slf4j -@Component public class Top10StreamProcessor { - final static Pattern PATTERN = Pattern.compile("\\W+"); + public static final String STORE_NAME= "top10"; public final KafkaStreams streams; public Top10StreamProcessor( - Top10ApplicationProperties properties, - ObjectMapper mapper, - ConfigurableApplicationContext context) + String inputTopic, + String outputTopic, + Properties props, + KeyValueBytesStoreSupplier storeSupplier) + { + Topology topology = Top10StreamProcessor.buildTopology( + inputTopic, + outputTopic, + storeSupplier); + + streams = new KafkaStreams(topology, props); + } + + static Topology buildTopology( + String inputTopic, + String outputTopic, + KeyValueBytesStoreSupplier storeSupplier) { StreamsBuilder builder = new StreamsBuilder(); builder - .stream(properties.getInputTopic()) - .map((keyJson, countStr) -> - { - try - { - Key key = mapper.readValue(keyJson, Key.class); - Long count = Long.parseLong(countStr); - Entry entry = Entry.of(key.getWord(), count); - String entryJson = mapper.writeValueAsString(entry); - return new KeyValue<>(key.getUsername(), entryJson); - } - catch (JsonProcessingException e) - { - throw new RuntimeException(e); - } - }) + .stream(inputTopic) + .map((key, entry) -> new KeyValue<>( + Stats.of(key.getType(), key.getChannel()), + entry)) .groupByKey() .aggregate( - () -> "{\"entries\" : []}", - (username, entryJson, rankingJson) -> - { - try - { - Ranking ranking = mapper.readValue(rankingJson, Ranking.class); - ranking.add(mapper.readValue(entryJson, Entry.class)); - return mapper.writeValueAsString(ranking); - } - catch (JsonProcessingException e) - { - throw new RuntimeException(e); - } - } - ) + () -> new Ranking(), + (stats, entry, ranking) -> ranking.add(entry), + Materialized.as(storeSupplier)) .toStream() - .to(properties.getOutputTopic()); + .to(outputTopic); + + Topology topology = builder.build(); + log.info("\n\n{}", topology.describe()); - Properties props = new Properties(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + return topology; + } - streams = new KafkaStreams(builder.build(), props); - streams.setUncaughtExceptionHandler((Throwable e) -> - { - log.error("Unexpected error!", e); - CompletableFuture.runAsync(() -> - { - log.info("Stopping application..."); - SpringApplication.exit(context, () -> 1); - }); - return SHUTDOWN_CLIENT; - }); + ReadOnlyKeyValueStore getStore() + { + return 
streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore())); } - @PostConstruct public void start() { log.info("Starting Stream-Processor"); streams.start(); } - @PreDestroy public void stop() { log.info("Stopping Stream-Processor"); diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 8b13789..c78fdb0 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1 +1,2 @@ - +server.port=8084 +management.endpoints.web.exposure.include=* diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestCounter.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestCounter.java new file mode 100644 index 0000000..b78c429 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestCounter.java @@ -0,0 +1,15 @@ +package de.juplo.kafka.wordcount.counter; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@Data +@NoArgsConstructor +@AllArgsConstructor(staticName = "of") +public class TestCounter +{ + String key; + long counter; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java new file mode 100644 index 0000000..00c1af7 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java @@ -0,0 +1,18 @@ +package de.juplo.kafka.wordcount.counter; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@AllArgsConstructor(staticName = "of") +@NoArgsConstructor +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class TestWord +{ + private String type; + private String channel; + private String key; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestEntry.java b/src/test/java/de/juplo/kafka/wordcount/query/TestEntry.java new file mode 100644 index 0000000..8019da9 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/query/TestEntry.java @@ -0,0 +1,15 @@ +package de.juplo.kafka.wordcount.query; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@AllArgsConstructor(staticName = "of") +@NoArgsConstructor +@Data +public class TestEntry +{ + String key; + long counter; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestRanking.java b/src/test/java/de/juplo/kafka/wordcount/query/TestRanking.java new file mode 100644 index 0000000..efad48b --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/query/TestRanking.java @@ -0,0 +1,21 @@ +package de.juplo.kafka.wordcount.query; + +import de.juplo.kafka.wordcount.top10.Entry; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@AllArgsConstructor(access = AccessLevel.PRIVATE) +@NoArgsConstructor +@Data +public class TestRanking +{ + private TestEntry[] entries; + + public static TestRanking of(TestEntry... 
entries) + { + return new TestRanking(entries); + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestStats.java b/src/test/java/de/juplo/kafka/wordcount/query/TestStats.java new file mode 100644 index 0000000..6a47193 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/query/TestStats.java @@ -0,0 +1,15 @@ +package de.juplo.kafka.wordcount.query; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@AllArgsConstructor(staticName = "of") +@NoArgsConstructor +@Data +public class TestStats +{ + String type; + String channel; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/RankingTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/RankingTest.java new file mode 100644 index 0000000..0f36860 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/top10/RankingTest.java @@ -0,0 +1,276 @@ +package de.juplo.kafka.wordcount.top10; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.LinkedList; +import java.util.List; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType; + + +public class RankingTest +{ + @DisplayName("A newly created instance is empty") + @Test + public void testNewRankingIsEmpty() + { + Ranking ranking = new Ranking(); + assertThat(ranking.getEntries()).isEmpty(); + } + + @DisplayName("An instance that was build from an empty ranking is empty") + @Test + public void testRankingOfYieldsExpectedResultForEmptyList() + { + Ranking ranking = new Ranking(); + assertThat(ranking.getEntries()).isEmpty(); + } + + @DisplayName("An instance that was build from a valid ranking contains the expected entries") + @ParameterizedTest + @MethodSource("validRankingsProvider") + public void testRankingOfYieldsExpectedResultsForValidRankings(List entryList) + { + Ranking ranking = Ranking.of(toArray(entryList)); + assertThat(ranking.getEntries()).containsExactlyElementsOf(entryList); + } + + @DisplayName("The builder fails for invalid rankings") + @ParameterizedTest + @MethodSource("invalidRankingsProvider") + public void testRankingOfThrowsExceptionForInvalidRankings(List entryList) + { + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> Ranking.of(toArray(entryList))); + } + + @DisplayName("Adding a new word with highest ranking, pushes all other words down") + @ParameterizedTest + @MethodSource("validRankingsProvider") + public void testAddingNewWordWithHighestRanking(List entryList) + { + Ranking ranking = Ranking.of(toArray(entryList)); + Entry newEntry = Entry.of("NEW!", rankingForPosition(-1)); + ranking.add(newEntry); + assertThat(ranking.getEntries()[0]).isEqualTo(newEntry); + for (int i = 0; i < entryList.size() && i < Ranking.MAX_ENTRIES - 1; i++) + { + assertThat(ranking.getEntries()[i + 1]).isEqualTo(entryList.get(i)); + } + } + + @DisplayName("Adding a new word with an existent ranking, pushes all words with lower ranking down") + @ParameterizedTest + @MethodSource("validRankingsProvider") + public void testAddingNewWordWithExistingRanking(List entryList) + { + for (int position = 0; position < entryList.size(); position++ ) + { + Ranking ranking = Ranking.of(toArray(entryList)); + Entry newEntry = Entry.of("NEW!", 
rankingForPosition(position)); + ranking.add(newEntry); + for (int i = 0; i < entryList.size() && i < Ranking.MAX_ENTRIES - 1; i++) + { + if (i < position) + { + assertThat(ranking.getEntries()[i]).isEqualTo(entryList.get(i)); + } + if (i == position) + { + assertThat(ranking.getEntries()[i]).isEqualTo(entryList.get(i)); + assertThat(ranking.getEntries()[i + 1]).isEqualTo(newEntry); + } + if (i > position) + { + assertThat(ranking.getEntries()[i + 1]).isEqualTo(entryList.get(i)); + } + } + } + } + + @DisplayName("Adding a highest ranking for an existing word shifts it to the first place") + @ParameterizedTest + @ValueSource(ints = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }) + public void testAddingExistingWordWithHighestRanking(int position) + { + Ranking ranking = Ranking.of(toArray(VALID_RANKINGS[0])); + String word = wordForPosition(position); + Entry highestEntry = Entry.of(word, 100l); + ranking.add(highestEntry); + List expectedEntries = Stream + .concat( + Stream.of(highestEntry), + VALID_RANKINGS[0] + .stream() + .filter(entry -> !entry.getKey().equals(word))) + .toList(); + assertThat(ranking.getEntries()).containsExactlyElementsOf(expectedEntries); + } + + @DisplayName("Adding an existing word with unchanged ranking changes nothing") + @ParameterizedTest + @ValueSource(ints = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }) + public void testAddingExistingWordWithUnchangedRanking(int position) + { + Ranking ranking = Ranking.of(toArray(VALID_RANKINGS[0])); + Entry unchangedEntry = Entry.of( + wordForPosition(position), + rankingForPosition(position)); + ranking.add(unchangedEntry); + assertThat(ranking.getEntries()).containsExactlyElementsOf(VALID_RANKINGS[0]); + } + + @DisplayName("Adding an existing word with a lower ranking fails") + @ParameterizedTest + @MethodSource("validRankingsProvider") + public void testAddingExistingWordWithLowerRankingFails(List entryList) + { + Ranking ranking = Ranking.of(toArray(entryList)); + entryList.forEach(entry -> + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> ranking.add(Entry.of(entry.getKey(), entry.getCounter() - 1)))); + } + + @DisplayName("Identical rankings are considered equal") + @ParameterizedTest + @MethodSource("validRankingsProvider") + public void testIdenticalRankingsAreConsideredEaqual(List entryList) + { + assertThat(Ranking.of(toArray(entryList))).isEqualTo(Ranking.of(toArray(entryList))); + } + + @DisplayName("Two empty rankings are considered equal") + @Test + public void testTwoEmptyRankingsAreConsideredEaqual() + { + assertThat(Ranking.of()).isEqualTo(Ranking.of()); + } + + @DisplayName("A changed ranking is not considered equal to its unchanged counter-part") + @ParameterizedTest + @MethodSource("validRankingsProvider") + public void testChangedRankingsDoNotEaqualUnchangedOne(List entryList) + { + Ranking changed = Ranking.of(toArray(entryList)); + changed.add(Entry.of("devilish", 666l)); + assertThat(changed).isNotEqualTo(Ranking.of(toArray(entryList))); + } + + @DisplayName("Rankigs are considered equal, if only the order of words with the same count differ") + @Test + public void testRankingWithDifferentOrderForSameCountAreEqual() + { + assertThat( + Ranking.of( + Entry.of("a1",10l), + Entry.of("a2",10l), + Entry.of("b1", 9l), + Entry.of("b2",9l), + Entry.of("c1", 8l), + Entry.of("c2", 8l))) + .isEqualTo(Ranking.of( + Entry.of("a2",10l), + Entry.of("a1",10l), + Entry.of("b2", 9l), + Entry.of("b1",9l), + Entry.of("c2", 8l), + Entry.of("c1", 8l))); + } + + + Entry[] toArray(List entryList) + { + return 
entryList.toArray(size -> new Entry[size]); + } + + static String wordForPosition(int position) + { + return Integer.toString(position+1); + } + + static long rankingForPosition(int position) + { + return (long)Ranking.MAX_ENTRIES * 2 - position; + } + + static Stream> validRankingsProvider() + { + return Stream.of(VALID_RANKINGS); + } + + static Stream> invalidRankingsProvider() + { + return Stream.of(INVALID_RANKINGS); + } + + static String[] WORDS = new String[Ranking.MAX_ENTRIES]; + static List[] VALID_RANKINGS = new List[Ranking.MAX_ENTRIES]; + + static + { + for (int i = 0; i < Ranking.MAX_ENTRIES; i++) + { + List ranking = new LinkedList<>(); + String word = null; + for (int position = 0; position <= i; position++) + { + word = wordForPosition(position); + Entry entry = Entry.of(word, rankingForPosition(position)); + ranking.add(entry); + } + WORDS[i] = word; + VALID_RANKINGS[Ranking.MAX_ENTRIES - (i + 1)] = ranking; + } + } + + static List[] INVALID_RANKINGS = new List[] { + List.of( + Entry.of("Platz eins", 1l), + Entry.of("Platz zwei", 2l)), + List.of( + Entry.of("Platz eins", 1111111111l), + Entry.of("Platz zwei", 222222222l), + Entry.of("Platz eins", 1l)), + List.of( + Entry.of("Platz eins", 11l), + Entry.of("Platz eins", 1l)), + List.of( + Entry.of("Platz eins", 1111111111l), + Entry.of("Platz zwei", 222222222l), + Entry.of("Platz eins", 11111111l), + Entry.of("Platz zwei", 2222222l), + Entry.of("Platz fünf", 555555l)), + List.of( + Entry.of("Platz eins", 1111111111l), + Entry.of("Platz zwei", 222222222l), + Entry.of("Platz drei", 33333333l), + Entry.of("Platz vier", 4444444l), + Entry.of("Platz eins", 111111l), + Entry.of("Platz sechs", 66666l)), + List.of( + Entry.of("Platz eins", 1111111111l), + Entry.of("Platz zwei", 222222222l), + Entry.of("Platz drei", 33333333l), + Entry.of("Platz vier", 4444444l), + Entry.of("Platz fünf", 555555l), + Entry.of("Platz sechs", 66666l), + Entry.of("Platz eins", 1l)), + List.of( + Entry.of("Platz eins", 1111111111l), + Entry.of("Platz zwei", 222222222l), + Entry.of("Platz drei", 33333333l), + Entry.of("Platz vier", 4444444l), + Entry.of("Platz fünf", 555555l), + Entry.of("Platz sechs", 66666l), + Entry.of("Platz sieben", 7777l), + Entry.of("Platz acht", 888l), + Entry.of("Platz neun", 99l), + Entry.of("Platz 10", 6l), + Entry.of("Platz 11", 3l))}; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java new file mode 100644 index 0000000..069f49a --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java @@ -0,0 +1,213 @@ +package de.juplo.kafka.wordcount.top10; + +import de.juplo.kafka.wordcount.counter.TestCounter; +import de.juplo.kafka.wordcount.counter.TestWord; +import de.juplo.kafka.wordcount.query.TestEntry; +import de.juplo.kafka.wordcount.query.TestRanking; +import de.juplo.kafka.wordcount.query.TestStats; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import java.util.Arrays; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + + +class TestData +{ + static final String TYPE_COUNTER = "COUNTER"; + + static final TestStats PETER = TestStats.of(StatsType.COUNTER.name(), "peter"); + static final TestStats KLAUS = TestStats.of(StatsType.COUNTER.name(), "klaus"); + + static final Stream> getInputMessages() + { + return 
Stream.of(INPUT_MESSAGES); + } + + private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] + { + new KeyValue<>( + TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Hallo"), + TestCounter.of("Hallo",1)), + new KeyValue<>( + TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"Müsch"), + TestCounter.of("Müsch",1)), + new KeyValue<>( + TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Welt"), + TestCounter.of("Welt",1)), + new KeyValue<>( + TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"Müsch"), + TestCounter.of("Müsch",2)), + new KeyValue<>( + TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"s"), + TestCounter.of("s",1)), + new KeyValue<>( + TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Boäh"), + TestCounter.of("Boäh",1)), + new KeyValue<>( + TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Welt"), + TestCounter.of("Welt",2)), + new KeyValue<>( + TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Boäh"), + TestCounter.of("Boäh",2)), + new KeyValue<>( + TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"s"), + TestCounter.of("s",2)), + new KeyValue<>( + TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Boäh"), + TestCounter.of("Boäh",3)), + new KeyValue<>( + TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"s"), + TestCounter.of("s",3)), + }; + + static void assertExpectedMessages(MultiValueMap receivedMessages) + { + expectedMessages().forEach( + (stats, rankings) -> + assertThat(receivedMessages.get(stats)) + .containsExactlyElementsOf(rankings)); + } + + static void assertExpectedState(ReadOnlyKeyValueStore store) + { + assertRankingEqualsRankingFromLastMessage(PETER, store.get(statsOf(PETER))); + assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(statsOf(KLAUS))); + } + + private static Stats statsOf(TestStats stats) + { + return Stats.of( + StatsType.valueOf(stats.getType()), + stats.getChannel()); + } + + static void assertExpectedNumberOfMessages(MultiValueMap receivedMessages) + { + assertThat(countMessages(PETER, receivedMessages)); + assertThat(countMessages(KLAUS, receivedMessages)); + } + + private static int countMessages(TestStats stats, MultiValueMap messagesFor) + { + return messagesFor.get(stats) == null + ? 0 + : messagesFor.get(stats).size(); + } + + + static void assertExpectedLastMessages(MultiValueMap receivedMessages) + { + assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages)); + assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages)); + } + + private static void assertRankingEqualsRankingFromLastMessage(TestStats stats, Ranking ranking) + { + TestRanking testRanking = TestRanking.of(testEntriesOf(ranking.getEntries())); + assertRankingEqualsRankingFromLastMessage(stats, testRanking); + } + + private static TestEntry[] testEntriesOf(Entry... entries) + { + return Arrays + .stream(entries) + .map(entry -> TestEntry.of( + entry.getKey(), + entry.getCounter() == null + ? 
-1l + : entry.getCounter())) + .toArray(size -> new TestEntry[size]); + } + + private static void assertRankingEqualsRankingFromLastMessage(TestStats stats, TestRanking ranking) + { + assertThat(ranking).isEqualTo(getLastMessageFor(stats)); + } + + private static TestRanking getLastMessageFor(TestStats stats) + { + return getLastMessageFor(stats, expectedMessages()); + } + + private static TestRanking getLastMessageFor(TestStats stats, MultiValueMap messagesFor) + { + return messagesFor + .get(stats) + .stream() + .reduce(null, (left, right) -> right); + } + + private static KeyValue[] EXPECTED_MESSAGES = new KeyValue[] + { + KeyValue.pair( // 0 + PETER, + TestRanking.of( + TestEntry.of("Hallo", 1l))), + KeyValue.pair( // 1 + KLAUS, + TestRanking.of( + TestEntry.of("Müsch", 1l))), + KeyValue.pair( // 2 + PETER, + TestRanking.of( + TestEntry.of("Hallo", 1l), + TestEntry.of("Welt", 1l))), + KeyValue.pair( // 3 + KLAUS, + TestRanking.of( + TestEntry.of("Müsch", 2l))), + KeyValue.pair( // 4 + KLAUS, + TestRanking.of( + TestEntry.of("Müsch", 2l), + TestEntry.of("s", 1l))), + KeyValue.pair( // 5 + PETER, + TestRanking.of( + TestEntry.of("Hallo", 1l), + TestEntry.of("Welt", 1l), + TestEntry.of("Boäh", 1l))), + KeyValue.pair( // 6 + PETER, + TestRanking.of( + TestEntry.of("Welt", 2l), + TestEntry.of("Hallo", 1l), + TestEntry.of("Boäh", 1l))), + KeyValue.pair( // 7 + PETER, + TestRanking.of( + TestEntry.of("Welt", 2l), + TestEntry.of("Boäh", 2l), + TestEntry.of("Hallo", 1l))), + KeyValue.pair( // 8 + KLAUS, + TestRanking.of( + TestEntry.of("Müsch", 2l), + TestEntry.of("s", 2l))), + KeyValue.pair( // 9 + PETER, + TestRanking.of( + TestEntry.of("Boäh", 3l), + TestEntry.of("Welt", 2l), + TestEntry.of("Hallo", 1l))), + KeyValue.pair( // 10 + KLAUS, + TestRanking.of( + TestEntry.of("s", 3l), + TestEntry.of("Müsch", 2l))), + }; + + private static MultiValueMap expectedMessages() + { + MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); + Stream + .of(EXPECTED_MESSAGES) + .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value)); + return expectedMessages; + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java new file mode 100644 index 0000000..51e424e --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java @@ -0,0 +1,168 @@ +package de.juplo.kafka.wordcount.top10; + +import de.juplo.kafka.wordcount.counter.TestCounter; +import de.juplo.kafka.wordcount.counter.TestWord; +import de.juplo.kafka.wordcount.query.TestRanking; +import de.juplo.kafka.wordcount.query.TestStats; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.SendResult; +import 
org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import java.time.Duration; + +import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME; +import static org.awaitility.Awaitility.await; + + +@SpringBootTest( + properties = { + "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer", + "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", + "spring.kafka.producer.properties.spring.json.type.mapping=key:de.juplo.kafka.wordcount.counter.TestWord,counter:de.juplo.kafka.wordcount.counter.TestCounter", + "spring.kafka.consumer.auto-offset-reset=earliest", + "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", + "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", + "spring.kafka.consumer.properties.spring.json.type.mapping=stats:de.juplo.kafka.wordcount.query.TestStats,ranking:de.juplo.kafka.wordcount.query.TestRanking", + "logging.level.root=WARN", + "logging.level.de.juplo=DEBUG", + "logging.level.org.apache.kafka.clients=INFO", + "logging.level.org.apache.kafka.streams=INFO", + "juplo.wordcount.top10.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.wordcount.top10.commit-interval=100", + "juplo.wordcount.top10.cacheMaxBytes=0", + "juplo.wordcount.top10.input-topic=" + Top10ApplicationIT.TOPIC_IN, + "juplo.wordcount.top10.output-topic=" + Top10ApplicationIT.TOPIC_OUT }) +@EmbeddedKafka(topics = { Top10ApplicationIT.TOPIC_IN, Top10ApplicationIT.TOPIC_OUT }) +@Slf4j +public class Top10ApplicationIT +{ + public static final String TOPIC_IN = "in"; + public static final String TOPIC_OUT = "out"; + + @Autowired + Consumer consumer; + @Autowired + Top10StreamProcessor streamProcessor; + + + @BeforeAll + public static void testSendMessage( + @Autowired KafkaTemplate kafkaTemplate) + { + TestData + .getInputMessages() + .forEach(kv -> + { + try + { + SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); + log.info( + "Sent: {}={}, partition={}, offset={}", + result.getProducerRecord().key(), + result.getProducerRecord().value(), + result.getRecordMetadata().partition(), + result.getRecordMetadata().offset()); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + }); + } + + @DisplayName("Await the expected state in the state-store") + @Test + public void testAwaitExpectedState() + { + await("Expected state") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore())); + } + + @DisplayName("Await the expected output messages") + @Test + @Disabled + public void testAwaitExpectedMessages() + { + await("Expected messages") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> consumer.enforceAssertion( + receivedMessages -> TestData.assertExpectedMessages(receivedMessages))); + } + + @DisplayName("Await the expected number of messages") + @Test + public void testAwaitExpectedNumberOfMessages() + { + await("Expected number of messages") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> consumer.enforceAssertion( + receivedMessages -> TestData.assertExpectedNumberOfMessages(receivedMessages))); + } + + @DisplayName("Await the expected final output 
messages") + @Test + public void testAwaitExpectedLastMessages() + { + await("Expected final output messages") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> consumer.enforceAssertion( + receivedMessages -> TestData.assertExpectedLastMessages(receivedMessages))); + } + + + static class Consumer + { + private final MultiValueMap received = new LinkedMultiValueMap<>(); + + @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) + public synchronized void receive( + @Header(KafkaHeaders.RECEIVED_KEY) TestStats user, + @Payload TestRanking ranking) + { + log.debug("Received message: {} -> {}", user, ranking); + received.add(user, ranking); + } + + synchronized void enforceAssertion( + java.util.function.Consumer> assertion) + { + assertion.accept(received); + } + } + + @TestConfiguration + static class Configuration + { + @Bean + Consumer consumer() + { + return new Consumer(); + } + + @Primary + @Bean + KeyValueBytesStoreSupplier inMemoryStoreSupplier() + { + return Stores.inMemoryKeyValueStore(STORE_NAME); + } + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java new file mode 100644 index 0000000..559d742 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java @@ -0,0 +1,119 @@ +package de.juplo.kafka.wordcount.top10; + +import de.juplo.kafka.wordcount.counter.TestCounter; +import de.juplo.kafka.wordcount.counter.TestWord; +import de.juplo.kafka.wordcount.query.TestRanking; +import de.juplo.kafka.wordcount.query.TestStats; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import java.util.Map; + +import static de.juplo.kafka.wordcount.top10.Top10ApplicationConfiguration.serializationConfig; +import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME; + + +@Slf4j +public class Top10StreamProcessorTopologyTest +{ + public static final String IN = "TEST-IN"; + public static final String OUT = "TEST-OUT"; + + static TopologyTestDriver testDriver; + static MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); + + + @BeforeAll + public static void setUp() + { + Topology topology = Top10StreamProcessor.buildTopology( + IN, + OUT, + Stores.inMemoryKeyValueStore(STORE_NAME)); + + testDriver = new TopologyTestDriver(topology, serializationConfig()); + + TestInputTopic in = testDriver.createInputTopic( + IN, + jsonSerializer(TestWord.class, true), + jsonSerializer(TestCounter.class,false)); + + TestOutputTopic out = testDriver.createOutputTopic( + OUT, + new JsonDeserializer() + .copyWithType(TestStats.class) + .ignoreTypeHeaders(), + new JsonDeserializer() + .copyWithType(TestRanking.class) + .ignoreTypeHeaders()); + + TestData + .getInputMessages() + .forEach(kv -> in.pipeInput(kv.key, kv.value)); + + out + 
.readRecordsToList() + .forEach(record -> receivedMessages.add(record.key(), record.value())); + } + + + @DisplayName("Assert the expected output messages") + @Test + public void testExpectedMessages() + { + TestData.assertExpectedMessages(receivedMessages); + } + + @DisplayName("Assert the expected number of messages") + @Test + public void testExpectedNumberOfMessages() + { + TestData.assertExpectedNumberOfMessages(receivedMessages); + } + + @DisplayName("Assert the expected final output messages") + @Test + public void testExpectedLastMessages() + { + TestData.assertExpectedLastMessages(receivedMessages); + } + + @DisplayName("Assert the expected state in the state-store") + @Test + public void testExpectedState() + { + KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); + TestData.assertExpectedState(store); + } + + @AfterAll + public static void tearDown() + { + testDriver.close(); + } + + private static JsonSerializer jsonSerializer(Class type, boolean isKey) + { + JsonSerializer jsonSerializer = new JsonSerializer<>(); + jsonSerializer.configure( + Map.of( + JsonSerializer.TYPE_MAPPINGS, + "key:" + TestWord.class.getName() + "," + + "counter:" + TestCounter.class.getName()), + isKey); + return jsonSerializer; + } +}