From 20327965c95d3697a7ad7f78403f194ca7c981ad Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 20 Jun 2024 22:19:17 +0200 Subject: [PATCH] stats: 1.0.0 - Renamed packages and classes -- ALIGN --- pom.xml | 8 +-- .../de/juplo/kafka/wordcount/stats/Entry.java | 4 +- .../de/juplo/kafka/wordcount/stats/Key.java | 2 +- .../juplo/kafka/wordcount/stats/Ranking.java | 18 ++--- .../wordcount/stats/StatsApplication.java | 6 +- .../stats/StatsApplicationConfiguration.java | 20 +++--- .../stats/StatsApplicationProperties.java | 12 ++-- .../wordcount/stats/StatsStreamProcessor.java | 16 ++--- .../kafka/wordcount/stats/WindowedKey.java | 4 +- .../kafka/wordcount/in/InputCounter.java | 10 +-- .../kafka/wordcount/in/InputWindowedKey.java | 6 +- .../juplo/kafka/wordcount/out/TestEntry.java | 4 +- .../kafka/wordcount/out/TestRanking.java | 3 +- .../juplo/kafka/wordcount/out/TestUser.java | 2 +- .../kafka/wordcount/stats/RankingTest.java | 6 +- .../wordcount/stats/StatsApplicationIT.java | 36 +++++----- .../StatsStreamProcessorTopologyTest.java | 28 ++++---- .../juplo/kafka/wordcount/stats/TestData.java | 68 +++++++++---------- 18 files changed, 126 insertions(+), 127 deletions(-) diff --git a/pom.xml b/pom.xml index b30c4ea..f9d0df3 100644 --- a/pom.xml +++ b/pom.xml @@ -9,10 +9,10 @@ de.juplo.kafka.wordcount - top10 - 1.2.1 - Wordcount-Top-10 - Top-10 stream-processor of the multi-user wordcount-example + stats + 1.0.0 + Wordcount-Statistics + Statistics stream-processor of the multi-word wordcount-example 0.33.0 diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/Entry.java b/src/main/java/de/juplo/kafka/wordcount/stats/Entry.java index b25fc07..18fea0c 100644 --- a/src/main/java/de/juplo/kafka/wordcount/stats/Entry.java +++ b/src/main/java/de/juplo/kafka/wordcount/stats/Entry.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.wordcount.top10; +package de.juplo.kafka.wordcount.stats; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import lombok.AccessLevel; @@ -15,6 +15,6 @@ import lombok.NoArgsConstructor; @JsonIgnoreProperties(ignoreUnknown = true) public class Entry { - private String word; + private String key; private Long counter; } diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/Key.java b/src/main/java/de/juplo/kafka/wordcount/stats/Key.java index ffac8ea..d79ca51 100644 --- a/src/main/java/de/juplo/kafka/wordcount/stats/Key.java +++ b/src/main/java/de/juplo/kafka/wordcount/stats/Key.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.wordcount.top10; +package de.juplo.kafka.wordcount.stats; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import lombok.*; diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/Ranking.java b/src/main/java/de/juplo/kafka/wordcount/stats/Ranking.java index 4f56c18..25319d1 100644 --- a/src/main/java/de/juplo/kafka/wordcount/stats/Ranking.java +++ b/src/main/java/de/juplo/kafka/wordcount/stats/Ranking.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.wordcount.top10; +package de.juplo.kafka.wordcount.stats; import lombok.*; @@ -49,7 +49,7 @@ public class Ranking for (int j = i+1; j < list.size(); j++) { entry = list.get(j); - if(entry.getWord().equals(newEntry.getWord())) + if(entry.getKey().equals(newEntry.getKey())) { list.remove(j); break; @@ -63,7 +63,7 @@ public class Ranking return this; } - if (entry.getWord().equals(newEntry.getWord())) + if (entry.getKey().equals(newEntry.getKey())) oldPosition = i; } @@ -93,12 +93,12 @@ public class Ranking { Entry entry = this.entries[i]; - if (seenWords.contains(entry.getWord())) - throw new IllegalArgumentException("Invalid Ranking: Multiple occurrences of word -> " + entry.getWord()); + if (seenWords.contains(entry.getKey())) + throw new IllegalArgumentException("Invalid Ranking: Multiple occurrences of word -> " + entry.getKey()); if (entry.getCounter() > lowesCounting) throw new IllegalArgumentException("Invalid Ranking: Entries are not sorted correctly"); - seenWords.add(entry.getWord()); + seenWords.add(entry.getKey()); lowesCounting = entry.getCounter(); } @@ -128,13 +128,13 @@ public class Ranking Set otherWordsWithCurrentCount = new HashSet<>(); Entry myEntry = entries[i]; long currentCount = myEntry.getCounter(); - myWordsWithCurrentCount.add(myEntry.getWord()); + myWordsWithCurrentCount.add(myEntry.getKey()); while (true) { Entry otherEntry = other.entries[i]; if (otherEntry.getCounter() != currentCount) return false; - otherWordsWithCurrentCount.add(otherEntry.getWord()); + otherWordsWithCurrentCount.add(otherEntry.getKey()); if (++i >= entries.length) return myWordsWithCurrentCount.equals(otherWordsWithCurrentCount); myEntry = entries[i]; @@ -146,7 +146,7 @@ public class Ranking myWordsWithCurrentCount.clear(); otherWordsWithCurrentCount.clear(); } - myWordsWithCurrentCount.add(myEntry.getWord()); + myWordsWithCurrentCount.add(myEntry.getKey()); } } diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplication.java b/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplication.java index 5c14ae7..54ebae4 100644 --- a/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplication.java @@ -1,14 +1,14 @@ -package de.juplo.kafka.wordcount.top10; +package de.juplo.kafka.wordcount.stats; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication -public class Top10Application +public class StatsApplication { public static void main(String[] args) { - SpringApplication.run(Top10Application.class, args); + SpringApplication.run(StatsApplication.class, args); } } diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationConfiguration.java index 255f0e4..53ea1c5 100644 --- a/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationConfiguration.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.wordcount.top10; +package de.juplo.kafka.wordcount.stats; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -16,17 +16,17 @@ import org.springframework.kafka.support.serializer.JsonSerde; import java.util.Properties; import java.util.concurrent.CompletableFuture; -import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME; +import static de.juplo.kafka.wordcount.stats.StatsStreamProcessor.STORE_NAME; import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; @Configuration -@EnableConfigurationProperties(Top10ApplicationProperties.class) +@EnableConfigurationProperties(StatsApplicationProperties.class) @Slf4j -public class Top10ApplicationConfiguration +public class StatsApplicationConfiguration { @Bean - public Properties streamProcessorProperties(Top10ApplicationProperties properties) + public Properties streamProcessorProperties(StatsApplicationProperties properties) { Properties props = new Properties(); @@ -51,26 +51,26 @@ public class Top10ApplicationConfiguration props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); - props.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName()); + props.put(JsonDeserializer.KEY_DEFAULT_TYPE, WindowedKey.class.getName()); props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName()); props.put( JsonDeserializer.TYPE_MAPPINGS, "word:" + Key.class.getName() + "," + "counter:" + Entry.class.getName() + "," + - "user:" + User.class.getName() + "," + + "user:" + WindowedKey.class.getName() + "," + "ranking:" + Ranking.class.getName()); return props; } @Bean(initMethod = "start", destroyMethod = "stop") - public Top10StreamProcessor streamProcessor( - Top10ApplicationProperties applicationProperties, + public StatsStreamProcessor streamProcessor( + StatsApplicationProperties applicationProperties, Properties streamProcessorProperties, KeyValueBytesStoreSupplier storeSupplier, ConfigurableApplicationContext context) { - Top10StreamProcessor streamProcessor = new Top10StreamProcessor( + StatsStreamProcessor streamProcessor = new StatsStreamProcessor( applicationProperties.getInputTopic(), applicationProperties.getOutputTopic(), streamProcessorProperties, diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationProperties.java index d3bb236..841b290 100644 --- a/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationProperties.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.wordcount.top10; +package de.juplo.kafka.wordcount.stats; import lombok.Getter; @@ -7,16 +7,16 @@ import lombok.ToString; import org.springframework.boot.context.properties.ConfigurationProperties; -@ConfigurationProperties("juplo.wordcount.top10") +@ConfigurationProperties("juplo.wordcount.stats") @Getter @Setter @ToString -public class Top10ApplicationProperties +public class StatsApplicationProperties { private String bootstrapServer = "localhost:9092"; - private String applicationId = "top10"; - private String inputTopic = "countings"; - private String outputTopic = "top10"; + private String applicationId = "stats"; + private String inputTopic = "stats"; + private String outputTopic = "results"; private Integer commitInterval; private Integer cacheMaxBytes; } diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessor.java index 70ead87..15c08ee 100644 --- a/src/main/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessor.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.wordcount.top10; +package de.juplo.kafka.wordcount.stats; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.*; @@ -11,20 +11,20 @@ import java.util.Properties; @Slf4j -public class Top10StreamProcessor +public class StatsStreamProcessor { - public static final String STORE_NAME= "top10"; + public static final String STORE_NAME= "stats"; public final KafkaStreams streams; - public Top10StreamProcessor( + public StatsStreamProcessor( String inputTopic, String outputTopic, Properties props, KeyValueBytesStoreSupplier storeSupplier) { - Topology topology = Top10StreamProcessor.buildTopology( + Topology topology = StatsStreamProcessor.buildTopology( inputTopic, outputTopic, storeSupplier); @@ -41,11 +41,11 @@ public class Top10StreamProcessor builder .stream(inputTopic) - .map((key, entry) -> new KeyValue<>(User.of(key.getUser()), entry)) + .map((key, entry) -> new KeyValue<>(WindowedKey.of(key.getUser()), entry)) .groupByKey() .aggregate( () -> new Ranking(), - (user, entry, ranking) -> ranking.add(entry), + (windowedKey, entry, ranking) -> ranking.add(entry), Materialized.as(storeSupplier)) .toStream() .to(outputTopic); @@ -56,7 +56,7 @@ public class Top10StreamProcessor return topology; } - ReadOnlyKeyValueStore getStore() + ReadOnlyKeyValueStore getStore() { return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore())); } diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/WindowedKey.java b/src/main/java/de/juplo/kafka/wordcount/stats/WindowedKey.java index 53c258d..9b77cac 100644 --- a/src/main/java/de/juplo/kafka/wordcount/stats/WindowedKey.java +++ b/src/main/java/de/juplo/kafka/wordcount/stats/WindowedKey.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.wordcount.top10; +package de.juplo.kafka.wordcount.stats; import lombok.AllArgsConstructor; import lombok.Data; @@ -8,7 +8,7 @@ import lombok.NoArgsConstructor; @AllArgsConstructor(staticName = "of") @NoArgsConstructor @Data -public class User +public class WindowedKey { String user; } diff --git a/src/test/java/de/juplo/kafka/wordcount/in/InputCounter.java b/src/test/java/de/juplo/kafka/wordcount/in/InputCounter.java index d98ae64..ada66d3 100644 --- a/src/test/java/de/juplo/kafka/wordcount/in/InputCounter.java +++ b/src/test/java/de/juplo/kafka/wordcount/in/InputCounter.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.wordcount.counter; +package de.juplo.kafka.wordcount.in; import lombok.AllArgsConstructor; import lombok.Data; @@ -8,14 +8,14 @@ import lombok.NoArgsConstructor; @Data @NoArgsConstructor @AllArgsConstructor(staticName = "of") -public class TestCounter +public class InputCounter { String user; - String word; + String key; long counter; - public static TestCounter of(TestWord word, long counter) + public static InputCounter of(InputWindowedKey word, long counter) { - return new TestCounter(word.getUser(), word.getWord(), counter); + return new InputCounter(word.getUser(), word.getKey(), counter); } } diff --git a/src/test/java/de/juplo/kafka/wordcount/in/InputWindowedKey.java b/src/test/java/de/juplo/kafka/wordcount/in/InputWindowedKey.java index 8008e12..255d206 100644 --- a/src/test/java/de/juplo/kafka/wordcount/in/InputWindowedKey.java +++ b/src/test/java/de/juplo/kafka/wordcount/in/InputWindowedKey.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.wordcount.counter; +package de.juplo.kafka.wordcount.in; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import lombok.AllArgsConstructor; @@ -10,8 +10,8 @@ import lombok.NoArgsConstructor; @NoArgsConstructor @Data @JsonIgnoreProperties(ignoreUnknown = true) -public class TestWord +public class InputWindowedKey { private String user; - private String word; + private String key; } diff --git a/src/test/java/de/juplo/kafka/wordcount/out/TestEntry.java b/src/test/java/de/juplo/kafka/wordcount/out/TestEntry.java index a5152e6..f7f5679 100644 --- a/src/test/java/de/juplo/kafka/wordcount/out/TestEntry.java +++ b/src/test/java/de/juplo/kafka/wordcount/out/TestEntry.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.wordcount.query; +package de.juplo.kafka.wordcount.out; import lombok.AllArgsConstructor; import lombok.Data; @@ -10,6 +10,6 @@ import lombok.NoArgsConstructor; @Data public class TestEntry { - String word; + String key; long counter; } diff --git a/src/test/java/de/juplo/kafka/wordcount/out/TestRanking.java b/src/test/java/de/juplo/kafka/wordcount/out/TestRanking.java index efad48b..62ed323 100644 --- a/src/test/java/de/juplo/kafka/wordcount/out/TestRanking.java +++ b/src/test/java/de/juplo/kafka/wordcount/out/TestRanking.java @@ -1,6 +1,5 @@ -package de.juplo.kafka.wordcount.query; +package de.juplo.kafka.wordcount.out; -import de.juplo.kafka.wordcount.top10.Entry; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Data; diff --git a/src/test/java/de/juplo/kafka/wordcount/out/TestUser.java b/src/test/java/de/juplo/kafka/wordcount/out/TestUser.java index 53a5992..3affeab 100644 --- a/src/test/java/de/juplo/kafka/wordcount/out/TestUser.java +++ b/src/test/java/de/juplo/kafka/wordcount/out/TestUser.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.wordcount.query; +package de.juplo.kafka.wordcount.out; import lombok.AllArgsConstructor; import lombok.Data; diff --git a/src/test/java/de/juplo/kafka/wordcount/stats/RankingTest.java b/src/test/java/de/juplo/kafka/wordcount/stats/RankingTest.java index 26749e9..5599403 100644 --- a/src/test/java/de/juplo/kafka/wordcount/stats/RankingTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/stats/RankingTest.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.wordcount.top10; +package de.juplo.kafka.wordcount.stats; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -108,7 +108,7 @@ public class RankingTest Stream.of(highestEntry), VALID_RANKINGS[0] .stream() - .filter(entry -> !entry.getWord().equals(word))) + .filter(entry -> !entry.getKey().equals(word))) .toList(); assertThat(ranking.getEntries()).containsExactlyElementsOf(expectedEntries); } @@ -134,7 +134,7 @@ public class RankingTest Ranking ranking = Ranking.of(toArray(entryList)); entryList.forEach(entry -> assertThatExceptionOfType(IllegalArgumentException.class) - .isThrownBy(() -> ranking.add(Entry.of(entry.getWord(), entry.getCounter() - 1)))); + .isThrownBy(() -> ranking.add(Entry.of(entry.getKey(), entry.getCounter() - 1)))); } @DisplayName("Identical rankings are considered equal") diff --git a/src/test/java/de/juplo/kafka/wordcount/stats/StatsApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/stats/StatsApplicationIT.java index f5ef236..08d0eb4 100644 --- a/src/test/java/de/juplo/kafka/wordcount/stats/StatsApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/stats/StatsApplicationIT.java @@ -1,9 +1,9 @@ -package de.juplo.kafka.wordcount.top10; +package de.juplo.kafka.wordcount.stats; -import de.juplo.kafka.wordcount.counter.TestCounter; -import de.juplo.kafka.wordcount.counter.TestWord; -import de.juplo.kafka.wordcount.query.TestRanking; -import de.juplo.kafka.wordcount.query.TestUser; +import de.juplo.kafka.wordcount.in.InputCounter; +import de.juplo.kafka.wordcount.in.InputWindowedKey; +import de.juplo.kafka.wordcount.out.TestRanking; +import de.juplo.kafka.wordcount.out.TestUser; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.Stores; @@ -28,7 +28,7 @@ import org.springframework.util.MultiValueMap; import java.time.Duration; -import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME; +import static de.juplo.kafka.wordcount.stats.StatsStreamProcessor.STORE_NAME; import static org.awaitility.Awaitility.await; @@ -36,23 +36,23 @@ import static org.awaitility.Awaitility.await; properties = { "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer", "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", - "spring.kafka.producer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.counter.TestWord,counter:de.juplo.kafka.wordcount.counter.TestCounter", + "spring.kafka.producer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.in.InputWindowedKey,counter:de.juplo.kafka.wordcount.in.InputCounter", "spring.kafka.consumer.auto-offset-reset=earliest", "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", - "spring.kafka.consumer.properties.spring.json.type.mapping=user:de.juplo.kafka.wordcount.query.TestUser,ranking:de.juplo.kafka.wordcount.query.TestRanking", + "spring.kafka.consumer.properties.spring.json.type.mapping=user:de.juplo.kafka.wordcount.out.TestUser,ranking:de.juplo.kafka.wordcount.out.TestRanking", "logging.level.root=WARN", "logging.level.de.juplo=DEBUG", "logging.level.org.apache.kafka.clients=INFO", "logging.level.org.apache.kafka.streams=INFO", - "juplo.wordcount.top10.bootstrap-server=${spring.embedded.kafka.brokers}", - "juplo.wordcount.top10.commit-interval=100", - "juplo.wordcount.top10.cacheMaxBytes=0", - "juplo.wordcount.top10.input-topic=" + Top10ApplicationIT.TOPIC_IN, - "juplo.wordcount.top10.output-topic=" + Top10ApplicationIT.TOPIC_OUT }) -@EmbeddedKafka(topics = { Top10ApplicationIT.TOPIC_IN, Top10ApplicationIT.TOPIC_OUT }) + "juplo.wordcount.stats.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.wordcount.stats.commit-interval=100", + "juplo.wordcount.stats.cacheMaxBytes=0", + "juplo.wordcount.stats.input-topic=" + StatsApplicationIT.TOPIC_IN, + "juplo.wordcount.stats.output-topic=" + StatsApplicationIT.TOPIC_OUT }) +@EmbeddedKafka(topics = { StatsApplicationIT.TOPIC_IN, StatsApplicationIT.TOPIC_OUT }) @Slf4j -public class Top10ApplicationIT +public class StatsApplicationIT { public static final String TOPIC_IN = "in"; public static final String TOPIC_OUT = "out"; @@ -60,12 +60,12 @@ public class Top10ApplicationIT @Autowired Consumer consumer; @Autowired - Top10StreamProcessor streamProcessor; + StatsStreamProcessor streamProcessor; @BeforeAll public static void testSendMessage( - @Autowired KafkaTemplate kafkaTemplate) + @Autowired KafkaTemplate kafkaTemplate) { TestData .getInputMessages() @@ -73,7 +73,7 @@ public class Top10ApplicationIT { try { - SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); + SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); log.info( "Sent: {}={}, partition={}, offset={}", result.getProducerRecord().key(), diff --git a/src/test/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessorTopologyTest.java index 90d8e4c..dad85df 100644 --- a/src/test/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessorTopologyTest.java @@ -1,9 +1,9 @@ -package de.juplo.kafka.wordcount.top10; +package de.juplo.kafka.wordcount.stats; -import de.juplo.kafka.wordcount.counter.TestCounter; -import de.juplo.kafka.wordcount.counter.TestWord; -import de.juplo.kafka.wordcount.query.TestRanking; -import de.juplo.kafka.wordcount.query.TestUser; +import de.juplo.kafka.wordcount.in.InputCounter; +import de.juplo.kafka.wordcount.in.InputWindowedKey; +import de.juplo.kafka.wordcount.out.TestRanking; +import de.juplo.kafka.wordcount.out.TestUser; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.TestOutputTopic; @@ -21,11 +21,11 @@ import org.springframework.util.MultiValueMap; import java.util.Map; -import static de.juplo.kafka.wordcount.top10.Top10ApplicationConfiguration.serializationConfig; +import static de.juplo.kafka.wordcount.stats.StatsApplicationConfiguration.serializationConfig; @Slf4j -public class Top10StreamProcessorTopologyTest +public class StatsStreamProcessorTopologyTest { public static final String IN = "TEST-IN"; public static final String OUT = "TEST-OUT"; @@ -33,14 +33,14 @@ public class Top10StreamProcessorTopologyTest TopologyTestDriver testDriver; - TestInputTopic in; + TestInputTopic in; TestOutputTopic out; @BeforeEach public void setUp() { - Topology topology = Top10StreamProcessor.buildTopology( + Topology topology = StatsStreamProcessor.buildTopology( IN, OUT, Stores.inMemoryKeyValueStore(STORE_NAME)); @@ -49,8 +49,8 @@ public class Top10StreamProcessorTopologyTest in = testDriver.createInputTopic( IN, - jsonSerializer(TestWord.class, true), - jsonSerializer(TestCounter.class,false)); + jsonSerializer(InputWindowedKey.class, true), + jsonSerializer(InputCounter.class,false)); out = testDriver.createOutputTopic( OUT, @@ -81,7 +81,7 @@ public class Top10StreamProcessorTopologyTest TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages); TestData.assertExpectedLastMessagesForUsers(receivedMessages); - KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); + KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); TestData.assertExpectedState(store); } @@ -97,8 +97,8 @@ public class Top10StreamProcessorTopologyTest jsonSerializer.configure( Map.of( JsonSerializer.TYPE_MAPPINGS, - "word:" + TestWord.class.getName() + "," + - "counter:" + TestCounter.class.getName()), + "word:" + InputWindowedKey.class.getName() + "," + + "counter:" + InputCounter.class.getName()), isKey); return jsonSerializer; } diff --git a/src/test/java/de/juplo/kafka/wordcount/stats/TestData.java b/src/test/java/de/juplo/kafka/wordcount/stats/TestData.java index 7a3a27e..c884b06 100644 --- a/src/test/java/de/juplo/kafka/wordcount/stats/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/stats/TestData.java @@ -1,10 +1,10 @@ -package de.juplo.kafka.wordcount.top10; +package de.juplo.kafka.wordcount.stats; -import de.juplo.kafka.wordcount.counter.TestCounter; -import de.juplo.kafka.wordcount.counter.TestWord; -import de.juplo.kafka.wordcount.query.TestEntry; -import de.juplo.kafka.wordcount.query.TestRanking; -import de.juplo.kafka.wordcount.query.TestUser; +import de.juplo.kafka.wordcount.in.InputCounter; +import de.juplo.kafka.wordcount.in.InputWindowedKey; +import de.juplo.kafka.wordcount.out.TestEntry; +import de.juplo.kafka.wordcount.out.TestRanking; +import de.juplo.kafka.wordcount.out.TestUser; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.springframework.util.LinkedMultiValueMap; @@ -21,46 +21,46 @@ class TestData static final TestUser PETER = TestUser.of("peter"); static final TestUser KLAUS = TestUser.of("klaus"); - static final Stream> getInputMessages() + static final Stream> getInputMessages() { return Stream.of(INPUT_MESSAGES); } - private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] + private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] { new KeyValue<>( - TestWord.of(PETER.getUser(),"Hallo"), - TestCounter.of(PETER.getUser(),"Hallo",1)), + InputWindowedKey.of(PETER.getUser(),"Hallo"), + InputCounter.of(PETER.getUser(),"Hallo",1)), new KeyValue<>( - TestWord.of(KLAUS.getUser(),"Müsch"), - TestCounter.of(KLAUS.getUser(),"Müsch",1)), + InputWindowedKey.of(KLAUS.getUser(),"Müsch"), + InputCounter.of(KLAUS.getUser(),"Müsch",1)), new KeyValue<>( - TestWord.of(PETER.getUser(),"Welt"), - TestCounter.of(PETER.getUser(),"Welt",1)), + InputWindowedKey.of(PETER.getUser(),"Welt"), + InputCounter.of(PETER.getUser(),"Welt",1)), new KeyValue<>( - TestWord.of(KLAUS.getUser(),"Müsch"), - TestCounter.of(KLAUS.getUser(),"Müsch",2)), + InputWindowedKey.of(KLAUS.getUser(),"Müsch"), + InputCounter.of(KLAUS.getUser(),"Müsch",2)), new KeyValue<>( - TestWord.of(KLAUS.getUser(),"s"), - TestCounter.of(KLAUS.getUser(),"s",1)), + InputWindowedKey.of(KLAUS.getUser(),"s"), + InputCounter.of(KLAUS.getUser(),"s",1)), new KeyValue<>( - TestWord.of(PETER.getUser(),"Boäh"), - TestCounter.of(PETER.getUser(),"Boäh",1)), + InputWindowedKey.of(PETER.getUser(),"Boäh"), + InputCounter.of(PETER.getUser(),"Boäh",1)), new KeyValue<>( - TestWord.of(PETER.getUser(),"Welt"), - TestCounter.of(PETER.getUser(),"Welt",2)), + InputWindowedKey.of(PETER.getUser(),"Welt"), + InputCounter.of(PETER.getUser(),"Welt",2)), new KeyValue<>( - TestWord.of(PETER.getUser(),"Boäh"), - TestCounter.of(PETER.getUser(),"Boäh",2)), + InputWindowedKey.of(PETER.getUser(),"Boäh"), + InputCounter.of(PETER.getUser(),"Boäh",2)), new KeyValue<>( - TestWord.of(KLAUS.getUser(),"s"), - TestCounter.of(KLAUS.getUser(),"s",2)), + InputWindowedKey.of(KLAUS.getUser(),"s"), + InputCounter.of(KLAUS.getUser(),"s",2)), new KeyValue<>( - TestWord.of(PETER.getUser(),"Boäh"), - TestCounter.of(PETER.getUser(),"Boäh",3)), + InputWindowedKey.of(PETER.getUser(),"Boäh"), + InputCounter.of(PETER.getUser(),"Boäh",3)), new KeyValue<>( - TestWord.of(KLAUS.getUser(),"s"), - TestCounter.of(KLAUS.getUser(),"s",3)), + InputWindowedKey.of(KLAUS.getUser(),"s"), + InputCounter.of(KLAUS.getUser(),"s",3)), }; static void assertExpectedMessages(MultiValueMap receivedMessages) @@ -71,15 +71,15 @@ class TestData .containsExactlyElementsOf(rankings)); } - static void assertExpectedState(ReadOnlyKeyValueStore store) + static void assertExpectedState(ReadOnlyKeyValueStore store) { assertRankingEqualsRankingFromLastMessage(PETER, store.get(userOf(PETER))); assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(userOf(KLAUS))); } - private static User userOf(TestUser user) + private static WindowedKey userOf(TestUser user) { - return User.of(user.getUser()); + return WindowedKey.of(user.getUser()); } static void assertExpectedNumberOfMessagesForUsers(MultiValueMap receivedMessages) @@ -113,7 +113,7 @@ class TestData return Arrays .stream(entries) .map(entry -> TestEntry.of( - entry.getWord(), + entry.getKey(), entry.getCounter() == null ? -1l : entry.getCounter())) -- 2.20.1