From 6500abf8a211334e869de7cc2f354368f0263bc1 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 3 Jun 2024 10:00:33 +0200 Subject: [PATCH 01/16] top10: 1.2.1 - Removed logging of type-headers in tests --- .../de/juplo/kafka/wordcount/top10/TestData.java | 15 --------------- .../top10/Top10StreamProcessorTopologyTest.java | 14 +------------- 2 files changed, 1 insertion(+), 28 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java index f4e557c..e0c53df 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java @@ -2,8 +2,6 @@ package de.juplo.kafka.wordcount.top10; import de.juplo.kafka.wordcount.counter.TestCounter; import de.juplo.kafka.wordcount.counter.TestWord; -import org.apache.kafka.common.header.Header; -import org.apache.kafka.common.header.Headers; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.springframework.util.LinkedMultiValueMap; @@ -174,17 +172,4 @@ class TestData .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value)); return expectedMessages; } - - static String parseHeader(Headers headers, String key) - { - Header header = headers.lastHeader(key); - if (header == null) - { - return key + "=null"; - } - else - { - return key + "=" + new String(header.value()); - } - } } diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java index f2a9eca..84cfb1e 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java @@ -20,10 +20,7 @@ import java.util.Map; import java.util.Properties; import java.util.stream.Stream; -import static de.juplo.kafka.wordcount.top10.TestData.parseHeader; import static de.juplo.kafka.wordcount.top10.Top10ApplicationConfiguration.serializationConfig; -import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME; -import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME; @Slf4j @@ -84,16 +81,7 @@ public class Top10StreamProcessorTopologyTest MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); out .readRecordsToList() - .forEach(record -> - { - log.debug( - "OUT: {} -> {}, {}, {}", - record.key(), - record.value(), - parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME), - parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME)); - receivedMessages.add(record.key(), record.value()); - }); + .forEach(record -> receivedMessages.add(record.key(), record.value())); TestData.assertExpectedMessages(receivedMessages); -- 2.20.1 From 3b7fae76b8abb62a8cae3a4a32c880b29bce0574 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 8 Jun 2024 13:20:52 +0200 Subject: [PATCH 02/16] top10: 1.2.1 - The name of the state-store is an internal detail --- .../wordcount/top10/Top10ApplicationConfiguration.java | 3 ++- .../juplo/kafka/wordcount/top10/Top10StreamProcessor.java | 6 ++++-- .../de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java | 4 ++-- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java index bb6fef7..bd5298d 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java @@ -18,6 +18,7 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.CompletableFuture; +import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME; import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; @@ -94,6 +95,6 @@ public class Top10ApplicationConfiguration @Bean public KeyValueBytesStoreSupplier storeSupplier() { - return Stores.persistentKeyValueStore("top10"); + return Stores.persistentKeyValueStore(STORE_NAME); } } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java index 343ab4d..70ead87 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java @@ -13,6 +13,8 @@ import java.util.Properties; @Slf4j public class Top10StreamProcessor { + public static final String STORE_NAME= "top10"; + public final KafkaStreams streams; @@ -54,9 +56,9 @@ public class Top10StreamProcessor return topology; } - ReadOnlyKeyValueStore getStore(String name) + ReadOnlyKeyValueStore getStore() { - return streams.store(StoreQueryParameters.fromNameAndType(name, QueryableStoreTypes.keyValueStore())); + return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore())); } public void start() diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java index 1097310..88d03ba 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java @@ -25,6 +25,7 @@ import org.springframework.util.MultiValueMap; import java.time.Duration; import java.util.stream.Stream; +import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME; import static org.awaitility.Awaitility.await; @@ -55,7 +56,6 @@ public class Top10ApplicationIT { public static final String TOPIC_IN = "in"; public static final String TOPIC_OUT = "out"; - public static final String STORE_NAME = "TEST-STORE"; @Autowired Consumer consumer; @@ -94,7 +94,7 @@ public class Top10ApplicationIT { await("Expected state") .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore(STORE_NAME))); + .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore())); } @DisplayName("Await the expected output messages") -- 2.20.1 From 82a1b51dc142e75198f3f3f10c4effdeac3c673b Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 8 Jun 2024 15:04:31 +0200 Subject: [PATCH 03/16] top10: 1.2.1 - Introduced `TestUser` --- .../de/juplo/kafka/wordcount/query/TestUser.java | 14 ++++++++++++++ .../kafka/wordcount/top10/Top10ApplicationIT.java | 8 ++++---- 2 files changed, 18 insertions(+), 4 deletions(-) create mode 100644 src/test/java/de/juplo/kafka/wordcount/query/TestUser.java diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestUser.java b/src/test/java/de/juplo/kafka/wordcount/query/TestUser.java new file mode 100644 index 0000000..53a5992 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/query/TestUser.java @@ -0,0 +1,14 @@ +package de.juplo.kafka.wordcount.query; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@AllArgsConstructor(staticName = "of") +@NoArgsConstructor +@Data +public class TestUser +{ + String user; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java index 88d03ba..1bec92d 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java @@ -3,6 +3,7 @@ package de.juplo.kafka.wordcount.top10; import de.juplo.kafka.wordcount.counter.TestWord; import de.juplo.kafka.wordcount.counter.TestCounter; import de.juplo.kafka.wordcount.query.TestRanking; +import de.juplo.kafka.wordcount.query.TestUser; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.Stores; @@ -38,9 +39,8 @@ import static org.awaitility.Awaitility.await; "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", "spring.kafka.consumer.properties.spring.json.use.type.headers=false", - "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.top10.User", + "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.query.TestUser", "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.query.TestRanking", - "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.top10 ", "logging.level.root=WARN", "logging.level.de.juplo=DEBUG", "logging.level.org.apache.kafka.clients=INFO", @@ -132,11 +132,11 @@ public class Top10ApplicationIT @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) public synchronized void receive( - @Header(KafkaHeaders.RECEIVED_KEY) User user, + @Header(KafkaHeaders.RECEIVED_KEY) TestUser user, @Payload TestRanking ranking) { log.debug("Received message: {} -> {}", user, ranking); - received.add(user, Ranking.of(ranking.getEntries())); + received.add(User.of(user.getUser()), Ranking.of(ranking.getEntries())); } synchronized MultiValueMap getReceivedMessages() -- 2.20.1 From a7c3a493ee3b6ec8629eb749d1ce4ed059f4cff9 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 8 Jun 2024 18:05:45 +0200 Subject: [PATCH 04/16] top10: 1.2.1 - Simplified setup of `Top10StreamProcessorToplogyTest` --- .../top10/Top10ApplicationConfiguration.java | 6 +-- .../Top10StreamProcessorTopologyTest.java | 40 +++++++++++-------- 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java index bd5298d..5e56066 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java @@ -13,8 +13,6 @@ import org.springframework.context.annotation.Configuration; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerde; -import java.util.HashMap; -import java.util.Map; import java.util.Properties; import java.util.concurrent.CompletableFuture; @@ -46,9 +44,9 @@ public class Top10ApplicationConfiguration return props; } - static Map serializationConfig() + static Properties serializationConfig() { - Map props = new HashMap<>(); + Properties props = new Properties(); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java index 84cfb1e..80fc0df 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java @@ -1,5 +1,7 @@ package de.juplo.kafka.wordcount.top10; +import de.juplo.kafka.wordcount.counter.TestCounter; +import de.juplo.kafka.wordcount.counter.TestWord; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.TestOutputTopic; @@ -11,13 +13,11 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.kafka.support.serializer.JsonDeserializer; -import org.springframework.kafka.support.serializer.JsonSerde; import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import java.util.Map; -import java.util.Properties; import java.util.stream.Stream; import static de.juplo.kafka.wordcount.top10.Top10ApplicationConfiguration.serializationConfig; @@ -44,27 +44,21 @@ public class Top10StreamProcessorTopologyTest OUT, Stores.inMemoryKeyValueStore(STORE_NAME)); - Map propertyMap = serializationConfig(); - - Properties properties = new Properties(); - properties.putAll(propertyMap); - - JsonSerde keySerde = new JsonSerde<>(); - keySerde.configure(propertyMap, true); - JsonSerde valueSerde = new JsonSerde<>(); - valueSerde.configure(propertyMap, false); - - testDriver = new TopologyTestDriver(topology, properties); + testDriver = new TopologyTestDriver(topology, serializationConfig()); in = testDriver.createInputTopic( IN, - (JsonSerializer)keySerde.serializer(), - (JsonSerializer)valueSerde.serializer()); + jsonSerializer(Key.class, true), + jsonSerializer(Entry.class,false)); out = testDriver.createOutputTopic( OUT, - (JsonDeserializer)keySerde.deserializer(), - (JsonDeserializer)valueSerde.deserializer()); + new JsonDeserializer() + .copyWithType(User.class) + .ignoreTypeHeaders(), + new JsonDeserializer() + .copyWithType(Ranking.class) + .ignoreTypeHeaders()); } @@ -97,4 +91,16 @@ public class Top10StreamProcessorTopologyTest { testDriver.close(); } + + private JsonSerializer jsonSerializer(Class type, boolean isKey) + { + JsonSerializer jsonSerializer = new JsonSerializer<>(); + jsonSerializer.configure( + Map.of( + JsonSerializer.TYPE_MAPPINGS, + "word:" + TestWord.class.getName() + "," + + "counter:" + TestCounter.class.getName()), + isKey); + return jsonSerializer; + } } -- 2.20.1 From 6738dd374575d4d86a966972d2e25661c2ad1523 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 8 Jun 2024 15:19:23 +0200 Subject: [PATCH 05/16] top10: 1.2.1 - `TestData` uses faked foreign classes for input-/output data --- .../kafka/wordcount/query/TestEntry.java | 15 +++ .../kafka/wordcount/query/TestRanking.java | 12 +- .../juplo/kafka/wordcount/top10/TestData.java | 123 +++++++++++------- .../wordcount/top10/Top10ApplicationIT.java | 6 +- .../Top10StreamProcessorTopologyTest.java | 20 +-- 5 files changed, 114 insertions(+), 62 deletions(-) create mode 100644 src/test/java/de/juplo/kafka/wordcount/query/TestEntry.java diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestEntry.java b/src/test/java/de/juplo/kafka/wordcount/query/TestEntry.java new file mode 100644 index 0000000..a5152e6 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/query/TestEntry.java @@ -0,0 +1,15 @@ +package de.juplo.kafka.wordcount.query; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@AllArgsConstructor(staticName = "of") +@NoArgsConstructor +@Data +public class TestEntry +{ + String word; + long counter; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestRanking.java b/src/test/java/de/juplo/kafka/wordcount/query/TestRanking.java index e7f8053..efad48b 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/TestRanking.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/TestRanking.java @@ -1,11 +1,21 @@ package de.juplo.kafka.wordcount.query; import de.juplo.kafka.wordcount.top10.Entry; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; +@AllArgsConstructor(access = AccessLevel.PRIVATE) +@NoArgsConstructor @Data public class TestRanking { - private Entry[] entries; + private TestEntry[] entries; + + public static TestRanking of(TestEntry... entries) + { + return new TestRanking(entries); + } } diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java index e0c53df..84c81f5 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java @@ -2,11 +2,15 @@ package de.juplo.kafka.wordcount.top10; import de.juplo.kafka.wordcount.counter.TestCounter; import de.juplo.kafka.wordcount.counter.TestWord; +import de.juplo.kafka.wordcount.query.TestEntry; +import de.juplo.kafka.wordcount.query.TestRanking; +import de.juplo.kafka.wordcount.query.TestUser; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; +import java.util.Arrays; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; @@ -14,8 +18,8 @@ import static org.assertj.core.api.Assertions.assertThat; class TestData { - static final User PETER = User.of("peter"); - static final User KLAUS = User.of("klaus"); + static final TestUser PETER = TestUser.of("peter"); + static final TestUser KLAUS = TestUser.of("klaus"); static final KeyValue[] INPUT_MESSAGES = new KeyValue[] { @@ -54,7 +58,7 @@ class TestData TestCounter.of(KLAUS.getUser(),"s",3)), }; - static void assertExpectedMessages(MultiValueMap receivedMessages) + static void assertExpectedMessages(MultiValueMap receivedMessages) { expectedMessages().forEach( (user, rankings) -> @@ -64,39 +68,62 @@ class TestData static void assertExpectedState(ReadOnlyKeyValueStore store) { - assertRankingEqualsRankingFromLastMessage(PETER, store.get(PETER)); - assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(KLAUS)); + assertRankingEqualsRankingFromLastMessage(PETER, store.get(userOf(PETER))); + assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(userOf(KLAUS))); } - static void assertExpectedNumberOfMessagesForUsers(MultiValueMap receivedMessages) + static User userOf(TestUser user) + { + return User.of(user.getUser()); + } + + static void assertExpectedNumberOfMessagesForUsers(MultiValueMap receivedMessages) { assertThat(countMessagesForUser(PETER, receivedMessages)); assertThat(countMessagesForUser(KLAUS, receivedMessages)); } - static int countMessagesForUser(User user, MultiValueMap messagesForUsers) + static int countMessagesForUser(TestUser user, MultiValueMap messagesForUsers) { return messagesForUsers.get(user).size(); } - static void assertExpectedLastMessagesForUsers(MultiValueMap receivedMessages) + static void assertExpectedLastMessagesForUsers(MultiValueMap receivedMessages) { assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages)); assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages)); } - static void assertRankingEqualsRankingFromLastMessage(User user, Ranking ranking) + static void assertRankingEqualsRankingFromLastMessage(TestUser user, Ranking ranking) + { + TestRanking testRanking = TestRanking.of(testEntriesOf(ranking.getEntries())); + assertRankingEqualsRankingFromLastMessage(user, testRanking); + } + + static TestEntry[] testEntriesOf(Entry... entries) + { + return Arrays + .stream(entries) + .map(entry -> TestEntry.of( + entry.getWord(), + entry.getCounter() == null + ? -1l + : entry.getCounter())) + .toArray(size -> new TestEntry[size]); + } + + static void assertRankingEqualsRankingFromLastMessage(TestUser user, TestRanking ranking) { assertThat(ranking).isEqualTo(getLastMessageFor(user)); } - static Ranking getLastMessageFor(User user) + static TestRanking getLastMessageFor(TestUser user) { return getLastMessageFor(user, expectedMessages()); } - static Ranking getLastMessageFor(User user, MultiValueMap messagesForUsers) + static TestRanking getLastMessageFor(TestUser user, MultiValueMap messagesForUsers) { return messagesForUsers .get(user) @@ -104,69 +131,69 @@ class TestData .reduce(null, (left, right) -> right); } - static KeyValue[] EXPECTED_MESSAGES = new KeyValue[] + static KeyValue[] EXPECTED_MESSAGES = new KeyValue[] { KeyValue.pair( // 0 PETER, - Ranking.of( - Entry.of("Hallo", 1l))), + TestRanking.of( + TestEntry.of("Hallo", 1l))), KeyValue.pair( // 1 KLAUS, - Ranking.of( - Entry.of("Müsch", 1l))), + TestRanking.of( + TestEntry.of("Müsch", 1l))), KeyValue.pair( // 2 PETER, - Ranking.of( - Entry.of("Hallo", 1l), - Entry.of("Welt", 1l))), + TestRanking.of( + TestEntry.of("Hallo", 1l), + TestEntry.of("Welt", 1l))), KeyValue.pair( // 3 KLAUS, - Ranking.of( - Entry.of("Müsch", 2l))), + TestRanking.of( + TestEntry.of("Müsch", 2l))), KeyValue.pair( // 4 KLAUS, - Ranking.of( - Entry.of("Müsch", 2l), - Entry.of("s", 1l))), + TestRanking.of( + TestEntry.of("Müsch", 2l), + TestEntry.of("s", 1l))), KeyValue.pair( // 5 PETER, - Ranking.of( - Entry.of("Hallo", 1l), - Entry.of("Welt", 1l), - Entry.of("Boäh", 1l))), + TestRanking.of( + TestEntry.of("Hallo", 1l), + TestEntry.of("Welt", 1l), + TestEntry.of("Boäh", 1l))), KeyValue.pair( // 6 PETER, - Ranking.of( - Entry.of("Welt", 2l), - Entry.of("Hallo", 1l), - Entry.of("Boäh", 1l))), + TestRanking.of( + TestEntry.of("Welt", 2l), + TestEntry.of("Hallo", 1l), + TestEntry.of("Boäh", 1l))), KeyValue.pair( // 7 PETER, - Ranking.of( - Entry.of("Welt", 2l), - Entry.of("Boäh", 2l), - Entry.of("Hallo", 1l))), + TestRanking.of( + TestEntry.of("Welt", 2l), + TestEntry.of("Boäh", 2l), + TestEntry.of("Hallo", 1l))), KeyValue.pair( // 8 KLAUS, - Ranking.of( - Entry.of("Müsch", 2l), - Entry.of("s", 2l))), + TestRanking.of( + TestEntry.of("Müsch", 2l), + TestEntry.of("s", 2l))), KeyValue.pair( // 9 PETER, - Ranking.of( - Entry.of("Boäh", 3l), - Entry.of("Welt", 2l), - Entry.of("Hallo", 1l))), + TestRanking.of( + TestEntry.of("Boäh", 3l), + TestEntry.of("Welt", 2l), + TestEntry.of("Hallo", 1l))), KeyValue.pair( // 10 KLAUS, - Ranking.of( - Entry.of("s", 3l), - Entry.of("Müsch", 2l))), + TestRanking.of( + TestEntry.of("s", 3l), + TestEntry.of("Müsch", 2l))), }; - static MultiValueMap expectedMessages() + static MultiValueMap expectedMessages() { - MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); + MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); Stream .of(EXPECTED_MESSAGES) .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value)); diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java index 1bec92d..5e1f45c 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java @@ -128,7 +128,7 @@ public class Top10ApplicationIT static class Consumer { - private final MultiValueMap received = new LinkedMultiValueMap<>(); + private final MultiValueMap received = new LinkedMultiValueMap<>(); @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) public synchronized void receive( @@ -136,10 +136,10 @@ public class Top10ApplicationIT @Payload TestRanking ranking) { log.debug("Received message: {} -> {}", user, ranking); - received.add(User.of(user.getUser()), Ranking.of(ranking.getEntries())); + received.add(user, ranking); } - synchronized MultiValueMap getReceivedMessages() + synchronized MultiValueMap getReceivedMessages() { return received; } diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java index 80fc0df..cd09c06 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java @@ -2,6 +2,8 @@ package de.juplo.kafka.wordcount.top10; import de.juplo.kafka.wordcount.counter.TestCounter; import de.juplo.kafka.wordcount.counter.TestWord; +import de.juplo.kafka.wordcount.query.TestRanking; +import de.juplo.kafka.wordcount.query.TestUser; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.TestOutputTopic; @@ -32,8 +34,8 @@ public class Top10StreamProcessorTopologyTest TopologyTestDriver testDriver; - TestInputTopic in; - TestOutputTopic out; + TestInputTopic in; + TestOutputTopic out; @BeforeEach @@ -48,16 +50,16 @@ public class Top10StreamProcessorTopologyTest in = testDriver.createInputTopic( IN, - jsonSerializer(Key.class, true), - jsonSerializer(Entry.class,false)); + jsonSerializer(TestWord.class, true), + jsonSerializer(TestCounter.class,false)); out = testDriver.createOutputTopic( OUT, new JsonDeserializer() - .copyWithType(User.class) + .copyWithType(TestUser.class) .ignoreTypeHeaders(), new JsonDeserializer() - .copyWithType(Ranking.class) + .copyWithType(TestRanking.class) .ignoreTypeHeaders()); } @@ -68,11 +70,9 @@ public class Top10StreamProcessorTopologyTest { Stream .of(TestData.INPUT_MESSAGES) - .forEach(kv -> in.pipeInput( - Key.of(kv.key.getUser(), kv.key.getWord()), - Entry.of(kv.value.getWord(), kv.value.getCounter()))); + .forEach(kv -> in.pipeInput(kv.key, kv.value)); - MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); + MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); out .readRecordsToList() .forEach(record -> receivedMessages.add(record.key(), record.value())); -- 2.20.1 From 8dfd8ee1b26d8d95e95402d9ba2128a113181396 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 8 Jun 2024 19:35:21 +0200 Subject: [PATCH 06/16] top10: 1.2.1 - Fixed possible NPE in `Top10ApplicationIT` --- src/test/java/de/juplo/kafka/wordcount/top10/TestData.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java index 84c81f5..cf52c1f 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java @@ -85,7 +85,9 @@ class TestData static int countMessagesForUser(TestUser user, MultiValueMap messagesForUsers) { - return messagesForUsers.get(user).size(); + return messagesForUsers.get(user) == null + ? 0 + : messagesForUsers.get(user).size(); } -- 2.20.1 From ef9fff623043d2dffddbe8e70d984106cdde6c6b Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 8 Jun 2024 18:18:41 +0200 Subject: [PATCH 07/16] top10: 1.2.1 - `Top10ApplicationIT` asserts type-mapping for output-data --- .../de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java index 5e1f45c..ea8fe18 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java @@ -38,9 +38,7 @@ import static org.awaitility.Awaitility.await; "spring.kafka.consumer.auto-offset-reset=earliest", "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", - "spring.kafka.consumer.properties.spring.json.use.type.headers=false", - "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.query.TestUser", - "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.query.TestRanking", + "spring.kafka.consumer.properties.spring.json.type.mapping=user:de.juplo.kafka.wordcount.query.TestUser,ranking:de.juplo.kafka.wordcount.query.TestRanking", "logging.level.root=WARN", "logging.level.de.juplo=DEBUG", "logging.level.org.apache.kafka.clients=INFO", -- 2.20.1 From e59d8080ffe2750774c881766e691012964c9099 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 8 Jun 2024 18:30:28 +0200 Subject: [PATCH 08/16] top10: 1.2.1 - Fixed race-condition in `Top10ApplicationIT` --- .../kafka/wordcount/top10/Top10ApplicationIT.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java index ea8fe18..a1bc1f0 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java @@ -102,7 +102,8 @@ public class Top10ApplicationIT { await("Expected messages") .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> TestData.assertExpectedMessages(consumer.getReceivedMessages())); + .untilAsserted(() -> consumer.enforceAssertion( + receivedMessages -> TestData.assertExpectedMessages(receivedMessages))); } @DisplayName("Await the expected number of messages") @@ -111,7 +112,8 @@ public class Top10ApplicationIT { await("Expected number of messages") .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> TestData.assertExpectedNumberOfMessagesForUsers(consumer.getReceivedMessages())); + .untilAsserted(() -> consumer.enforceAssertion( + receivedMessages -> TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages))); } @DisplayName("Await the expected final output messages") @@ -120,7 +122,8 @@ public class Top10ApplicationIT { await("Expected final output messages") .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> TestData.assertExpectedLastMessagesForUsers(consumer.getReceivedMessages())); + .untilAsserted(() -> consumer.enforceAssertion( + receivedMessages -> TestData.assertExpectedLastMessagesForUsers(receivedMessages))); } @@ -137,9 +140,10 @@ public class Top10ApplicationIT received.add(user, ranking); } - synchronized MultiValueMap getReceivedMessages() + synchronized void enforceAssertion( + java.util.function.Consumer> assertion) { - return received; + assertion.accept(received); } } -- 2.20.1 From 478b25d6700bd0a48b55647f09386f3b1d59fa29 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 8 Jun 2024 18:37:55 +0200 Subject: [PATCH 09/16] top10: 1.2.1 - Refined `TestData` clearified concerns --- .../juplo/kafka/wordcount/top10/TestData.java | 25 +++++++++++-------- .../wordcount/top10/Top10ApplicationIT.java | 12 +++++---- .../Top10StreamProcessorTopologyTest.java | 5 ++-- 3 files changed, 24 insertions(+), 18 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java index cf52c1f..7a3a27e 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java @@ -21,7 +21,12 @@ class TestData static final TestUser PETER = TestUser.of("peter"); static final TestUser KLAUS = TestUser.of("klaus"); - static final KeyValue[] INPUT_MESSAGES = new KeyValue[] + static final Stream> getInputMessages() + { + return Stream.of(INPUT_MESSAGES); + } + + private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] { new KeyValue<>( TestWord.of(PETER.getUser(),"Hallo"), @@ -72,7 +77,7 @@ class TestData assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(userOf(KLAUS))); } - static User userOf(TestUser user) + private static User userOf(TestUser user) { return User.of(user.getUser()); } @@ -83,7 +88,7 @@ class TestData assertThat(countMessagesForUser(KLAUS, receivedMessages)); } - static int countMessagesForUser(TestUser user, MultiValueMap messagesForUsers) + private static int countMessagesForUser(TestUser user, MultiValueMap messagesForUsers) { return messagesForUsers.get(user) == null ? 0 @@ -97,13 +102,13 @@ class TestData assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages)); } - static void assertRankingEqualsRankingFromLastMessage(TestUser user, Ranking ranking) + private static void assertRankingEqualsRankingFromLastMessage(TestUser user, Ranking ranking) { TestRanking testRanking = TestRanking.of(testEntriesOf(ranking.getEntries())); assertRankingEqualsRankingFromLastMessage(user, testRanking); } - static TestEntry[] testEntriesOf(Entry... entries) + private static TestEntry[] testEntriesOf(Entry... entries) { return Arrays .stream(entries) @@ -115,17 +120,17 @@ class TestData .toArray(size -> new TestEntry[size]); } - static void assertRankingEqualsRankingFromLastMessage(TestUser user, TestRanking ranking) + private static void assertRankingEqualsRankingFromLastMessage(TestUser user, TestRanking ranking) { assertThat(ranking).isEqualTo(getLastMessageFor(user)); } - static TestRanking getLastMessageFor(TestUser user) + private static TestRanking getLastMessageFor(TestUser user) { return getLastMessageFor(user, expectedMessages()); } - static TestRanking getLastMessageFor(TestUser user, MultiValueMap messagesForUsers) + private static TestRanking getLastMessageFor(TestUser user, MultiValueMap messagesForUsers) { return messagesForUsers .get(user) @@ -133,7 +138,7 @@ class TestData .reduce(null, (left, right) -> right); } - static KeyValue[] EXPECTED_MESSAGES = new KeyValue[] + private static KeyValue[] EXPECTED_MESSAGES = new KeyValue[] { KeyValue.pair( // 0 PETER, @@ -193,7 +198,7 @@ class TestData TestEntry.of("Müsch", 2l))), }; - static MultiValueMap expectedMessages() + private static MultiValueMap expectedMessages() { MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); Stream diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java index a1bc1f0..f5ef236 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java @@ -1,13 +1,16 @@ package de.juplo.kafka.wordcount.top10; -import de.juplo.kafka.wordcount.counter.TestWord; import de.juplo.kafka.wordcount.counter.TestCounter; +import de.juplo.kafka.wordcount.counter.TestWord; import de.juplo.kafka.wordcount.query.TestRanking; import de.juplo.kafka.wordcount.query.TestUser; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.Stores; -import org.junit.jupiter.api.*; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; @@ -24,7 +27,6 @@ import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import java.time.Duration; -import java.util.stream.Stream; import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME; import static org.awaitility.Awaitility.await; @@ -65,8 +67,8 @@ public class Top10ApplicationIT public static void testSendMessage( @Autowired KafkaTemplate kafkaTemplate) { - Stream - .of(TestData.INPUT_MESSAGES) + TestData + .getInputMessages() .forEach(kv -> { try diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java index cd09c06..90d8e4c 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java @@ -20,7 +20,6 @@ import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import java.util.Map; -import java.util.stream.Stream; import static de.juplo.kafka.wordcount.top10.Top10ApplicationConfiguration.serializationConfig; @@ -68,8 +67,8 @@ public class Top10StreamProcessorTopologyTest @Test public void test() { - Stream - .of(TestData.INPUT_MESSAGES) + TestData + .getInputMessages() .forEach(kv -> in.pipeInput(kv.key, kv.value)); MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); -- 2.20.1 From aa9d59fa93178f40797a8bb3716dd0a30594833f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 9 Jun 2024 20:36:56 +0200 Subject: [PATCH 10/16] top10: 1.2.1 - Simplified serialization configuration --- .../kafka/wordcount/top10/Top10ApplicationConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java index 5e56066..255f0e4 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java @@ -34,6 +34,7 @@ public class Top10ApplicationConfiguration props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); + if (properties.getCommitInterval() != null) props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval()); if (properties.getCacheMaxBytes() != null) @@ -50,7 +51,6 @@ public class Top10ApplicationConfiguration props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); - props.put(JsonDeserializer.TRUSTED_PACKAGES, Top10Application.class.getPackageName()); props.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName()); props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName()); props.put( -- 2.20.1 From 066ac7e345947764b28084b9961b1633264d63d0 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 22 Jun 2024 17:53:34 +0200 Subject: [PATCH 11/16] top10: 1.2.1 - Refined `Top10StreamProcessorTopologyTest` (splitted tests) --- .../Top10StreamProcessorTopologyTest.java | 56 ++++++++++++------- 1 file changed, 35 insertions(+), 21 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java index 90d8e4c..cca9a3a 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java @@ -11,8 +11,9 @@ import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.Stores; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerializer; @@ -22,6 +23,7 @@ import org.springframework.util.MultiValueMap; import java.util.Map; import static de.juplo.kafka.wordcount.top10.Top10ApplicationConfiguration.serializationConfig; +import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME; @Slf4j @@ -29,16 +31,13 @@ public class Top10StreamProcessorTopologyTest { public static final String IN = "TEST-IN"; public static final String OUT = "TEST-OUT"; - public static final String STORE_NAME = "TOPOLOGY-TEST"; + static TopologyTestDriver testDriver; + static MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); - TopologyTestDriver testDriver; - TestInputTopic in; - TestOutputTopic out; - - @BeforeEach - public void setUp() + @BeforeAll + public static void setUp() { Topology topology = Top10StreamProcessor.buildTopology( IN, @@ -47,12 +46,12 @@ public class Top10StreamProcessorTopologyTest testDriver = new TopologyTestDriver(topology, serializationConfig()); - in = testDriver.createInputTopic( + TestInputTopic in = testDriver.createInputTopic( IN, jsonSerializer(TestWord.class, true), jsonSerializer(TestCounter.class,false)); - out = testDriver.createOutputTopic( + TestOutputTopic out = testDriver.createOutputTopic( OUT, new JsonDeserializer() .copyWithType(TestUser.class) @@ -61,37 +60,52 @@ public class Top10StreamProcessorTopologyTest .copyWithType(TestRanking.class) .ignoreTypeHeaders()); - } - - - @Test - public void test() - { TestData .getInputMessages() .forEach(kv -> in.pipeInput(kv.key, kv.value)); - MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); out .readRecordsToList() .forEach(record -> receivedMessages.add(record.key(), record.value())); + } + + @DisplayName("Assert the expected output messages") + @Test + public void testExpectedMessages() + { TestData.assertExpectedMessages(receivedMessages); + } + @DisplayName("Assert the expected number of messages") + @Test + public void testExpectedNumberOfMessagesForUsers() + { TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages); + } + + @DisplayName("Assert the expected final output messages") + @Test + public void testExpectedLastMessagesForUSers() + { TestData.assertExpectedLastMessagesForUsers(receivedMessages); + } + @DisplayName("Assert the expected state in the state-store") + @Test + public void testExpectedState() + { KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); TestData.assertExpectedState(store); } - @AfterEach - public void tearDown() + @AfterAll + public static void tearDown() { testDriver.close(); } - private JsonSerializer jsonSerializer(Class type, boolean isKey) + private static JsonSerializer jsonSerializer(Class type, boolean isKey) { JsonSerializer jsonSerializer = new JsonSerializer<>(); jsonSerializer.configure( -- 2.20.1 From 5030aed19804a0c48f1968208e176657bdd147de Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 22 Jun 2024 16:09:46 +0200 Subject: [PATCH 12/16] top10: 1.3.0 - Refined input JSON to match the new general stats-format * Adapted the configuration to the changed type-mapping for the key. * Refined the class `Key`, that defines the JSON for the input key. ** Added attribute `type` with value of type `enum StatsType`. ** Renamed attribute `user` to `channel`. ** Renamed attribute `word` to `key`. * Refined the class `Entry`, that defines the JSON for the input value. ** Renamed attribute `word` to `key`. * Adapted test-classes and -cases accordingly. --- pom.xml | 2 +- .../de/juplo/kafka/wordcount/top10/Entry.java | 2 +- .../de/juplo/kafka/wordcount/top10/Key.java | 5 +- .../juplo/kafka/wordcount/top10/Ranking.java | 16 +++---- .../kafka/wordcount/top10/StatsType.java | 7 +++ .../top10/Top10ApplicationConfiguration.java | 2 +- .../wordcount/top10/Top10StreamProcessor.java | 2 +- .../kafka/wordcount/counter/TestCounter.java | 8 +--- .../kafka/wordcount/counter/TestWord.java | 5 +- .../kafka/wordcount/query/TestEntry.java | 2 +- .../kafka/wordcount/top10/RankingTest.java | 4 +- .../juplo/kafka/wordcount/top10/TestData.java | 48 ++++++++++--------- .../wordcount/top10/Top10ApplicationIT.java | 2 +- .../Top10StreamProcessorTopologyTest.java | 2 +- 14 files changed, 56 insertions(+), 51 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/wordcount/top10/StatsType.java diff --git a/pom.xml b/pom.xml index b30c4ea..e5cd268 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount top10 - 1.2.1 + 1.3.0 Wordcount-Top-10 Top-10 stream-processor of the multi-user wordcount-example diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java b/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java index b25fc07..7d00500 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java @@ -15,6 +15,6 @@ import lombok.NoArgsConstructor; @JsonIgnoreProperties(ignoreUnknown = true) public class Entry { - private String word; + private String key; private Long counter; } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Key.java b/src/main/java/de/juplo/kafka/wordcount/top10/Key.java index ffac8ea..aaf016c 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Key.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Key.java @@ -12,6 +12,7 @@ import lombok.*; @JsonIgnoreProperties(ignoreUnknown = true) public class Key { - private String user; - private String word; + private StatsType type; + private String channel; + private String key; } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java b/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java index 4f56c18..279716a 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java @@ -49,7 +49,7 @@ public class Ranking for (int j = i+1; j < list.size(); j++) { entry = list.get(j); - if(entry.getWord().equals(newEntry.getWord())) + if(entry.getKey().equals(newEntry.getKey())) { list.remove(j); break; @@ -63,7 +63,7 @@ public class Ranking return this; } - if (entry.getWord().equals(newEntry.getWord())) + if (entry.getKey().equals(newEntry.getKey())) oldPosition = i; } @@ -93,12 +93,12 @@ public class Ranking { Entry entry = this.entries[i]; - if (seenWords.contains(entry.getWord())) - throw new IllegalArgumentException("Invalid Ranking: Multiple occurrences of word -> " + entry.getWord()); + if (seenWords.contains(entry.getKey())) + throw new IllegalArgumentException("Invalid Ranking: Multiple occurrences of word -> " + entry.getKey()); if (entry.getCounter() > lowesCounting) throw new IllegalArgumentException("Invalid Ranking: Entries are not sorted correctly"); - seenWords.add(entry.getWord()); + seenWords.add(entry.getKey()); lowesCounting = entry.getCounter(); } @@ -128,13 +128,13 @@ public class Ranking Set otherWordsWithCurrentCount = new HashSet<>(); Entry myEntry = entries[i]; long currentCount = myEntry.getCounter(); - myWordsWithCurrentCount.add(myEntry.getWord()); + myWordsWithCurrentCount.add(myEntry.getKey()); while (true) { Entry otherEntry = other.entries[i]; if (otherEntry.getCounter() != currentCount) return false; - otherWordsWithCurrentCount.add(otherEntry.getWord()); + otherWordsWithCurrentCount.add(otherEntry.getKey()); if (++i >= entries.length) return myWordsWithCurrentCount.equals(otherWordsWithCurrentCount); myEntry = entries[i]; @@ -146,7 +146,7 @@ public class Ranking myWordsWithCurrentCount.clear(); otherWordsWithCurrentCount.clear(); } - myWordsWithCurrentCount.add(myEntry.getWord()); + myWordsWithCurrentCount.add(myEntry.getKey()); } } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/StatsType.java b/src/main/java/de/juplo/kafka/wordcount/top10/StatsType.java new file mode 100644 index 0000000..b1b8f9b --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/top10/StatsType.java @@ -0,0 +1,7 @@ +package de.juplo.kafka.wordcount.top10; + +enum StatsType +{ + COUNTER, + POPULAR +} diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java index 255f0e4..57e5a47 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java @@ -55,7 +55,7 @@ public class Top10ApplicationConfiguration props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName()); props.put( JsonDeserializer.TYPE_MAPPINGS, - "word:" + Key.class.getName() + "," + + "key:" + Key.class.getName() + "," + "counter:" + Entry.class.getName() + "," + "user:" + User.class.getName() + "," + "ranking:" + Ranking.class.getName()); diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java index 70ead87..907c7ff 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java @@ -41,7 +41,7 @@ public class Top10StreamProcessor builder .stream(inputTopic) - .map((key, entry) -> new KeyValue<>(User.of(key.getUser()), entry)) + .map((key, entry) -> new KeyValue<>(User.of(key.getChannel()), entry)) .groupByKey() .aggregate( () -> new Ranking(), diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestCounter.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestCounter.java index d98ae64..b78c429 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/TestCounter.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestCounter.java @@ -10,12 +10,6 @@ import lombok.NoArgsConstructor; @AllArgsConstructor(staticName = "of") public class TestCounter { - String user; - String word; + String key; long counter; - - public static TestCounter of(TestWord word, long counter) - { - return new TestCounter(word.getUser(), word.getWord(), counter); - } } diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java index 8008e12..00c1af7 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java @@ -12,6 +12,7 @@ import lombok.NoArgsConstructor; @JsonIgnoreProperties(ignoreUnknown = true) public class TestWord { - private String user; - private String word; + private String type; + private String channel; + private String key; } diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestEntry.java b/src/test/java/de/juplo/kafka/wordcount/query/TestEntry.java index a5152e6..8019da9 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/TestEntry.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/TestEntry.java @@ -10,6 +10,6 @@ import lombok.NoArgsConstructor; @Data public class TestEntry { - String word; + String key; long counter; } diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/RankingTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/RankingTest.java index 26749e9..0f36860 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/RankingTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/RankingTest.java @@ -108,7 +108,7 @@ public class RankingTest Stream.of(highestEntry), VALID_RANKINGS[0] .stream() - .filter(entry -> !entry.getWord().equals(word))) + .filter(entry -> !entry.getKey().equals(word))) .toList(); assertThat(ranking.getEntries()).containsExactlyElementsOf(expectedEntries); } @@ -134,7 +134,7 @@ public class RankingTest Ranking ranking = Ranking.of(toArray(entryList)); entryList.forEach(entry -> assertThatExceptionOfType(IllegalArgumentException.class) - .isThrownBy(() -> ranking.add(Entry.of(entry.getWord(), entry.getCounter() - 1)))); + .isThrownBy(() -> ranking.add(Entry.of(entry.getKey(), entry.getCounter() - 1)))); } @DisplayName("Identical rankings are considered equal") diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java index 7a3a27e..4fb229b 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java @@ -18,6 +18,8 @@ import static org.assertj.core.api.Assertions.assertThat; class TestData { + static final String TYPE_COUNTER = "COUNTER"; + static final TestUser PETER = TestUser.of("peter"); static final TestUser KLAUS = TestUser.of("klaus"); @@ -29,38 +31,38 @@ class TestData private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] { new KeyValue<>( - TestWord.of(PETER.getUser(),"Hallo"), - TestCounter.of(PETER.getUser(),"Hallo",1)), + TestWord.of(TYPE_COUNTER, PETER.getUser(),"Hallo"), + TestCounter.of("Hallo",1)), new KeyValue<>( - TestWord.of(KLAUS.getUser(),"Müsch"), - TestCounter.of(KLAUS.getUser(),"Müsch",1)), + TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"Müsch"), + TestCounter.of("Müsch",1)), new KeyValue<>( - TestWord.of(PETER.getUser(),"Welt"), - TestCounter.of(PETER.getUser(),"Welt",1)), + TestWord.of(TYPE_COUNTER, PETER.getUser(),"Welt"), + TestCounter.of("Welt",1)), new KeyValue<>( - TestWord.of(KLAUS.getUser(),"Müsch"), - TestCounter.of(KLAUS.getUser(),"Müsch",2)), + TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"Müsch"), + TestCounter.of("Müsch",2)), new KeyValue<>( - TestWord.of(KLAUS.getUser(),"s"), - TestCounter.of(KLAUS.getUser(),"s",1)), + TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"s"), + TestCounter.of("s",1)), new KeyValue<>( - TestWord.of(PETER.getUser(),"Boäh"), - TestCounter.of(PETER.getUser(),"Boäh",1)), + TestWord.of(TYPE_COUNTER, PETER.getUser(),"Boäh"), + TestCounter.of("Boäh",1)), new KeyValue<>( - TestWord.of(PETER.getUser(),"Welt"), - TestCounter.of(PETER.getUser(),"Welt",2)), + TestWord.of(TYPE_COUNTER, PETER.getUser(),"Welt"), + TestCounter.of("Welt",2)), new KeyValue<>( - TestWord.of(PETER.getUser(),"Boäh"), - TestCounter.of(PETER.getUser(),"Boäh",2)), + TestWord.of(TYPE_COUNTER, PETER.getUser(),"Boäh"), + TestCounter.of("Boäh",2)), new KeyValue<>( - TestWord.of(KLAUS.getUser(),"s"), - TestCounter.of(KLAUS.getUser(),"s",2)), + TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"s"), + TestCounter.of("s",2)), new KeyValue<>( - TestWord.of(PETER.getUser(),"Boäh"), - TestCounter.of(PETER.getUser(),"Boäh",3)), + TestWord.of(TYPE_COUNTER, PETER.getUser(),"Boäh"), + TestCounter.of("Boäh",3)), new KeyValue<>( - TestWord.of(KLAUS.getUser(),"s"), - TestCounter.of(KLAUS.getUser(),"s",3)), + TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"s"), + TestCounter.of("s",3)), }; static void assertExpectedMessages(MultiValueMap receivedMessages) @@ -113,7 +115,7 @@ class TestData return Arrays .stream(entries) .map(entry -> TestEntry.of( - entry.getWord(), + entry.getKey(), entry.getCounter() == null ? -1l : entry.getCounter())) diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java index f5ef236..5f0e817 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java @@ -36,7 +36,7 @@ import static org.awaitility.Awaitility.await; properties = { "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer", "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", - "spring.kafka.producer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.counter.TestWord,counter:de.juplo.kafka.wordcount.counter.TestCounter", + "spring.kafka.producer.properties.spring.json.type.mapping=key:de.juplo.kafka.wordcount.counter.TestWord,counter:de.juplo.kafka.wordcount.counter.TestCounter", "spring.kafka.consumer.auto-offset-reset=earliest", "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java index cca9a3a..a8fc859 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java @@ -111,7 +111,7 @@ public class Top10StreamProcessorTopologyTest jsonSerializer.configure( Map.of( JsonSerializer.TYPE_MAPPINGS, - "word:" + TestWord.class.getName() + "," + + "key:" + TestWord.class.getName() + "," + "counter:" + TestCounter.class.getName()), isKey); return jsonSerializer; -- 2.20.1 From 35a437286fc5bb622f25ae71dffbc8f8c4a16748 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 22 Jun 2024 17:05:53 +0200 Subject: [PATCH 13/16] top10: 1.4.0 - Refined output JSON -- MOVE --- .../java/de/juplo/kafka/wordcount/top10/{User.java => Stats.java} | 0 .../juplo/kafka/wordcount/query/{TestUser.java => TestStats.java} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename src/main/java/de/juplo/kafka/wordcount/top10/{User.java => Stats.java} (100%) rename src/test/java/de/juplo/kafka/wordcount/query/{TestUser.java => TestStats.java} (100%) diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/User.java b/src/main/java/de/juplo/kafka/wordcount/top10/Stats.java similarity index 100% rename from src/main/java/de/juplo/kafka/wordcount/top10/User.java rename to src/main/java/de/juplo/kafka/wordcount/top10/Stats.java diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestUser.java b/src/test/java/de/juplo/kafka/wordcount/query/TestStats.java similarity index 100% rename from src/test/java/de/juplo/kafka/wordcount/query/TestUser.java rename to src/test/java/de/juplo/kafka/wordcount/query/TestStats.java -- 2.20.1 From 238491ed4d33495202e79879954802e5d0836006 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 22 Jun 2024 17:07:15 +0200 Subject: [PATCH 14/16] top10: 1.4.0 - Refined output JSON -- ALIGN --- pom.xml | 2 +- .../de/juplo/kafka/wordcount/top10/Stats.java | 5 +- .../top10/Top10ApplicationConfiguration.java | 4 +- .../wordcount/top10/Top10StreamProcessor.java | 8 +- .../kafka/wordcount/query/TestStats.java | 5 +- .../juplo/kafka/wordcount/top10/TestData.java | 84 ++++++++++--------- .../wordcount/top10/Top10ApplicationIT.java | 18 ++-- .../Top10StreamProcessorTopologyTest.java | 18 ++-- 8 files changed, 75 insertions(+), 69 deletions(-) diff --git a/pom.xml b/pom.xml index e5cd268..0dce2d1 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount top10 - 1.3.0 + 1.4.0 Wordcount-Top-10 Top-10 stream-processor of the multi-user wordcount-example diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Stats.java b/src/main/java/de/juplo/kafka/wordcount/top10/Stats.java index 53c258d..05c2a91 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Stats.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Stats.java @@ -8,7 +8,8 @@ import lombok.NoArgsConstructor; @AllArgsConstructor(staticName = "of") @NoArgsConstructor @Data -public class User +public class Stats { - String user; + StatsType type; + String channel; } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java index 57e5a47..aecd260 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java @@ -51,13 +51,13 @@ public class Top10ApplicationConfiguration props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); - props.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName()); + props.put(JsonDeserializer.KEY_DEFAULT_TYPE, Stats.class.getName()); props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName()); props.put( JsonDeserializer.TYPE_MAPPINGS, "key:" + Key.class.getName() + "," + "counter:" + Entry.class.getName() + "," + - "user:" + User.class.getName() + "," + + "stats:" + Stats.class.getName() + "," + "ranking:" + Ranking.class.getName()); return props; diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java index 907c7ff..1235132 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java @@ -41,11 +41,13 @@ public class Top10StreamProcessor builder .stream(inputTopic) - .map((key, entry) -> new KeyValue<>(User.of(key.getChannel()), entry)) + .map((key, entry) -> new KeyValue<>( + Stats.of(key.getType(), key.getChannel()), + entry)) .groupByKey() .aggregate( () -> new Ranking(), - (user, entry, ranking) -> ranking.add(entry), + (stats, entry, ranking) -> ranking.add(entry), Materialized.as(storeSupplier)) .toStream() .to(outputTopic); @@ -56,7 +58,7 @@ public class Top10StreamProcessor return topology; } - ReadOnlyKeyValueStore getStore() + ReadOnlyKeyValueStore getStore() { return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore())); } diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestStats.java b/src/test/java/de/juplo/kafka/wordcount/query/TestStats.java index 53a5992..6a47193 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/TestStats.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/TestStats.java @@ -8,7 +8,8 @@ import lombok.NoArgsConstructor; @AllArgsConstructor(staticName = "of") @NoArgsConstructor @Data -public class TestUser +public class TestStats { - String user; + String type; + String channel; } diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java index 4fb229b..069f49a 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java @@ -4,7 +4,7 @@ import de.juplo.kafka.wordcount.counter.TestCounter; import de.juplo.kafka.wordcount.counter.TestWord; import de.juplo.kafka.wordcount.query.TestEntry; import de.juplo.kafka.wordcount.query.TestRanking; -import de.juplo.kafka.wordcount.query.TestUser; +import de.juplo.kafka.wordcount.query.TestStats; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.springframework.util.LinkedMultiValueMap; @@ -20,8 +20,8 @@ class TestData { static final String TYPE_COUNTER = "COUNTER"; - static final TestUser PETER = TestUser.of("peter"); - static final TestUser KLAUS = TestUser.of("klaus"); + static final TestStats PETER = TestStats.of(StatsType.COUNTER.name(), "peter"); + static final TestStats KLAUS = TestStats.of(StatsType.COUNTER.name(), "klaus"); static final Stream> getInputMessages() { @@ -31,83 +31,85 @@ class TestData private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] { new KeyValue<>( - TestWord.of(TYPE_COUNTER, PETER.getUser(),"Hallo"), + TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Hallo"), TestCounter.of("Hallo",1)), new KeyValue<>( - TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"Müsch"), + TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"Müsch"), TestCounter.of("Müsch",1)), new KeyValue<>( - TestWord.of(TYPE_COUNTER, PETER.getUser(),"Welt"), + TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Welt"), TestCounter.of("Welt",1)), new KeyValue<>( - TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"Müsch"), + TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"Müsch"), TestCounter.of("Müsch",2)), new KeyValue<>( - TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"s"), + TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"s"), TestCounter.of("s",1)), new KeyValue<>( - TestWord.of(TYPE_COUNTER, PETER.getUser(),"Boäh"), + TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Boäh"), TestCounter.of("Boäh",1)), new KeyValue<>( - TestWord.of(TYPE_COUNTER, PETER.getUser(),"Welt"), + TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Welt"), TestCounter.of("Welt",2)), new KeyValue<>( - TestWord.of(TYPE_COUNTER, PETER.getUser(),"Boäh"), + TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Boäh"), TestCounter.of("Boäh",2)), new KeyValue<>( - TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"s"), + TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"s"), TestCounter.of("s",2)), new KeyValue<>( - TestWord.of(TYPE_COUNTER, PETER.getUser(),"Boäh"), + TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Boäh"), TestCounter.of("Boäh",3)), new KeyValue<>( - TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"s"), + TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"s"), TestCounter.of("s",3)), }; - static void assertExpectedMessages(MultiValueMap receivedMessages) + static void assertExpectedMessages(MultiValueMap receivedMessages) { expectedMessages().forEach( - (user, rankings) -> - assertThat(receivedMessages.get(user)) + (stats, rankings) -> + assertThat(receivedMessages.get(stats)) .containsExactlyElementsOf(rankings)); } - static void assertExpectedState(ReadOnlyKeyValueStore store) + static void assertExpectedState(ReadOnlyKeyValueStore store) { - assertRankingEqualsRankingFromLastMessage(PETER, store.get(userOf(PETER))); - assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(userOf(KLAUS))); + assertRankingEqualsRankingFromLastMessage(PETER, store.get(statsOf(PETER))); + assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(statsOf(KLAUS))); } - private static User userOf(TestUser user) + private static Stats statsOf(TestStats stats) { - return User.of(user.getUser()); + return Stats.of( + StatsType.valueOf(stats.getType()), + stats.getChannel()); } - static void assertExpectedNumberOfMessagesForUsers(MultiValueMap receivedMessages) + static void assertExpectedNumberOfMessages(MultiValueMap receivedMessages) { - assertThat(countMessagesForUser(PETER, receivedMessages)); - assertThat(countMessagesForUser(KLAUS, receivedMessages)); + assertThat(countMessages(PETER, receivedMessages)); + assertThat(countMessages(KLAUS, receivedMessages)); } - private static int countMessagesForUser(TestUser user, MultiValueMap messagesForUsers) + private static int countMessages(TestStats stats, MultiValueMap messagesFor) { - return messagesForUsers.get(user) == null + return messagesFor.get(stats) == null ? 0 - : messagesForUsers.get(user).size(); + : messagesFor.get(stats).size(); } - static void assertExpectedLastMessagesForUsers(MultiValueMap receivedMessages) + static void assertExpectedLastMessages(MultiValueMap receivedMessages) { assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages)); assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages)); } - private static void assertRankingEqualsRankingFromLastMessage(TestUser user, Ranking ranking) + private static void assertRankingEqualsRankingFromLastMessage(TestStats stats, Ranking ranking) { TestRanking testRanking = TestRanking.of(testEntriesOf(ranking.getEntries())); - assertRankingEqualsRankingFromLastMessage(user, testRanking); + assertRankingEqualsRankingFromLastMessage(stats, testRanking); } private static TestEntry[] testEntriesOf(Entry... entries) @@ -122,25 +124,25 @@ class TestData .toArray(size -> new TestEntry[size]); } - private static void assertRankingEqualsRankingFromLastMessage(TestUser user, TestRanking ranking) + private static void assertRankingEqualsRankingFromLastMessage(TestStats stats, TestRanking ranking) { - assertThat(ranking).isEqualTo(getLastMessageFor(user)); + assertThat(ranking).isEqualTo(getLastMessageFor(stats)); } - private static TestRanking getLastMessageFor(TestUser user) + private static TestRanking getLastMessageFor(TestStats stats) { - return getLastMessageFor(user, expectedMessages()); + return getLastMessageFor(stats, expectedMessages()); } - private static TestRanking getLastMessageFor(TestUser user, MultiValueMap messagesForUsers) + private static TestRanking getLastMessageFor(TestStats stats, MultiValueMap messagesFor) { - return messagesForUsers - .get(user) + return messagesFor + .get(stats) .stream() .reduce(null, (left, right) -> right); } - private static KeyValue[] EXPECTED_MESSAGES = new KeyValue[] + private static KeyValue[] EXPECTED_MESSAGES = new KeyValue[] { KeyValue.pair( // 0 PETER, @@ -200,9 +202,9 @@ class TestData TestEntry.of("Müsch", 2l))), }; - private static MultiValueMap expectedMessages() + private static MultiValueMap expectedMessages() { - MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); + MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); Stream .of(EXPECTED_MESSAGES) .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value)); diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java index 5f0e817..51e424e 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java @@ -3,7 +3,7 @@ package de.juplo.kafka.wordcount.top10; import de.juplo.kafka.wordcount.counter.TestCounter; import de.juplo.kafka.wordcount.counter.TestWord; import de.juplo.kafka.wordcount.query.TestRanking; -import de.juplo.kafka.wordcount.query.TestUser; +import de.juplo.kafka.wordcount.query.TestStats; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.Stores; @@ -40,7 +40,7 @@ import static org.awaitility.Awaitility.await; "spring.kafka.consumer.auto-offset-reset=earliest", "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", - "spring.kafka.consumer.properties.spring.json.type.mapping=user:de.juplo.kafka.wordcount.query.TestUser,ranking:de.juplo.kafka.wordcount.query.TestRanking", + "spring.kafka.consumer.properties.spring.json.type.mapping=stats:de.juplo.kafka.wordcount.query.TestStats,ranking:de.juplo.kafka.wordcount.query.TestRanking", "logging.level.root=WARN", "logging.level.de.juplo=DEBUG", "logging.level.org.apache.kafka.clients=INFO", @@ -110,32 +110,32 @@ public class Top10ApplicationIT @DisplayName("Await the expected number of messages") @Test - public void testAwaitExpectedNumberOfMessagesForUsers() + public void testAwaitExpectedNumberOfMessages() { await("Expected number of messages") .atMost(Duration.ofSeconds(5)) .untilAsserted(() -> consumer.enforceAssertion( - receivedMessages -> TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages))); + receivedMessages -> TestData.assertExpectedNumberOfMessages(receivedMessages))); } @DisplayName("Await the expected final output messages") @Test - public void testAwaitExpectedLastMessagesForUsers() + public void testAwaitExpectedLastMessages() { await("Expected final output messages") .atMost(Duration.ofSeconds(5)) .untilAsserted(() -> consumer.enforceAssertion( - receivedMessages -> TestData.assertExpectedLastMessagesForUsers(receivedMessages))); + receivedMessages -> TestData.assertExpectedLastMessages(receivedMessages))); } static class Consumer { - private final MultiValueMap received = new LinkedMultiValueMap<>(); + private final MultiValueMap received = new LinkedMultiValueMap<>(); @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) public synchronized void receive( - @Header(KafkaHeaders.RECEIVED_KEY) TestUser user, + @Header(KafkaHeaders.RECEIVED_KEY) TestStats user, @Payload TestRanking ranking) { log.debug("Received message: {} -> {}", user, ranking); @@ -143,7 +143,7 @@ public class Top10ApplicationIT } synchronized void enforceAssertion( - java.util.function.Consumer> assertion) + java.util.function.Consumer> assertion) { assertion.accept(received); } diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java index a8fc859..559d742 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java @@ -3,7 +3,7 @@ package de.juplo.kafka.wordcount.top10; import de.juplo.kafka.wordcount.counter.TestCounter; import de.juplo.kafka.wordcount.counter.TestWord; import de.juplo.kafka.wordcount.query.TestRanking; -import de.juplo.kafka.wordcount.query.TestUser; +import de.juplo.kafka.wordcount.query.TestStats; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.TestOutputTopic; @@ -33,7 +33,7 @@ public class Top10StreamProcessorTopologyTest public static final String OUT = "TEST-OUT"; static TopologyTestDriver testDriver; - static MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); + static MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); @BeforeAll @@ -51,10 +51,10 @@ public class Top10StreamProcessorTopologyTest jsonSerializer(TestWord.class, true), jsonSerializer(TestCounter.class,false)); - TestOutputTopic out = testDriver.createOutputTopic( + TestOutputTopic out = testDriver.createOutputTopic( OUT, new JsonDeserializer() - .copyWithType(TestUser.class) + .copyWithType(TestStats.class) .ignoreTypeHeaders(), new JsonDeserializer() .copyWithType(TestRanking.class) @@ -79,23 +79,23 @@ public class Top10StreamProcessorTopologyTest @DisplayName("Assert the expected number of messages") @Test - public void testExpectedNumberOfMessagesForUsers() + public void testExpectedNumberOfMessages() { - TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages); + TestData.assertExpectedNumberOfMessages(receivedMessages); } @DisplayName("Assert the expected final output messages") @Test - public void testExpectedLastMessagesForUSers() + public void testExpectedLastMessages() { - TestData.assertExpectedLastMessagesForUsers(receivedMessages); + TestData.assertExpectedLastMessages(receivedMessages); } @DisplayName("Assert the expected state in the state-store") @Test public void testExpectedState() { - KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); + KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); TestData.assertExpectedState(store); } -- 2.20.1 From bed1a4ebf7badd1e8e8dba3c338f8ea05f6cb9e3 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 23 Jun 2024 10:54:26 +0200 Subject: [PATCH 15/16] top10: 1.4.1 - Upgraded JDK-version, Spring Boot & Docker --- Dockerfile | 2 +- pom.xml | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/Dockerfile b/Dockerfile index 16a12e3..d2c5f05 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM eclipse-temurin:17-jre +FROM eclipse-temurin:21-jre-alpine COPY target/*.jar /opt/app.jar EXPOSE 8084 ENTRYPOINT ["java", "-jar", "/opt/app.jar"] diff --git a/pom.xml b/pom.xml index 0dce2d1..de4b085 100644 --- a/pom.xml +++ b/pom.xml @@ -5,16 +5,17 @@ org.springframework.boot spring-boot-starter-parent - 3.2.5 + 3.2.7 de.juplo.kafka.wordcount top10 - 1.4.0 + 1.4.1 Wordcount-Top-10 Top-10 stream-processor of the multi-user wordcount-example - 0.33.0 + 21 + 0.44.0 -- 2.20.1 From e4f7f1e134ae40ac8d4002a742f8426327cf177b Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 23 Jun 2024 11:51:44 +0200 Subject: [PATCH 16/16] top10: 1.4.2 - RocksDB does nor work in Alpine-Linux --- Dockerfile | 2 +- pom.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index d2c5f05..ae0723d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM eclipse-temurin:21-jre-alpine +FROM eclipse-temurin:21-jre COPY target/*.jar /opt/app.jar EXPOSE 8084 ENTRYPOINT ["java", "-jar", "/opt/app.jar"] diff --git a/pom.xml b/pom.xml index de4b085..cb79013 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount top10 - 1.4.1 + 1.4.2 Wordcount-Top-10 Top-10 stream-processor of the multi-user wordcount-example -- 2.20.1