From: Kai Moritz Date: Thu, 30 May 2024 09:01:27 +0000 (+0200) Subject: TEST-STORE:GREEN X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=refs%2Fheads%2Ftop10;hp=4b94d31fbd663cb277276def106be9873ec4a246;p=demos%2Fkafka%2Fwordcount TEST-STORE:GREEN --- diff --git a/Dockerfile b/Dockerfile index bbd15ef..16a12e3 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ -FROM openjdk:11-jre-slim +FROM eclipse-temurin:17-jre COPY target/*.jar /opt/app.jar -EXPOSE 8080 +EXPOSE 8084 ENTRYPOINT ["java", "-jar", "/opt/app.jar"] CMD [] diff --git a/pom.xml b/pom.xml index 9bda638..fd71ccd 100644 --- a/pom.xml +++ b/pom.xml @@ -5,18 +5,16 @@ org.springframework.boot spring-boot-starter-parent - 2.5.4 + 3.2.5 de.juplo.kafka.wordcount top10 - 1.0.0 + 1.2.0 Wordcount-Top-10 Top-10 stream-processor of the multi-user wordcount-example 0.33.0 - 11 - 2.8.0 @@ -31,6 +29,10 @@ org.apache.kafka kafka-streams + + org.springframework.kafka + spring-kafka + org.springframework.boot @@ -48,15 +50,34 @@ lombok true + org.springframework.boot spring-boot-starter-test test + + org.springframework.kafka + spring-kafka-test + test + + + org.awaitility + awaitility + test + + + org.assertj + assertj-core + test + + + maven-failsafe-plugin + org.springframework.boot spring-boot-maven-plugin diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java b/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java index 67f45f2..b25fc07 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java @@ -1,11 +1,20 @@ package de.juplo.kafka.wordcount.top10; -import lombok.Value; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; -@Value(staticConstructor = "of") +@NoArgsConstructor +@AllArgsConstructor( + staticName = "of", + access = AccessLevel.PACKAGE) +@Data +@JsonIgnoreProperties(ignoreUnknown = true) public class Entry { - private final String word; - private final Long count; + private String word; + private Long counter; } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Key.java b/src/main/java/de/juplo/kafka/wordcount/top10/Key.java index d09dbcc..ffac8ea 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Key.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Key.java @@ -1,13 +1,17 @@ package de.juplo.kafka.wordcount.top10; -import lombok.Getter; -import lombok.Setter; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.*; -@Getter -@Setter +@NoArgsConstructor +@AllArgsConstructor( + staticName = "of", + access = AccessLevel.PACKAGE) +@Data +@JsonIgnoreProperties(ignoreUnknown = true) public class Key { - private String username; + private String user; private String word; } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java b/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java index b748fe5..4f56c18 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java @@ -1,36 +1,50 @@ package de.juplo.kafka.wordcount.top10; -import lombok.Getter; -import lombok.Setter; +import lombok.*; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; +import java.util.*; -@Getter -@Setter +@AllArgsConstructor(access = AccessLevel.PRIVATE) +@NoArgsConstructor +@Data public class Ranking { + public final static int MAX_ENTRIES = 10; + + private Entry[] 
entries = new Entry[0]; - public void add(Entry newEntry) + public Ranking add(Entry newEntry) { if (entries.length == 0) { entries = new Entry[1]; entries[0] = newEntry; - return; + return this; } List list = new LinkedList<>(Arrays.asList(entries)); + int oldPosition = -1; for (int i = 0; i < list.size(); i++) { - Entry entry; + Entry entry = list.get(i); - entry = list.get(i); - if (entry.getCount() <= newEntry.getCount()) + if (entry.getCounter() < newEntry.getCounter()) { + if (oldPosition > -1) + { + if (list.get(oldPosition).getCounter() > newEntry.getCounter()) + { + throw new IllegalArgumentException("The ranking already contains an entry with a higher counting for " + newEntry); + } + else + { + // Entry for word already exists with the same counting! Nothing changed... + return this; + } + } + list.add(i, newEntry); for (int j = i+1; j < list.size(); j++) { @@ -41,13 +55,105 @@ public class Ranking break; } } - if (list.size() > 10) + if (list.size() > MAX_ENTRIES) { - list = list.subList(0,10); + list = list.subList(0, MAX_ENTRIES); } entries = list.toArray(num -> new Entry[num]); - return; + return this; + } + + if (entry.getWord().equals(newEntry.getWord())) + oldPosition = i; + } + + if (oldPosition > -1 && list.get(oldPosition).getCounter() > newEntry.getCounter()) + { + throw new IllegalArgumentException("The ranking already contains an entry with a higher counting for " + newEntry); + } + + if (list.size() < MAX_ENTRIES) + { + list.add(newEntry); + entries = list.toArray(num -> new Entry[num]); + } + + return this; + } + + public Ranking validate() throws IllegalArgumentException + { + if (this.entries.length > MAX_ENTRIES) + throw new IllegalArgumentException("Invalid Ranking: a valid ranking cannot have more entries than " + MAX_ENTRIES ); + + Set seenWords = new HashSet<>(); + long lowesCounting = Long.MAX_VALUE; + + for (int i=0; i " + entry.getWord()); + if (entry.getCounter() > lowesCounting) + throw new IllegalArgumentException("Invalid Ranking: Entries are not sorted correctly"); + + seenWords.add(entry.getWord()); + lowesCounting = entry.getCounter(); + } + + return this; + } + + @Override + public boolean equals(Object o) + { + if (this == o) + return true; + if (o == null) + return false; + if (!(o instanceof Ranking)) + return false; + + Ranking other = (Ranking)o; + + if (other.entries.length != entries.length) + return false; + + if (entries.length == 0) + return true; + + int i = 0; + Set myWordsWithCurrentCount = new HashSet<>(); + Set otherWordsWithCurrentCount = new HashSet<>(); + Entry myEntry = entries[i]; + long currentCount = myEntry.getCounter(); + myWordsWithCurrentCount.add(myEntry.getWord()); + while (true) + { + Entry otherEntry = other.entries[i]; + if (otherEntry.getCounter() != currentCount) + return false; + otherWordsWithCurrentCount.add(otherEntry.getWord()); + if (++i >= entries.length) + return myWordsWithCurrentCount.equals(otherWordsWithCurrentCount); + myEntry = entries[i]; + if (myEntry.getCounter() != currentCount) + { + if (!myWordsWithCurrentCount.equals(otherWordsWithCurrentCount)) + return false; + currentCount = myEntry.getCounter(); + myWordsWithCurrentCount.clear(); + otherWordsWithCurrentCount.clear(); } + myWordsWithCurrentCount.add(myEntry.getWord()); } } + + public static Ranking of(Entry... 
entries) + { + Ranking ranking = new Ranking(entries); + ranking.validate(); + return ranking; + } } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java index 27dca95..5c14ae7 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java @@ -2,11 +2,9 @@ package de.juplo.kafka.wordcount.top10; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.context.properties.EnableConfigurationProperties; @SpringBootApplication -@EnableConfigurationProperties(Top10ApplicationProperties.class) public class Top10Application { public static void main(String[] args) diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java new file mode 100644 index 0000000..03497e4 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java @@ -0,0 +1,88 @@ +package de.juplo.kafka.wordcount.top10; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerde; +import org.springframework.kafka.support.serializer.JsonSerializer; + +import java.util.Properties; +import java.util.concurrent.CompletableFuture; + +import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; + + +@Configuration +@EnableConfigurationProperties(Top10ApplicationProperties.class) +@Slf4j +public class Top10ApplicationConfiguration +{ + @Bean + public Properties streamProcessorProperties(Top10ApplicationProperties properties) + { + Properties props = new Properties(); + + props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + props.put(JsonDeserializer.TRUSTED_PACKAGES, Top10Application.class.getPackageName()); + props.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName()); + props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName()); + props.put( + JsonDeserializer.TYPE_MAPPINGS, + "word:" + Key.class.getName() + "," + + "counter:" + Entry.class.getName() + "," + + "user:" + User.class.getName() + "," + + "ranking:" + Ranking.class.getName()); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + if (properties.getCommitInterval() != null) + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval()); + if (properties.getCacheMaxBytes() != null) + 
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, properties.getCacheMaxBytes()); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + return props; + } + + @Bean(initMethod = "start", destroyMethod = "stop") + public Top10StreamProcessor streamProcessor( + Top10ApplicationProperties applicationProperties, + Properties streamProcessorProperties, + KeyValueBytesStoreSupplier storeSupplier, + ConfigurableApplicationContext context) + { + Top10StreamProcessor streamProcessor = new Top10StreamProcessor( + applicationProperties.getInputTopic(), + applicationProperties.getOutputTopic(), + streamProcessorProperties, + storeSupplier); + + streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> + { + log.error("Unexpected error!", e); + CompletableFuture.runAsync(() -> + { + log.info("Stopping application..."); + SpringApplication.exit(context, () -> 1); + }); + return SHUTDOWN_CLIENT; + }); + + return streamProcessor; + } + + @Bean + public KeyValueBytesStoreSupplier storeSupplier() + { + return Stores.persistentKeyValueStore("top10"); + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java index 93b78ec..d3bb236 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java @@ -17,4 +17,6 @@ public class Top10ApplicationProperties private String applicationId = "top10"; private String inputTopic = "countings"; private String outputTopic = "top10"; + private Integer commitInterval; + private Integer cacheMaxBytes; } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java index 862913a..343ab4d 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java @@ -1,108 +1,70 @@ package de.juplo.kafka.wordcount.top10; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; -import org.springframework.boot.SpringApplication; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.stereotype.Component; +import org.apache.kafka.streams.*; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; import java.util.Properties; -import java.util.concurrent.CompletableFuture; -import java.util.regex.Pattern; - -import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; @Slf4j -@Component public class Top10StreamProcessor { - final static Pattern PATTERN = Pattern.compile("\\W+"); - public final KafkaStreams streams; public Top10StreamProcessor( - Top10ApplicationProperties properties, - ObjectMapper mapper, 
- ConfigurableApplicationContext context) + String inputTopic, + String outputTopic, + Properties props, + KeyValueBytesStoreSupplier storeSupplier) + { + Topology topology = Top10StreamProcessor.buildTopology( + inputTopic, + outputTopic, + storeSupplier); + + streams = new KafkaStreams(topology, props); + } + + static Topology buildTopology( + String inputTopic, + String outputTopic, + KeyValueBytesStoreSupplier storeSupplier) { StreamsBuilder builder = new StreamsBuilder(); builder - .stream(properties.getInputTopic()) - .map((keyJson, countStr) -> - { - try - { - Key key = mapper.readValue(keyJson, Key.class); - Long count = Long.parseLong(countStr); - Entry entry = Entry.of(key.getWord(), count); - String entryJson = mapper.writeValueAsString(entry); - return new KeyValue<>(key.getUsername(), entryJson); - } - catch (JsonProcessingException e) - { - throw new RuntimeException(e); - } - }) + .stream(inputTopic) + .map((key, entry) -> new KeyValue<>(User.of(key.getUser()), entry)) .groupByKey() .aggregate( - () -> "{\"entries\" : []}", - (username, entryJson, rankingJson) -> - { - try - { - Ranking ranking = mapper.readValue(rankingJson, Ranking.class); - ranking.add(mapper.readValue(entryJson, Entry.class)); - return mapper.writeValueAsString(ranking); - } - catch (JsonProcessingException e) - { - throw new RuntimeException(e); - } - } - ) + () -> new Ranking(), + (user, entry, ranking) -> ranking.add(entry), + Materialized.as(storeSupplier)) .toStream() - .to(properties.getOutputTopic()); + .to(outputTopic); + + Topology topology = builder.build(); + log.info("\n\n{}", topology.describe()); - Properties props = new Properties(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + return topology; + } - streams = new KafkaStreams(builder.build(), props); - streams.setUncaughtExceptionHandler((Throwable e) -> - { - log.error("Unexpected error!", e); - CompletableFuture.runAsync(() -> - { - log.info("Stopping application..."); - SpringApplication.exit(context, () -> 1); - }); - return SHUTDOWN_CLIENT; - }); + ReadOnlyKeyValueStore getStore(String name) + { + return streams.store(StoreQueryParameters.fromNameAndType(name, QueryableStoreTypes.keyValueStore())); } - @PostConstruct public void start() { log.info("Starting Stream-Processor"); streams.start(); } - @PreDestroy public void stop() { log.info("Stopping Stream-Processor"); diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/User.java b/src/main/java/de/juplo/kafka/wordcount/top10/User.java new file mode 100644 index 0000000..53c258d --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/top10/User.java @@ -0,0 +1,14 @@ +package de.juplo.kafka.wordcount.top10; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@AllArgsConstructor(staticName = "of") +@NoArgsConstructor +@Data +public class User +{ + String user; +} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 8b13789..c78fdb0 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1 +1,2 @@ - +server.port=8084 
+management.endpoints.web.exposure.include=* diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestCounter.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestCounter.java new file mode 100644 index 0000000..d98ae64 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestCounter.java @@ -0,0 +1,21 @@ +package de.juplo.kafka.wordcount.counter; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@Data +@NoArgsConstructor +@AllArgsConstructor(staticName = "of") +public class TestCounter +{ + String user; + String word; + long counter; + + public static TestCounter of(TestWord word, long counter) + { + return new TestCounter(word.getUser(), word.getWord(), counter); + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java new file mode 100644 index 0000000..8008e12 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java @@ -0,0 +1,17 @@ +package de.juplo.kafka.wordcount.counter; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@AllArgsConstructor(staticName = "of") +@NoArgsConstructor +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class TestWord +{ + private String user; + private String word; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/query/RankingData.java b/src/test/java/de/juplo/kafka/wordcount/query/RankingData.java new file mode 100644 index 0000000..1bbd3ba --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/query/RankingData.java @@ -0,0 +1,11 @@ +package de.juplo.kafka.wordcount.query; + +import de.juplo.kafka.wordcount.top10.Entry; +import lombok.Data; + + +@Data +public class RankingData +{ + private Entry[] entries; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/RankingTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/RankingTest.java new file mode 100644 index 0000000..26749e9 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/top10/RankingTest.java @@ -0,0 +1,276 @@ +package de.juplo.kafka.wordcount.top10; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.LinkedList; +import java.util.List; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType; + + +public class RankingTest +{ + @DisplayName("A newly created instance is empty") + @Test + public void testNewRankingIsEmpty() + { + Ranking ranking = new Ranking(); + assertThat(ranking.getEntries()).isEmpty(); + } + + @DisplayName("An instance that was build from an empty ranking is empty") + @Test + public void testRankingOfYieldsExpectedResultForEmptyList() + { + Ranking ranking = new Ranking(); + assertThat(ranking.getEntries()).isEmpty(); + } + + @DisplayName("An instance that was build from a valid ranking contains the expected entries") + @ParameterizedTest + @MethodSource("validRankingsProvider") + public void testRankingOfYieldsExpectedResultsForValidRankings(List entryList) + { + Ranking ranking = Ranking.of(toArray(entryList)); + assertThat(ranking.getEntries()).containsExactlyElementsOf(entryList); + } + + @DisplayName("The builder 
fails for invalid rankings") + @ParameterizedTest + @MethodSource("invalidRankingsProvider") + public void testRankingOfThrowsExceptionForInvalidRankings(List entryList) + { + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> Ranking.of(toArray(entryList))); + } + + @DisplayName("Adding a new word with highest ranking, pushes all other words down") + @ParameterizedTest + @MethodSource("validRankingsProvider") + public void testAddingNewWordWithHighestRanking(List entryList) + { + Ranking ranking = Ranking.of(toArray(entryList)); + Entry newEntry = Entry.of("NEW!", rankingForPosition(-1)); + ranking.add(newEntry); + assertThat(ranking.getEntries()[0]).isEqualTo(newEntry); + for (int i = 0; i < entryList.size() && i < Ranking.MAX_ENTRIES - 1; i++) + { + assertThat(ranking.getEntries()[i + 1]).isEqualTo(entryList.get(i)); + } + } + + @DisplayName("Adding a new word with an existent ranking, pushes all words with lower ranking down") + @ParameterizedTest + @MethodSource("validRankingsProvider") + public void testAddingNewWordWithExistingRanking(List entryList) + { + for (int position = 0; position < entryList.size(); position++ ) + { + Ranking ranking = Ranking.of(toArray(entryList)); + Entry newEntry = Entry.of("NEW!", rankingForPosition(position)); + ranking.add(newEntry); + for (int i = 0; i < entryList.size() && i < Ranking.MAX_ENTRIES - 1; i++) + { + if (i < position) + { + assertThat(ranking.getEntries()[i]).isEqualTo(entryList.get(i)); + } + if (i == position) + { + assertThat(ranking.getEntries()[i]).isEqualTo(entryList.get(i)); + assertThat(ranking.getEntries()[i + 1]).isEqualTo(newEntry); + } + if (i > position) + { + assertThat(ranking.getEntries()[i + 1]).isEqualTo(entryList.get(i)); + } + } + } + } + + @DisplayName("Adding a highest ranking for an existing word shifts it to the first place") + @ParameterizedTest + @ValueSource(ints = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }) + public void testAddingExistingWordWithHighestRanking(int position) + { + Ranking ranking = Ranking.of(toArray(VALID_RANKINGS[0])); + String word = wordForPosition(position); + Entry highestEntry = Entry.of(word, 100l); + ranking.add(highestEntry); + List expectedEntries = Stream + .concat( + Stream.of(highestEntry), + VALID_RANKINGS[0] + .stream() + .filter(entry -> !entry.getWord().equals(word))) + .toList(); + assertThat(ranking.getEntries()).containsExactlyElementsOf(expectedEntries); + } + + @DisplayName("Adding an existing word with unchanged ranking changes nothing") + @ParameterizedTest + @ValueSource(ints = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }) + public void testAddingExistingWordWithUnchangedRanking(int position) + { + Ranking ranking = Ranking.of(toArray(VALID_RANKINGS[0])); + Entry unchangedEntry = Entry.of( + wordForPosition(position), + rankingForPosition(position)); + ranking.add(unchangedEntry); + assertThat(ranking.getEntries()).containsExactlyElementsOf(VALID_RANKINGS[0]); + } + + @DisplayName("Adding an existing word with a lower ranking fails") + @ParameterizedTest + @MethodSource("validRankingsProvider") + public void testAddingExistingWordWithLowerRankingFails(List entryList) + { + Ranking ranking = Ranking.of(toArray(entryList)); + entryList.forEach(entry -> + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> ranking.add(Entry.of(entry.getWord(), entry.getCounter() - 1)))); + } + + @DisplayName("Identical rankings are considered equal") + @ParameterizedTest + @MethodSource("validRankingsProvider") + public void 
testIdenticalRankingsAreConsideredEaqual(List entryList) + { + assertThat(Ranking.of(toArray(entryList))).isEqualTo(Ranking.of(toArray(entryList))); + } + + @DisplayName("Two empty rankings are considered equal") + @Test + public void testTwoEmptyRankingsAreConsideredEaqual() + { + assertThat(Ranking.of()).isEqualTo(Ranking.of()); + } + + @DisplayName("A changed ranking is not considered equal to its unchanged counter-part") + @ParameterizedTest + @MethodSource("validRankingsProvider") + public void testChangedRankingsDoNotEaqualUnchangedOne(List entryList) + { + Ranking changed = Ranking.of(toArray(entryList)); + changed.add(Entry.of("devilish", 666l)); + assertThat(changed).isNotEqualTo(Ranking.of(toArray(entryList))); + } + + @DisplayName("Rankigs are considered equal, if only the order of words with the same count differ") + @Test + public void testRankingWithDifferentOrderForSameCountAreEqual() + { + assertThat( + Ranking.of( + Entry.of("a1",10l), + Entry.of("a2",10l), + Entry.of("b1", 9l), + Entry.of("b2",9l), + Entry.of("c1", 8l), + Entry.of("c2", 8l))) + .isEqualTo(Ranking.of( + Entry.of("a2",10l), + Entry.of("a1",10l), + Entry.of("b2", 9l), + Entry.of("b1",9l), + Entry.of("c2", 8l), + Entry.of("c1", 8l))); + } + + + Entry[] toArray(List entryList) + { + return entryList.toArray(size -> new Entry[size]); + } + + static String wordForPosition(int position) + { + return Integer.toString(position+1); + } + + static long rankingForPosition(int position) + { + return (long)Ranking.MAX_ENTRIES * 2 - position; + } + + static Stream> validRankingsProvider() + { + return Stream.of(VALID_RANKINGS); + } + + static Stream> invalidRankingsProvider() + { + return Stream.of(INVALID_RANKINGS); + } + + static String[] WORDS = new String[Ranking.MAX_ENTRIES]; + static List[] VALID_RANKINGS = new List[Ranking.MAX_ENTRIES]; + + static + { + for (int i = 0; i < Ranking.MAX_ENTRIES; i++) + { + List ranking = new LinkedList<>(); + String word = null; + for (int position = 0; position <= i; position++) + { + word = wordForPosition(position); + Entry entry = Entry.of(word, rankingForPosition(position)); + ranking.add(entry); + } + WORDS[i] = word; + VALID_RANKINGS[Ranking.MAX_ENTRIES - (i + 1)] = ranking; + } + } + + static List[] INVALID_RANKINGS = new List[] { + List.of( + Entry.of("Platz eins", 1l), + Entry.of("Platz zwei", 2l)), + List.of( + Entry.of("Platz eins", 1111111111l), + Entry.of("Platz zwei", 222222222l), + Entry.of("Platz eins", 1l)), + List.of( + Entry.of("Platz eins", 11l), + Entry.of("Platz eins", 1l)), + List.of( + Entry.of("Platz eins", 1111111111l), + Entry.of("Platz zwei", 222222222l), + Entry.of("Platz eins", 11111111l), + Entry.of("Platz zwei", 2222222l), + Entry.of("Platz fünf", 555555l)), + List.of( + Entry.of("Platz eins", 1111111111l), + Entry.of("Platz zwei", 222222222l), + Entry.of("Platz drei", 33333333l), + Entry.of("Platz vier", 4444444l), + Entry.of("Platz eins", 111111l), + Entry.of("Platz sechs", 66666l)), + List.of( + Entry.of("Platz eins", 1111111111l), + Entry.of("Platz zwei", 222222222l), + Entry.of("Platz drei", 33333333l), + Entry.of("Platz vier", 4444444l), + Entry.of("Platz fünf", 555555l), + Entry.of("Platz sechs", 66666l), + Entry.of("Platz eins", 1l)), + List.of( + Entry.of("Platz eins", 1111111111l), + Entry.of("Platz zwei", 222222222l), + Entry.of("Platz drei", 33333333l), + Entry.of("Platz vier", 4444444l), + Entry.of("Platz fünf", 555555l), + Entry.of("Platz sechs", 66666l), + Entry.of("Platz sieben", 7777l), + Entry.of("Platz acht", 888l), + 
Entry.of("Platz neun", 99l), + Entry.of("Platz 10", 6l), + Entry.of("Platz 11", 3l))}; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java new file mode 100644 index 0000000..f6d7ccd --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java @@ -0,0 +1,166 @@ +package de.juplo.kafka.wordcount.top10; + +import de.juplo.kafka.wordcount.counter.TestCounter; +import de.juplo.kafka.wordcount.counter.TestWord; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + + +class TestData +{ + static final KeyValue[] INPUT_MESSAGES = new KeyValue[] + { + new KeyValue<>( + TestWord.of("peter","Hallo"), + TestCounter.of("peter","Hallo",1)), + new KeyValue<>( + TestWord.of("klaus","Müsch"), + TestCounter.of("klaus","Müsch",1)), + new KeyValue<>( + TestWord.of("peter","Welt"), + TestCounter.of("peter","Welt",1)), + new KeyValue<>( + TestWord.of("klaus","Müsch"), + TestCounter.of("klaus","Müsch",2)), + new KeyValue<>( + TestWord.of("klaus","s"), + TestCounter.of("klaus","s",1)), + new KeyValue<>( + TestWord.of("peter","Boäh"), + TestCounter.of("peter","Boäh",1)), + new KeyValue<>( + TestWord.of("peter","Welt"), + TestCounter.of("peter","Welt",2)), + new KeyValue<>( + TestWord.of("peter","Boäh"), + TestCounter.of("peter","Boäh",2)), + new KeyValue<>( + TestWord.of("klaus","s"), + TestCounter.of("klaus","s",2)), + new KeyValue<>( + TestWord.of("peter","Boäh"), + TestCounter.of("peter","Boäh",3)), + new KeyValue<>( + TestWord.of("klaus","s"), + TestCounter.of("klaus","s",3)), + }; + + static void assertExpectedMessages(MultiValueMap receivedMessages) + { + expectedMessages().forEach( + (user, rankings) -> + assertThat(receivedMessages.get(user)) + .containsExactlyElementsOf(rankings)); + } + + static void assertExpectedState(ReadOnlyKeyValueStore store) + { + assertThat(store.get(EXPECTED_MESSAGES[9].key)).isEqualTo(EXPECTED_MESSAGES[9].value); + assertThat(store.get(EXPECTED_MESSAGES[10].key)).isEqualTo(EXPECTED_MESSAGES[10].value); + } + + static KeyValue[] EXPECTED_MESSAGES = new KeyValue[] + { + KeyValue.pair( // 0 + User.of("peter"), + Ranking.of( + Entry.of("Hallo", 1l))), + KeyValue.pair( // 1 + User.of("klaus"), + Ranking.of( + Entry.of("Müsch", 1l))), + KeyValue.pair( // 2 + User.of("peter"), + Ranking.of( + Entry.of("Hallo", 1l), + Entry.of("Welt", 1l))), + KeyValue.pair( // 3 + User.of("klaus"), + Ranking.of( + Entry.of("Müsch", 2l))), + KeyValue.pair( // 4 + User.of("klaus"), + Ranking.of( + Entry.of("Müsch", 2l), + Entry.of("s", 1l))), + KeyValue.pair( // 5 + User.of("peter"), + Ranking.of( + Entry.of("Hallo", 1l), + Entry.of("Welt", 1l), + Entry.of("Boäh", 1l))), + KeyValue.pair( // 6 + User.of("peter"), + Ranking.of( + Entry.of("Welt", 2l), + Entry.of("Hallo", 1l), + Entry.of("Boäh", 1l))), + KeyValue.pair( // 7 + User.of("peter"), + Ranking.of( + Entry.of("Welt", 2l), + Entry.of("Boäh", 2l), + Entry.of("Hallo", 1l))), + KeyValue.pair( // 8 + User.of("klaus"), + Ranking.of( + Entry.of("Müsch", 2l), + Entry.of("s", 2l))), + KeyValue.pair( // 9 + User.of("peter"), + 
Ranking.of( + Entry.of("Boäh", 3l), + Entry.of("Welt", 2l), + Entry.of("Hallo", 1l))), + KeyValue.pair( // 10 + User.of("klaus"), + Ranking.of( + Entry.of("s", 3l), + Entry.of("Müsch", 2l))), + }; + + static MultiValueMap expectedMessages() + { + MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); + Stream + .of(EXPECTED_MESSAGES) + .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value)); + return expectedMessages; + } + + static Map convertToMap(Properties properties) + { + return properties + .entrySet() + .stream() + .collect( + Collectors.toMap( + entry -> (String)entry.getKey(), + entry -> entry.getValue() + )); + } + + static String parseHeader(Headers headers, String key) + { + Header header = headers.lastHeader(key); + if (header == null) + { + return key + "=null"; + } + else + { + return key + "=" + new String(header.value()); + } + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java new file mode 100644 index 0000000..6707acc --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java @@ -0,0 +1,145 @@ +package de.juplo.kafka.wordcount.top10; + +import de.juplo.kafka.wordcount.counter.TestWord; +import de.juplo.kafka.wordcount.counter.TestCounter; +import de.juplo.kafka.wordcount.query.RankingData; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.SendResult; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import java.time.Duration; +import java.util.stream.Stream; + +import static org.awaitility.Awaitility.await; + + +@SpringBootTest( + properties = { + "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer", + "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", + "spring.kafka.producer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.counter.TestWord,counter:de.juplo.kafka.wordcount.counter.TestCounter", + "spring.kafka.consumer.auto-offset-reset=earliest", + "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", + "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", + "spring.kafka.consumer.properties.spring.json.use.type.headers=false", + "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.top10.User", + "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.query.RankingData", + 
"spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.top10 ", + "logging.level.root=WARN", + "logging.level.de.juplo=DEBUG", + "juplo.wordcount.top10.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.wordcount.top10.commit-interval=0", + "juplo.wordcount.top10.cacheMaxBytes=0", + "juplo.wordcount.top10.input-topic=" + Top10ApplicationIT.TOPIC_IN, + "juplo.wordcount.top10.output-topic=" + Top10ApplicationIT.TOPIC_OUT }) +@EmbeddedKafka(topics = { Top10ApplicationIT.TOPIC_IN, Top10ApplicationIT.TOPIC_OUT }) +@Slf4j +public class Top10ApplicationIT +{ + public final static String TOPIC_IN = "in"; + public final static String TOPIC_OUT = "out"; + public final static String STORE_NAME = "TEST-STORE"; + + + @Autowired + Consumer consumer; + @Autowired + Top10StreamProcessor top10StreamProcessor; + + + @BeforeAll + public static void testSendMessage( + @Autowired KafkaTemplate kafkaTemplate) + { + Stream + .of(TestData.INPUT_MESSAGES) + .forEach(kv -> + { + try + { + SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); + log.info( + "Sent: {}={}, partition={}, offset={}", + result.getProducerRecord().key(), + result.getProducerRecord().value(), + result.getRecordMetadata().partition(), + result.getRecordMetadata().offset()); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + }); + } + + @DisplayName("Await the expected state in the state-store") + @Test + public void testAwaitExpectedState() + { + await("Expexted state") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> TestData.assertExpectedState(top10StreamProcessor.getStore(STORE_NAME))); + } + + @DisplayName("Await the expected output messages") + @Test + @Disabled + public void testAwaitExpectedMessages() + { + await("Expexted messages") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> TestData.assertExpectedMessages(consumer.getReceivedMessages())); + } + + + static class Consumer + { + private final MultiValueMap received = new LinkedMultiValueMap<>(); + + @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) + public synchronized void receive( + @Header(KafkaHeaders.RECEIVED_KEY) User user, + @Payload RankingData ranking) + { + log.debug("Received message: {} -> {}", user, ranking); + received.add(user, Ranking.of(ranking.getEntries())); + } + + synchronized MultiValueMap getReceivedMessages() + { + return received; + } + } + + @TestConfiguration + static class Configuration + { + @Bean + Consumer consumer() + { + return new Consumer(); + } + + @Primary + @Bean + KeyValueBytesStoreSupplier inMemoryStoreSupplier() + { + return Stores.inMemoryKeyValueStore(STORE_NAME); + } + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java new file mode 100644 index 0000000..3feeea3 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java @@ -0,0 +1,110 @@ +package de.juplo.kafka.wordcount.top10; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import 
org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerde;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Stream;
+
+import static de.juplo.kafka.wordcount.top10.TestData.convertToMap;
+import static de.juplo.kafka.wordcount.top10.TestData.parseHeader;
+import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
+import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
+
+
+@Slf4j
+public class Top10StreamProcessorTopologyTest
+{
+  public final static String IN = "TEST-IN";
+  public final static String OUT = "TEST-OUT";
+  public final static String STORE_NAME = "TOPOLOGY-TEST";
+
+
+  TopologyTestDriver testDriver;
+  TestInputTopic in;
+  TestOutputTopic out;
+
+
+  @BeforeEach
+  public void setUp()
+  {
+    Topology topology = Top10StreamProcessor.buildTopology(
+        IN,
+        OUT,
+        Stores.inMemoryKeyValueStore(STORE_NAME));
+
+    Top10ApplicationConfiguration applicationConfiguration =
+        new Top10ApplicationConfiguration();
+    Properties streamProcessorProperties =
+        applicationConfiguration.streamProcessorProperties(new Top10ApplicationProperties());
+    Map propertyMap = convertToMap(streamProcessorProperties);
+
+    JsonSerde keySerde = new JsonSerde<>();
+    keySerde.configure(propertyMap, true);
+    JsonSerde valueSerde = new JsonSerde<>();
+    valueSerde.configure(propertyMap, false);
+
+    testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
+
+    in = testDriver.createInputTopic(
+        IN,
+        (JsonSerializer)keySerde.serializer(),
+        (JsonSerializer)valueSerde.serializer());
+
+    out = testDriver.createOutputTopic(
+        OUT,
+        (JsonDeserializer)keySerde.deserializer(),
+        (JsonDeserializer)valueSerde.deserializer());
+  }
+
+
+  @Test
+  public void test()
+  {
+    Stream
+        .of(TestData.INPUT_MESSAGES)
+        .forEach(kv -> in.pipeInput(
+            Key.of(kv.key.getUser(), kv.key.getWord()),
+            Entry.of(kv.value.getWord(), kv.value.getCounter())));
+
+    MultiValueMap receivedMessages = new LinkedMultiValueMap<>();
+    out
+        .readRecordsToList()
+        .forEach(record ->
+        {
+          log.debug(
+              "OUT: {} -> {}, {}, {}",
+              record.key(),
+              record.value(),
+              parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
+              parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
+          receivedMessages.add(record.key(), record.value());
+        });
+
+    TestData.assertExpectedMessages(receivedMessages);
+
+    KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME);
+    TestData.assertExpectedState(store);
+  }
+
+  @AfterEach
+  public void tearDown()
+  {
+    testDriver.close();
+  }
+}
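
The central change in this commit is the switch from hand-rolled ObjectMapper JSON strings to spring-kafka's JsonSerde with logical type names: the integration test's producer registers the tokens "word" and "counter" for its TestWord/TestCounter classes, while the processor maps the same tokens to its own Key and Entry classes via JsonDeserializer.TYPE_MAPPINGS. The following sketch is illustration only and not part of the patch (the topic name "some-topic" is made up); it shows how only the token travels in the __TypeId__ header and is resolved back to a local class on the consuming side.

package de.juplo.kafka.wordcount.top10;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.support.serializer.JsonSerde;

import java.util.Map;

// Illustration only, not part of the patch: round-trip of an Entry through the
// JSON serde using the same type mapping the processor configures.
public class TypeMappingSketch
{
  public static void main(String[] args)
  {
    Map<String, Object> config = Map.of(
        "spring.json.type.mapping",
        "word:" + Key.class.getName() + ",counter:" + Entry.class.getName(),
        "spring.json.trusted.packages",
        Top10Application.class.getPackageName());

    JsonSerde<Entry> serde = new JsonSerde<>(Entry.class);
    serde.configure(config, false);          // false = configure as a value serde

    Entry entry = Entry.of("Hallo", 1L);     // Entry.of(..) is package-scoped, hence this package
    Headers headers = new RecordHeaders();

    // The serializer records the mapped token "counter" in the __TypeId__ header ...
    byte[] json = serde.serializer().serialize("some-topic", headers, entry);
    // ... and the deserializer maps that token back to the local Entry class.
    Entry copy = serde.deserializer().deserialize("some-topic", headers, json);

    System.out.println(entry.equals(copy));  // true
  }
}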
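
The commit subject refers to the state-store test: the aggregate is now materialized in a named key-value store ("top10" from the default storeSupplier() bean, an in-memory "TEST-STORE" in the integration test) and exposed through Top10StreamProcessor.getStore(..). The sketch below is a hypothetical caller and not part of the patch; the surrounding class and the generic parameters on the store are assumptions, and streams.store(..) only succeeds once the KafkaStreams instance is in state RUNNING.

package de.juplo.kafka.wordcount.top10;

import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// Illustration only, not part of the patch: reading one user's current ranking
// through the getStore(..) accessor added in this commit.
public class RankingLookupSketch
{
  private final Top10StreamProcessor processor;

  public RankingLookupSketch(Top10StreamProcessor processor)
  {
    this.processor = processor;
  }

  public Ranking rankingFor(String username)
  {
    // "top10" is the store name used by the persistent storeSupplier() bean.
    ReadOnlyKeyValueStore<User, Ranking> store = processor.getStore("top10");
    return store.get(User.of(username));   // null, if no word of this user was counted yet
  }
}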
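
Ranking.add(..) now returns the ranking itself, which is what allows the topology to use it directly as the aggregator in aggregate(() -> new Ranking(), (user, entry, ranking) -> ranking.add(entry), ...). The sketch below is illustration only and not part of the patch; it shows the resulting contract: a higher count moves an existing word up, while a count lower than the one already ranked is rejected with an IllegalArgumentException.

package de.juplo.kafka.wordcount.top10;

// Illustration only, not part of the patch: the aggregation contract of Ranking.add(..).
public class RankingAddSketch
{
  public static void main(String[] args)
  {
    Ranking ranking = new Ranking()
        .add(Entry.of("Hallo", 1L))
        .add(Entry.of("Welt", 1L))
        .add(Entry.of("Welt", 2L));   // "Welt" overtakes "Hallo"

    System.out.println(ranking.getEntries()[0].getWord());   // Welt

    try
    {
      ranking.add(Entry.of("Welt", 1L));   // stale, lower count for an already ranked word
    }
    catch (IllegalArgumentException e)
    {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}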