From: Kai Moritz Date: Sat, 22 Jun 2024 14:09:46 +0000 (+0200) Subject: top10: 1.3.0 - Refined input JSON to match the new general stats-format X-Git-Tag: top10-1.3.0 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=5030aed19804a0c48f1968208e176657bdd147de;p=demos%2Fkafka%2Fwordcount top10: 1.3.0 - Refined input JSON to match the new general stats-format * Adapted the configuration to the changed type-mapping for the key. * Refined the class `Key`, that defines the JSON for the input key. ** Added attribute `type` with value of type `enum StatsType`. ** Renamed attribute `user` to `channel`. ** Renamed attribute `word` to `key`. * Refined the class `Entry`, that defines the JSON for the input value. ** Renamed attribute `word` to `key`. * Adapted test-classes and -cases accordingly. --- diff --git a/pom.xml b/pom.xml index b30c4ea..e5cd268 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount top10 - 1.2.1 + 1.3.0 Wordcount-Top-10 Top-10 stream-processor of the multi-user wordcount-example diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java b/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java index b25fc07..7d00500 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java @@ -15,6 +15,6 @@ import lombok.NoArgsConstructor; @JsonIgnoreProperties(ignoreUnknown = true) public class Entry { - private String word; + private String key; private Long counter; } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Key.java b/src/main/java/de/juplo/kafka/wordcount/top10/Key.java index ffac8ea..aaf016c 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Key.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Key.java @@ -12,6 +12,7 @@ import lombok.*; @JsonIgnoreProperties(ignoreUnknown = true) public class Key { - private String user; - private String word; + private StatsType type; + private String channel; + private String key; } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java b/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java index 4f56c18..279716a 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java @@ -49,7 +49,7 @@ public class Ranking for (int j = i+1; j < list.size(); j++) { entry = list.get(j); - if(entry.getWord().equals(newEntry.getWord())) + if(entry.getKey().equals(newEntry.getKey())) { list.remove(j); break; @@ -63,7 +63,7 @@ public class Ranking return this; } - if (entry.getWord().equals(newEntry.getWord())) + if (entry.getKey().equals(newEntry.getKey())) oldPosition = i; } @@ -93,12 +93,12 @@ public class Ranking { Entry entry = this.entries[i]; - if (seenWords.contains(entry.getWord())) - throw new IllegalArgumentException("Invalid Ranking: Multiple occurrences of word -> " + entry.getWord()); + if (seenWords.contains(entry.getKey())) + throw new IllegalArgumentException("Invalid Ranking: Multiple occurrences of word -> " + entry.getKey()); if (entry.getCounter() > lowesCounting) throw new IllegalArgumentException("Invalid Ranking: Entries are not sorted correctly"); - seenWords.add(entry.getWord()); + seenWords.add(entry.getKey()); lowesCounting = entry.getCounter(); } @@ -128,13 +128,13 @@ public class Ranking Set otherWordsWithCurrentCount = new HashSet<>(); Entry myEntry = entries[i]; long currentCount = myEntry.getCounter(); - myWordsWithCurrentCount.add(myEntry.getWord()); + myWordsWithCurrentCount.add(myEntry.getKey()); while (true) { Entry otherEntry = other.entries[i]; if (otherEntry.getCounter() != currentCount) return false; - otherWordsWithCurrentCount.add(otherEntry.getWord()); + otherWordsWithCurrentCount.add(otherEntry.getKey()); if (++i >= entries.length) return myWordsWithCurrentCount.equals(otherWordsWithCurrentCount); myEntry = entries[i]; @@ -146,7 +146,7 @@ public class Ranking myWordsWithCurrentCount.clear(); otherWordsWithCurrentCount.clear(); } - myWordsWithCurrentCount.add(myEntry.getWord()); + myWordsWithCurrentCount.add(myEntry.getKey()); } } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/StatsType.java b/src/main/java/de/juplo/kafka/wordcount/top10/StatsType.java new file mode 100644 index 0000000..b1b8f9b --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/top10/StatsType.java @@ -0,0 +1,7 @@ +package de.juplo.kafka.wordcount.top10; + +enum StatsType +{ + COUNTER, + POPULAR +} diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java index 255f0e4..57e5a47 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java @@ -55,7 +55,7 @@ public class Top10ApplicationConfiguration props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName()); props.put( JsonDeserializer.TYPE_MAPPINGS, - "word:" + Key.class.getName() + "," + + "key:" + Key.class.getName() + "," + "counter:" + Entry.class.getName() + "," + "user:" + User.class.getName() + "," + "ranking:" + Ranking.class.getName()); diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java index 70ead87..907c7ff 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java @@ -41,7 +41,7 @@ public class Top10StreamProcessor builder .stream(inputTopic) - .map((key, entry) -> new KeyValue<>(User.of(key.getUser()), entry)) + .map((key, entry) -> new KeyValue<>(User.of(key.getChannel()), entry)) .groupByKey() .aggregate( () -> new Ranking(), diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestCounter.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestCounter.java index d98ae64..b78c429 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/TestCounter.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestCounter.java @@ -10,12 +10,6 @@ import lombok.NoArgsConstructor; @AllArgsConstructor(staticName = "of") public class TestCounter { - String user; - String word; + String key; long counter; - - public static TestCounter of(TestWord word, long counter) - { - return new TestCounter(word.getUser(), word.getWord(), counter); - } } diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java index 8008e12..00c1af7 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java @@ -12,6 +12,7 @@ import lombok.NoArgsConstructor; @JsonIgnoreProperties(ignoreUnknown = true) public class TestWord { - private String user; - private String word; + private String type; + private String channel; + private String key; } diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestEntry.java b/src/test/java/de/juplo/kafka/wordcount/query/TestEntry.java index a5152e6..8019da9 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/TestEntry.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/TestEntry.java @@ -10,6 +10,6 @@ import lombok.NoArgsConstructor; @Data public class TestEntry { - String word; + String key; long counter; } diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/RankingTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/RankingTest.java index 26749e9..0f36860 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/RankingTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/RankingTest.java @@ -108,7 +108,7 @@ public class RankingTest Stream.of(highestEntry), VALID_RANKINGS[0] .stream() - .filter(entry -> !entry.getWord().equals(word))) + .filter(entry -> !entry.getKey().equals(word))) .toList(); assertThat(ranking.getEntries()).containsExactlyElementsOf(expectedEntries); } @@ -134,7 +134,7 @@ public class RankingTest Ranking ranking = Ranking.of(toArray(entryList)); entryList.forEach(entry -> assertThatExceptionOfType(IllegalArgumentException.class) - .isThrownBy(() -> ranking.add(Entry.of(entry.getWord(), entry.getCounter() - 1)))); + .isThrownBy(() -> ranking.add(Entry.of(entry.getKey(), entry.getCounter() - 1)))); } @DisplayName("Identical rankings are considered equal") diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java index 7a3a27e..4fb229b 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java @@ -18,6 +18,8 @@ import static org.assertj.core.api.Assertions.assertThat; class TestData { + static final String TYPE_COUNTER = "COUNTER"; + static final TestUser PETER = TestUser.of("peter"); static final TestUser KLAUS = TestUser.of("klaus"); @@ -29,38 +31,38 @@ class TestData private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] { new KeyValue<>( - TestWord.of(PETER.getUser(),"Hallo"), - TestCounter.of(PETER.getUser(),"Hallo",1)), + TestWord.of(TYPE_COUNTER, PETER.getUser(),"Hallo"), + TestCounter.of("Hallo",1)), new KeyValue<>( - TestWord.of(KLAUS.getUser(),"Müsch"), - TestCounter.of(KLAUS.getUser(),"Müsch",1)), + TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"Müsch"), + TestCounter.of("Müsch",1)), new KeyValue<>( - TestWord.of(PETER.getUser(),"Welt"), - TestCounter.of(PETER.getUser(),"Welt",1)), + TestWord.of(TYPE_COUNTER, PETER.getUser(),"Welt"), + TestCounter.of("Welt",1)), new KeyValue<>( - TestWord.of(KLAUS.getUser(),"Müsch"), - TestCounter.of(KLAUS.getUser(),"Müsch",2)), + TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"Müsch"), + TestCounter.of("Müsch",2)), new KeyValue<>( - TestWord.of(KLAUS.getUser(),"s"), - TestCounter.of(KLAUS.getUser(),"s",1)), + TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"s"), + TestCounter.of("s",1)), new KeyValue<>( - TestWord.of(PETER.getUser(),"Boäh"), - TestCounter.of(PETER.getUser(),"Boäh",1)), + TestWord.of(TYPE_COUNTER, PETER.getUser(),"Boäh"), + TestCounter.of("Boäh",1)), new KeyValue<>( - TestWord.of(PETER.getUser(),"Welt"), - TestCounter.of(PETER.getUser(),"Welt",2)), + TestWord.of(TYPE_COUNTER, PETER.getUser(),"Welt"), + TestCounter.of("Welt",2)), new KeyValue<>( - TestWord.of(PETER.getUser(),"Boäh"), - TestCounter.of(PETER.getUser(),"Boäh",2)), + TestWord.of(TYPE_COUNTER, PETER.getUser(),"Boäh"), + TestCounter.of("Boäh",2)), new KeyValue<>( - TestWord.of(KLAUS.getUser(),"s"), - TestCounter.of(KLAUS.getUser(),"s",2)), + TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"s"), + TestCounter.of("s",2)), new KeyValue<>( - TestWord.of(PETER.getUser(),"Boäh"), - TestCounter.of(PETER.getUser(),"Boäh",3)), + TestWord.of(TYPE_COUNTER, PETER.getUser(),"Boäh"), + TestCounter.of("Boäh",3)), new KeyValue<>( - TestWord.of(KLAUS.getUser(),"s"), - TestCounter.of(KLAUS.getUser(),"s",3)), + TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"s"), + TestCounter.of("s",3)), }; static void assertExpectedMessages(MultiValueMap receivedMessages) @@ -113,7 +115,7 @@ class TestData return Arrays .stream(entries) .map(entry -> TestEntry.of( - entry.getWord(), + entry.getKey(), entry.getCounter() == null ? -1l : entry.getCounter())) diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java index f5ef236..5f0e817 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java @@ -36,7 +36,7 @@ import static org.awaitility.Awaitility.await; properties = { "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer", "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", - "spring.kafka.producer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.counter.TestWord,counter:de.juplo.kafka.wordcount.counter.TestCounter", + "spring.kafka.producer.properties.spring.json.type.mapping=key:de.juplo.kafka.wordcount.counter.TestWord,counter:de.juplo.kafka.wordcount.counter.TestCounter", "spring.kafka.consumer.auto-offset-reset=earliest", "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java index cca9a3a..a8fc859 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java @@ -111,7 +111,7 @@ public class Top10StreamProcessorTopologyTest jsonSerializer.configure( Map.of( JsonSerializer.TYPE_MAPPINGS, - "word:" + TestWord.class.getName() + "," + + "key:" + TestWord.class.getName() + "," + "counter:" + TestCounter.class.getName()), isKey); return jsonSerializer;