From: Kai Moritz Date: Sat, 25 May 2024 14:59:42 +0000 (+0200) Subject: top10: 1.1.2 - (RED) Added IT for the expectated processing X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=49d5464daa86555fe62d5ac7e10c390134eccb4f;p=demos%2Fkafka%2Fwordcount top10: 1.1.2 - (RED) Added IT for the expectated processing --- diff --git a/pom.xml b/pom.xml index 6749738..86b4290 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount top10 - 1.1.1 + 1.1.2 Wordcount-Top-10 Top-10 stream-processor of the multi-user wordcount-example @@ -50,15 +50,34 @@ lombok true + org.springframework.boot spring-boot-starter-test test + + org.springframework.kafka + spring-kafka-test + test + + + org.awaitility + awaitility + test + + + org.assertj + assertj-core + test + + + maven-failsafe-plugin + org.springframework.boot spring-boot-maven-plugin diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Counter.java b/src/test/java/de/juplo/kafka/wordcount/top10/Counter.java new file mode 100644 index 0000000..8364931 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Counter.java @@ -0,0 +1,20 @@ +package de.juplo.kafka.wordcount.top10; + + +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@NoArgsConstructor +@AllArgsConstructor( + staticName = "of", + access = AccessLevel.PACKAGE) +@Data +public class Counter +{ + String user; + String word; + long counter; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java new file mode 100644 index 0000000..83a8a34 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java @@ -0,0 +1,157 @@ +package de.juplo.kafka.wordcount.top10; + +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.streams.KeyValue; + +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.function.BiConsumer; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + + +class TestData +{ + static void writeInputData(BiConsumer consumer) + { + consumer.accept( + Key.of("peter","Hallo"), + Counter.of("peter","Hallo",1)); + consumer.accept( + Key.of("klaus","Müsch"), + Counter.of("klaus","Müsch",1)); + consumer.accept( + Key.of("peter","Welt"), + Counter.of("peter","Welt",1)); + consumer.accept( + Key.of("klaus","Müsch"), + Counter.of("klaus","Müsch",2)); + consumer.accept( + Key.of("klaus","s"), + Counter.of("klaus","s",1)); + consumer.accept( + Key.of("peter","Boäh"), + Counter.of("peter","Boäh",1)); + consumer.accept( + Key.of("peter","Welt"), + Counter.of("peter","Welt",2)); + consumer.accept( + Key.of("peter","Boäh"), + Counter.of("peter","Boäh",2)); + consumer.accept( + Key.of("klaus","s"), + Counter.of("klaus","s",2)); + consumer.accept( + Key.of("peter","Boäh"), + Counter.of("peter","Boäh",3)); + consumer.accept( + Key.of("klaus","s"), + Counter.of("klaus","s",3)); + } + + static void assertExpectedResult(List> receivedMessages) + { + assertThat(receivedMessages).hasSize(11); + assertThat(receivedMessages).containsSubsequence( + expectedMessages[0], + expectedMessages[2], + expectedMessages[5], + expectedMessages[6], + expectedMessages[7]); // peter + + assertThat(receivedMessages).containsSubsequence( + expectedMessages[1], + expectedMessages[3], + expectedMessages[4], + expectedMessages[8], + expectedMessages[10]); // klaus + } + + static KeyValue[] expectedMessages = new KeyValue[] + { + KeyValue.pair( // 0 + "peter", + Ranking.of( + Entry.of("Hallo", 1l))), + KeyValue.pair( // 1 + "klaus", + Ranking.of( + Entry.of("Müsch", 1l))), + KeyValue.pair( // 2 + "peter", + Ranking.of( + Entry.of("Hallo", 1l), + Entry.of("Welt", 1l))), + KeyValue.pair( // 3 + "klaus", + Ranking.of( + Entry.of("Müsch", 2l))), + KeyValue.pair( // 4 + "klaus", + Ranking.of( + Entry.of("Müsch", 2l), + Entry.of("s", 1l))), + KeyValue.pair( // 5 + "peter", + Ranking.of( + Entry.of("Hallo", 1l), + Entry.of("Welt", 1l), + Entry.of("Boäh", 1l))), + KeyValue.pair( // 6 + "peter", + Ranking.of( + Entry.of("Welt", 2l), + Entry.of("Hallo", 1l), + Entry.of("Boäh", 1l))), + KeyValue.pair( // 7 + "peter", + Ranking.of( + Entry.of("Welt", 2l), + Entry.of("Boäh", 2l), + Entry.of("Hallo", 1l))), + KeyValue.pair( // 8 + "klaus", + Ranking.of( + Entry.of("Müsch", 2l), + Entry.of("s", 2l))), + KeyValue.pair( // 9 + "peter", + Ranking.of( + Entry.of("Boäh", 3l), + Entry.of("Welt", 2l), + Entry.of("Hallo", 1l))), + KeyValue.pair( // 10 + "klaus", + Ranking.of( + Entry.of("s", 3l), + Entry.of("Müsch", 2l))), + }; + + static Map convertToMap(Properties properties) + { + return properties + .entrySet() + .stream() + .collect( + Collectors.toMap( + entry -> (String)entry.getKey(), + entry -> entry.getValue() + )); + } + + static String parseHeader(Headers headers, String key) + { + Header header = headers.lastHeader(key); + if (header == null) + { + return key + "=null"; + } + else + { + return key + "=" + new String(header.value()); + } + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java new file mode 100644 index 0000000..1e68219 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java @@ -0,0 +1,91 @@ +package de.juplo.kafka.wordcount.top10; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.streams.*; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.Test; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerde; +import org.springframework.kafka.support.serializer.JsonSerializer; + +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.function.BiConsumer; + +import static de.juplo.kafka.wordcount.top10.TestData.convertToMap; +import static de.juplo.kafka.wordcount.top10.TestData.parseHeader; +import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME; +import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME; + + +@Slf4j +public class Top10StreamProcessorTopologyTest +{ + public final static String IN = "TEST-IN"; + public final static String OUT = "TEST-OUT"; + + @Test + public void test() + { + Topology topology = Top10StreamProcessor.buildTopology(IN, OUT); + + Top10ApplicationConfiguration applicationConfiguriation = + new Top10ApplicationConfiguration(); + Properties streamProcessorProperties = + applicationConfiguriation.streamProcessorProperties(new Top10ApplicationProperties()); + Map propertyMap = convertToMap(streamProcessorProperties); + + JsonSerde keySerde = new JsonSerde<>(); + keySerde.configure(propertyMap, true); + JsonSerde valueSerde = new JsonSerde<>(); + valueSerde.configure(propertyMap, false); + + TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProcessorProperties); + + TestInputTopic in = testDriver.createInputTopic( + IN, + (JsonSerializer)keySerde.serializer(), + (JsonSerializer)valueSerde.serializer()); + + TestOutputTopic out = testDriver.createOutputTopic( + OUT, + (JsonDeserializer)keySerde.deserializer(), + (JsonDeserializer)valueSerde.deserializer()); + + TestData.writeInputData(new BiConsumer() + { + private Instant timestamp = Instant.now(); + + @Override + public void accept(Key key, Counter value) + { + in.pipeInput( + key, + Entry.of(value.getWord(), value.getCounter()), + timestamp); + timestamp = timestamp.plusMillis(500); + } + }); + + List> receivedMessages = out + .readRecordsToList() + .stream() + .map(record -> + { + log.debug( + "OUT: {} -> {}, {}, {}", + record.key(), + record.value(), + parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME), + parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME)); + return KeyValue.pair(record.key(), record.value()); + }) + .toList(); + + TestData.assertExpectedResult(receivedMessages); + + testDriver.close(); + } +}