From 213321a01c3b53f86eb2dd97c3f6241e9cad9045 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 9 Jun 2024 12:09:49 +0200 Subject: [PATCH 01/16] query: 1.0.6 - Added `QueryStreamProcessorTopologyTest` --- pom.xml | 15 ++ .../de/juplo/kafka/wordcount/query/Entry.java | 8 +- .../de/juplo/kafka/wordcount/query/Key.java | 6 +- .../kafka/wordcount/query/UserRanking.java | 8 +- .../QueryStreamProcessorTopologyTest.java | 76 +++++++++ .../juplo/kafka/wordcount/query/TestData.java | 159 ++++++++++++++++++ .../kafka/wordcount/top10/TestEntry.java | 15 ++ .../kafka/wordcount/top10/TestRanking.java | 20 +++ .../kafka/wordcount/users/TestUserData.java | 19 +++ 9 files changed, 313 insertions(+), 13 deletions(-) create mode 100644 src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java create mode 100644 src/test/java/de/juplo/kafka/wordcount/query/TestData.java create mode 100644 src/test/java/de/juplo/kafka/wordcount/top10/TestEntry.java create mode 100644 src/test/java/de/juplo/kafka/wordcount/top10/TestRanking.java create mode 100644 src/test/java/de/juplo/kafka/wordcount/users/TestUserData.java diff --git a/pom.xml b/pom.xml index b699a81..d74c826 100644 --- a/pom.xml +++ b/pom.xml @@ -51,6 +51,21 @@ spring-boot-starter-test test + + org.springframework.kafka + spring-kafka + test + + + org.springframework.kafka + spring-kafka-test + test + + + org.assertj + assertj-core + test + diff --git a/src/main/java/de/juplo/kafka/wordcount/query/Entry.java b/src/main/java/de/juplo/kafka/wordcount/query/Entry.java index 4866e72..80b4daf 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/Entry.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/Entry.java @@ -1,11 +1,11 @@ package de.juplo.kafka.wordcount.query; -import lombok.Value; +import lombok.Data; -@Value(staticConstructor = "of") +@Data public class Entry { - private final String word; - private final Long count; + private String word; + private Long count; } diff --git a/src/main/java/de/juplo/kafka/wordcount/query/Key.java b/src/main/java/de/juplo/kafka/wordcount/query/Key.java index be34ba8..afeac4a 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/Key.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/Key.java @@ -1,11 +1,9 @@ package de.juplo.kafka.wordcount.query; -import lombok.Getter; -import lombok.Setter; +import lombok.Data; -@Getter -@Setter +@Data public class Key { private String username; diff --git a/src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java b/src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java index acffd5d..9ca765a 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java @@ -1,13 +1,11 @@ package de.juplo.kafka.wordcount.query; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.Setter; +import lombok.*; -@Getter -@Setter +@Data @AllArgsConstructor(staticName = "of") +@NoArgsConstructor public class UserRanking { private String firstName; diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java new file mode 100644 index 0000000..8439be1 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java @@ -0,0 +1,76 @@ +package de.juplo.kafka.wordcount.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.wordcount.top10.TestRanking; +import de.juplo.kafka.wordcount.users.TestUserData; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.kafka.support.serializer.JsonSerializer; + +import static de.juplo.kafka.wordcount.query.QueryApplicationConfiguration.serializationConfig; + + +@Slf4j +public class QueryStreamProcessorTopologyTest +{ + public static final String TOP10_IN = "TOP10-IN"; + public static final String USERS_IN = "USERS-IN"; + public static final String STORE_NAME = "TOPOLOGY-TEST"; + + + TopologyTestDriver testDriver; + TestInputTopic top10In; + TestInputTopic userIn; + + + @BeforeEach + public void setUp() + { + Topology topology = QueryStreamProcessor.buildTopology( + USERS_IN, + TOP10_IN, + Stores.inMemoryKeyValueStore(STORE_NAME), + new ObjectMapper()); + + testDriver = new TopologyTestDriver(topology, serializationConfig()); + + top10In = testDriver.createInputTopic( + TOP10_IN, + new StringSerializer(), + new JsonSerializer()); + + userIn = testDriver.createInputTopic( + USERS_IN, + new StringSerializer(), + new JsonSerializer()); + } + + + @Test + public void test() + { + TestData + .getUsersMessages() + .forEach(kv -> userIn.pipeInput(kv.key, kv.value)); + TestData + .getTop10Messages() + .forEach(kv -> top10In.pipeInput(kv.key, kv.value)); + + KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); + TestData.assertExpectedState(store); + } + + @AfterEach + public void tearDown() + { + testDriver.close(); + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java new file mode 100644 index 0000000..82f7217 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java @@ -0,0 +1,159 @@ +package de.juplo.kafka.wordcount.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.wordcount.top10.TestEntry; +import de.juplo.kafka.wordcount.top10.TestRanking; +import de.juplo.kafka.wordcount.users.TestUserData; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; + +import java.util.Arrays; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + + +class TestData +{ + static final ObjectMapper objectMapper = new ObjectMapper(); + static final String PETER = "peter"; + static final String KLAUS = "klaus"; + + static final Stream> getTop10Messages() + { + return Stream.of(TOP10_MESSAGES); + } + + static final Stream> getUsersMessages() + { + return Stream.of(USERS_MESSAGES); + } + + static void assertExpectedState(ReadOnlyKeyValueStore store) + { + assertRankingEqualsRankingFromLastMessage(PETER, store.get(PETER)); + assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(KLAUS)); + } + + private static void assertRankingEqualsRankingFromLastMessage(String user, String userRankingJson) + { + assertThat(userRankingOf(userRankingJson)).isEqualTo(getLastMessageFor(user)); + } + + private static UserRanking userRankingOf(String json) + { + try + { + return objectMapper.readValue(json, UserRanking.class); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + private static UserRanking getLastMessageFor(String user) + { + return getTop10Messages() + .filter(kv -> kv.key.equals(user)) + .map(kv -> kv.value) + .map(testRanking -> userRankingFor(user, testRanking)) + .reduce(null, (left, right) -> right); + } + + private static UserRanking userRankingFor(String user, TestRanking testRanking) + { + TestUserData testUserData = getUsersMessages() + .filter(kv -> kv.key.equals(user)) + .map(kv -> kv.value) + .reduce(null, (left, right) -> right); + + Entry[] entries = Arrays + .stream(testRanking.getEntries()) + .map(testEntry -> entryOf(testEntry)) + .toArray(size -> new Entry[size]); + + return UserRanking.of( + testUserData.getFirstName(), + testUserData.getLastName(), + entries); + } + + private static Entry entryOf(TestEntry testEntry) + { + Entry entry = new Entry(); + entry.setWord(testEntry.getWord()); + entry.setCount(testEntry.getCount()); + return entry; + } + + private static KeyValue[] TOP10_MESSAGES = new KeyValue[] + { + KeyValue.pair( // 0 + PETER, + TestRanking.of( + TestEntry.of("Hallo", 1l))), + KeyValue.pair( // 1 + KLAUS, + TestRanking.of( + TestEntry.of("Müsch", 1l))), + KeyValue.pair( // 2 + PETER, + TestRanking.of( + TestEntry.of("Hallo", 1l), + TestEntry.of("Welt", 1l))), + KeyValue.pair( // 3 + KLAUS, + TestRanking.of( + TestEntry.of("Müsch", 2l))), + KeyValue.pair( // 4 + KLAUS, + TestRanking.of( + TestEntry.of("Müsch", 2l), + TestEntry.of("s", 1l))), + KeyValue.pair( // 5 + PETER, + TestRanking.of( + TestEntry.of("Hallo", 1l), + TestEntry.of("Welt", 1l), + TestEntry.of("Boäh", 1l))), + KeyValue.pair( // 6 + PETER, + TestRanking.of( + TestEntry.of("Welt", 2l), + TestEntry.of("Hallo", 1l), + TestEntry.of("Boäh", 1l))), + KeyValue.pair( // 7 + PETER, + TestRanking.of( + TestEntry.of("Welt", 2l), + TestEntry.of("Boäh", 2l), + TestEntry.of("Hallo", 1l))), + KeyValue.pair( // 8 + KLAUS, + TestRanking.of( + TestEntry.of("Müsch", 2l), + TestEntry.of("s", 2l))), + KeyValue.pair( // 9 + PETER, + TestRanking.of( + TestEntry.of("Boäh", 3l), + TestEntry.of("Welt", 2l), + TestEntry.of("Hallo", 1l))), + KeyValue.pair( // 10 + KLAUS, + TestRanking.of( + TestEntry.of("s", 3l), + TestEntry.of("Müsch", 2l))), + }; + + private static KeyValue[] USERS_MESSAGES = new KeyValue[] + { + KeyValue.pair( + PETER, + TestUserData.of(PETER, "Peter", "Pan", TestUserData.Sex.MALE)), + KeyValue.pair( + KLAUS, + TestUserData.of(KLAUS, "Klaus", "Klüse", TestUserData.Sex.OTHER)), + }; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestEntry.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestEntry.java new file mode 100644 index 0000000..215327f --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestEntry.java @@ -0,0 +1,15 @@ +package de.juplo.kafka.wordcount.top10; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@AllArgsConstructor(staticName = "of") +@NoArgsConstructor +@Data +public class TestEntry +{ + String word; + long count; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestRanking.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestRanking.java new file mode 100644 index 0000000..2b49590 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestRanking.java @@ -0,0 +1,20 @@ +package de.juplo.kafka.wordcount.top10; + +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@AllArgsConstructor(access = AccessLevel.PRIVATE) +@NoArgsConstructor +@Data +public class TestRanking +{ + private TestEntry[] entries; + + public static TestRanking of(TestEntry... entries) + { + return new TestRanking(entries); + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/users/TestUserData.java b/src/test/java/de/juplo/kafka/wordcount/users/TestUserData.java new file mode 100644 index 0000000..03bf041 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/users/TestUserData.java @@ -0,0 +1,19 @@ +package de.juplo.kafka.wordcount.users; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@AllArgsConstructor(staticName = "of") +@NoArgsConstructor +@Data +public class TestUserData +{ + public enum Sex { FEMALE, MALE, OTHER } + + String username; + String firstName; + String lastName; + Sex sex; +} -- 2.20.1 From 2b22a006cc57203406c8589687a6c729ebdbf40c Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 9 Jun 2024 17:03:59 +0200 Subject: [PATCH 02/16] query: 1.0.6 - Added `QueryApplicationIT` --- pom.xml | 3 + .../wordcount/query/QueryStreamProcessor.java | 7 +- .../wordcount/query/QueryApplicationIT.java | 98 +++++++++++++++++++ .../juplo/kafka/wordcount/query/TestData.java | 5 + 4 files changed, 112 insertions(+), 1 deletion(-) create mode 100644 src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java diff --git a/pom.xml b/pom.xml index d74c826..d7f3a30 100644 --- a/pom.xml +++ b/pom.xml @@ -70,6 +70,9 @@ + + maven-failsafe-plugin + org.springframework.boot spring-boot-maven-plugin diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index 3e205f6..c4ae4ae 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -88,6 +88,11 @@ public class QueryStreamProcessor return topology; } + ReadOnlyKeyValueStore getStore() + { + return streams.store(storeParameters); + } + public Optional getRedirect(String username) { KeyQueryMetadata metadata = streams.queryMetadataForKey(STORE_NAME, username, Serdes.String().serializer()); @@ -108,7 +113,7 @@ public class QueryStreamProcessor { return Optional - .ofNullable(streams.store(storeParameters).get(username)) + .ofNullable(getStore().get(username)) .map(json -> { try diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java new file mode 100644 index 0000000..4e44cda --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java @@ -0,0 +1,98 @@ +package de.juplo.kafka.wordcount.query; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; +import org.springframework.kafka.test.context.EmbeddedKafka; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; + +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STORE_NAME; +import static org.awaitility.Awaitility.await; + + +@SpringBootTest( + properties = { + "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer", + "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", + "spring.kafka.producer.properties.spring.json.add.type.headers=false", + "logging.level.root=WARN", + "logging.level.de.juplo=DEBUG", + "logging.level.org.apache.kafka.clients=INFO", + "logging.level.org.apache.kafka.streams=INFO", + "juplo.wordcount.query.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.wordcount.query.users-input-topic=" + QueryApplicationIT.TOPIC_USERS, + "juplo.wordcount.query.ranking-input-topic=" + QueryApplicationIT.TOPIC_TOP10 }) +@EmbeddedKafka(topics = { QueryApplicationIT.TOPIC_TOP10, QueryApplicationIT.TOPIC_USERS}) +@Slf4j +public class QueryApplicationIT +{ + public static final String TOPIC_TOP10 = "top10"; + public static final String TOPIC_USERS = "users"; + + @Autowired + QueryStreamProcessor streamProcessor; + + + @BeforeAll + public static void testSendMessage( + @Autowired KafkaTemplate kafkaTemplate) + { + TestData + .getUsersMessages() + .forEach(kv -> flush(kafkaTemplate.send(TOPIC_USERS, kv.key, kv.value))); + TestData + .getTop10Messages() + .forEach(kv -> flush(kafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value))); + } + + private static void flush(CompletableFuture> future) + { + try + { + SendResult result = future.get(); + log.info( + "Sent: {}={}, partition={}, offset={}", + result.getProducerRecord().key(), + result.getProducerRecord().value(), + result.getRecordMetadata().partition(), + result.getRecordMetadata().offset()); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + @DisplayName("Await the expected state in the state-store") + @Test + public void testAwaitExpectedState() + { + await("Expected state") + .atMost(Duration.ofSeconds(5)) + .catchUncaughtExceptions() + .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore())); + } + + @TestConfiguration + static class Configuration + { + @Primary + @Bean + KeyValueBytesStoreSupplier inMemoryStoreSupplier() + { + return Stores.inMemoryKeyValueStore(STORE_NAME); + } + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java index 82f7217..610bca0 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java @@ -42,6 +42,11 @@ class TestData private static UserRanking userRankingOf(String json) { + if (json == null) + { + return null; + } + try { return objectMapper.readValue(json, UserRanking.class); -- 2.20.1 From cfd58858b9861e8d0e4c1d30896505a50f63255b Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 9 Jun 2024 19:35:22 +0200 Subject: [PATCH 03/16] query: 1.0.6 - Refactored `TestData.assertExpectedState()` --- .../juplo/kafka/wordcount/query/QueryApplicationIT.java | 2 +- .../wordcount/query/QueryStreamProcessorTopologyTest.java | 2 +- .../java/de/juplo/kafka/wordcount/query/TestData.java | 8 ++++---- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java index 4e44cda..914aeaf 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java @@ -82,7 +82,7 @@ public class QueryApplicationIT await("Expected state") .atMost(Duration.ofSeconds(5)) .catchUncaughtExceptions() - .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore())); + .untilAsserted(() -> TestData.assertExpectedState(user -> streamProcessor.getStore().get(user))); } @TestConfiguration diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java index 8439be1..6bdd8fa 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java @@ -65,7 +65,7 @@ public class QueryStreamProcessorTopologyTest .forEach(kv -> top10In.pipeInput(kv.key, kv.value)); KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); - TestData.assertExpectedState(store); + TestData.assertExpectedState(user -> store.get(user)); } @AfterEach diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java index 610bca0..3fcd7c9 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java @@ -5,9 +5,9 @@ import de.juplo.kafka.wordcount.top10.TestEntry; import de.juplo.kafka.wordcount.top10.TestRanking; import de.juplo.kafka.wordcount.users.TestUserData; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import java.util.Arrays; +import java.util.function.Function; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; @@ -29,10 +29,10 @@ class TestData return Stream.of(USERS_MESSAGES); } - static void assertExpectedState(ReadOnlyKeyValueStore store) + static void assertExpectedState(Function function) { - assertRankingEqualsRankingFromLastMessage(PETER, store.get(PETER)); - assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(KLAUS)); + assertRankingEqualsRankingFromLastMessage(PETER, function.apply(PETER)); + assertRankingEqualsRankingFromLastMessage(KLAUS, function.apply(KLAUS)); } private static void assertRankingEqualsRankingFromLastMessage(String user, String userRankingJson) -- 2.20.1 From dcb383aef4f4f454e3a1aed9cf1f2a599c7b1bb9 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 9 Jun 2024 19:48:33 +0200 Subject: [PATCH 04/16] query: 1.0.6 - Added IT, that queries the expected state --- .../wordcount/query/QueryApplicationIT.java | 44 +++++++++++++++++-- 1 file changed, 40 insertions(+), 4 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java index 914aeaf..e38871f 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java @@ -7,19 +7,25 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Primary; +import org.springframework.http.MediaType; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.web.servlet.MockMvc; +import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.concurrent.CompletableFuture; import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STORE_NAME; import static org.awaitility.Awaitility.await; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; @SpringBootTest( @@ -34,6 +40,7 @@ import static org.awaitility.Awaitility.await; "juplo.wordcount.query.bootstrap-server=${spring.embedded.kafka.brokers}", "juplo.wordcount.query.users-input-topic=" + QueryApplicationIT.TOPIC_USERS, "juplo.wordcount.query.ranking-input-topic=" + QueryApplicationIT.TOPIC_TOP10 }) +@AutoConfigureMockMvc @EmbeddedKafka(topics = { QueryApplicationIT.TOPIC_TOP10, QueryApplicationIT.TOPIC_USERS}) @Slf4j public class QueryApplicationIT @@ -41,6 +48,9 @@ public class QueryApplicationIT public static final String TOPIC_TOP10 = "top10"; public static final String TOPIC_USERS = "users"; + + @Autowired + MockMvc mockMvc; @Autowired QueryStreamProcessor streamProcessor; @@ -75,16 +85,42 @@ public class QueryApplicationIT } } - @DisplayName("Await the expected state in the state-store") + @DisplayName("Await, that the expected state is visible in the state-store") @Test - public void testAwaitExpectedState() + public void testAwaitExpectedStateInStore() { - await("Expected state") + await("The expected state is visible in the state-store") .atMost(Duration.ofSeconds(5)) - .catchUncaughtExceptions() .untilAsserted(() -> TestData.assertExpectedState(user -> streamProcessor.getStore().get(user))); } + @DisplayName("Await, that the expected state is queryable") + @Test + public void testAwaitExpectedStateIsQueryable() + { + await("The expected state is queryable") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> TestData.assertExpectedState(user -> requestUserRankingFor(user))); + } + + private String requestUserRankingFor(String user) + { + try + { + return mockMvc + .perform(MockMvcRequestBuilders.get("/{user}", user) + .contentType(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + .andReturn() + .getResponse() + .getContentAsString(StandardCharsets.UTF_8); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + @TestConfiguration static class Configuration { -- 2.20.1 From 57f47fca712981116b726e437f589ac727c5d0a7 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 9 Jun 2024 20:44:35 +0200 Subject: [PATCH 05/16] query: 2.0.0 - (RED) Formulated expectations for JSON-values --- pom.xml | 2 +- .../wordcount/query/QueryStreamProcessor.java | 2 +- .../wordcount/query/QueryApplicationIT.java | 23 ++++++++++------- .../QueryStreamProcessorTopologyTest.java | 20 ++++++++++++--- .../juplo/kafka/wordcount/query/TestData.java | 25 +++---------------- 5 files changed, 36 insertions(+), 36 deletions(-) diff --git a/pom.xml b/pom.xml index d7f3a30..60ea716 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount query - 1.0.6 + 2.0.0 Wordcount-Query Query stream-processor of the multi-user wordcount-example diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index c4ae4ae..ff7c150 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -88,7 +88,7 @@ public class QueryStreamProcessor return topology; } - ReadOnlyKeyValueStore getStore() + ReadOnlyKeyValueStore getStore() { return streams.store(storeParameters); } diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java index e38871f..a9cca10 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java @@ -1,5 +1,6 @@ package de.juplo.kafka.wordcount.query; +import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.Stores; @@ -32,7 +33,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. properties = { "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer", "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", - "spring.kafka.producer.properties.spring.json.add.type.headers=false", + "spring.kafka.producer.properties.spring.json.type.mapping=userdata:de.juplo.kafka.wordcount.users.TestUserData,ranking:de.juplo.kafka.wordcount.top10.TestRanking", "logging.level.root=WARN", "logging.level.de.juplo=DEBUG", "logging.level.org.apache.kafka.clients=INFO", @@ -52,6 +53,8 @@ public class QueryApplicationIT @Autowired MockMvc mockMvc; @Autowired + ObjectMapper objectMapper; + @Autowired QueryStreamProcessor streamProcessor; @@ -103,17 +106,19 @@ public class QueryApplicationIT .untilAsserted(() -> TestData.assertExpectedState(user -> requestUserRankingFor(user))); } - private String requestUserRankingFor(String user) + private UserRanking requestUserRankingFor(String user) { try { - return mockMvc - .perform(MockMvcRequestBuilders.get("/{user}", user) - .contentType(MediaType.APPLICATION_JSON)) - .andExpect(status().isOk()) - .andReturn() - .getResponse() - .getContentAsString(StandardCharsets.UTF_8); + return objectMapper.readValue( + mockMvc + .perform(MockMvcRequestBuilders.get("/{user}", user) + .contentType(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + .andReturn() + .getResponse() + .getContentAsString(StandardCharsets.UTF_8), + UserRanking.class); } catch (Exception e) { diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java index 6bdd8fa..eef1eec 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java @@ -15,6 +15,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.kafka.support.serializer.JsonSerializer; +import java.util.Map; + import static de.juplo.kafka.wordcount.query.QueryApplicationConfiguration.serializationConfig; @@ -45,12 +47,12 @@ public class QueryStreamProcessorTopologyTest top10In = testDriver.createInputTopic( TOP10_IN, new StringSerializer(), - new JsonSerializer()); + jsonSerializer(TestRanking.class)); userIn = testDriver.createInputTopic( USERS_IN, new StringSerializer(), - new JsonSerializer()); + jsonSerializer(TestUserData.class)); } @@ -64,7 +66,7 @@ public class QueryStreamProcessorTopologyTest .getTop10Messages() .forEach(kv -> top10In.pipeInput(kv.key, kv.value)); - KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); + KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); TestData.assertExpectedState(user -> store.get(user)); } @@ -73,4 +75,16 @@ public class QueryStreamProcessorTopologyTest { testDriver.close(); } + + private JsonSerializer jsonSerializer(Class type) + { + JsonSerializer jsonSerializer = new JsonSerializer<>(); + jsonSerializer.configure( + Map.of( + JsonSerializer.TYPE_MAPPINGS, + "userdata:" + TestUserData.class.getName() + "," + + "ranking:" + TestRanking.class.getName()), + false); + return jsonSerializer; + } } diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java index 3fcd7c9..1fe34d9 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java @@ -1,6 +1,5 @@ package de.juplo.kafka.wordcount.query; -import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.wordcount.top10.TestEntry; import de.juplo.kafka.wordcount.top10.TestRanking; import de.juplo.kafka.wordcount.users.TestUserData; @@ -15,7 +14,6 @@ import static org.assertj.core.api.Assertions.assertThat; class TestData { - static final ObjectMapper objectMapper = new ObjectMapper(); static final String PETER = "peter"; static final String KLAUS = "klaus"; @@ -29,32 +27,15 @@ class TestData return Stream.of(USERS_MESSAGES); } - static void assertExpectedState(Function function) + static void assertExpectedState(Function function) { assertRankingEqualsRankingFromLastMessage(PETER, function.apply(PETER)); assertRankingEqualsRankingFromLastMessage(KLAUS, function.apply(KLAUS)); } - private static void assertRankingEqualsRankingFromLastMessage(String user, String userRankingJson) + private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson) { - assertThat(userRankingOf(userRankingJson)).isEqualTo(getLastMessageFor(user)); - } - - private static UserRanking userRankingOf(String json) - { - if (json == null) - { - return null; - } - - try - { - return objectMapper.readValue(json, UserRanking.class); - } - catch (Exception e) - { - throw new RuntimeException(e); - } + assertThat(rankingJson).isEqualTo(getLastMessageFor(user)); } private static UserRanking getLastMessageFor(String user) -- 2.20.1 From 278c7b8125c82120e1d80fa640bd16d375d4bf86 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 9 Jun 2024 20:44:35 +0200 Subject: [PATCH 06/16] query: 2.0.0 - (GREEN) Values are serialized as JSON -- works __only__, if a default-type is defined * The default-type is needed, to deserialized values that are read from the state-store. * Without it, the deserialization fails, because not type-information is available. * The type-information gets lost, when the values are stored in the state- store, because the message-headers are _not_ stored along with the value! --- pom.xml | 9 ++- .../query/QueryApplicationConfiguration.java | 16 +++-- .../wordcount/query/QueryStreamProcessor.java | 63 +++++-------------- .../QueryStreamProcessorTopologyTest.java | 4 +- 4 files changed, 33 insertions(+), 59 deletions(-) diff --git a/pom.xml b/pom.xml index 60ea716..3edc6d3 100644 --- a/pom.xml +++ b/pom.xml @@ -29,6 +29,10 @@ org.apache.kafka kafka-streams + + org.springframework.kafka + spring-kafka + org.springframework.boot @@ -51,11 +55,6 @@ spring-boot-starter-test test - - org.springframework.kafka - spring-kafka - test - org.springframework.kafka spring-kafka-test diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java index 50a5364..7da1712 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java @@ -1,6 +1,5 @@ package de.juplo.kafka.wordcount.query; -import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; @@ -14,6 +13,8 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerde; import java.io.IOException; import java.net.InetSocketAddress; @@ -78,7 +79,14 @@ public class QueryApplicationConfiguration Properties props = new Properties(); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, User.class.getName()); // << Does not work without this! + props.put( + JsonDeserializer.TYPE_MAPPINGS, + "user:" + Key.class.getName() + "," + + "ranking:" + Ranking.class.getName() + "," + + "userdata:" + User.class.getName() + "," + + "userranking:" + UserRanking.class.getName()); return props; } @@ -89,7 +97,6 @@ public class QueryApplicationConfiguration HostInfo applicationServer, QueryApplicationProperties applicationProperties, KeyValueBytesStoreSupplier storeSupplier, - ObjectMapper mapper, ConfigurableApplicationContext context) { QueryStreamProcessor streamProcessor = new QueryStreamProcessor( @@ -97,8 +104,7 @@ public class QueryApplicationConfiguration applicationServer, applicationProperties.getUsersInputTopic(), applicationProperties.getRankingInputTopic(), - storeSupplier, - mapper); + storeSupplier); streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> { diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index ff7c150..cc65fce 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -1,12 +1,11 @@ package de.juplo.kafka.wordcount.query; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; @@ -14,6 +13,7 @@ import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.springframework.kafka.support.serializer.JsonSerde; import java.net.URI; import java.util.Optional; @@ -27,8 +27,7 @@ public class QueryStreamProcessor public final KafkaStreams streams; public final HostInfo hostInfo; - public final StoreQueryParameters> storeParameters; - public final ObjectMapper mapper; + public final StoreQueryParameters> storeParameters; public QueryStreamProcessor( @@ -36,51 +35,36 @@ public class QueryStreamProcessor HostInfo applicationServer, String usersInputTopic, String rankingInputTopic, - KeyValueBytesStoreSupplier storeSupplier, - ObjectMapper mapper) + KeyValueBytesStoreSupplier storeSupplier) { Topology topology = buildTopology( usersInputTopic, rankingInputTopic, - storeSupplier, - mapper); + storeSupplier); streams = new KafkaStreams(topology, props); hostInfo = applicationServer; storeParameters = StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore());; - this.mapper = mapper; } static Topology buildTopology( String usersInputTopic, String rankingInputTopic, - KeyValueBytesStoreSupplier storeSupplier, - ObjectMapper mapper) + KeyValueBytesStoreSupplier storeSupplier) { StreamsBuilder builder = new StreamsBuilder(); - KTable users = builder.table(usersInputTopic); - KStream rankings = builder.stream(rankingInputTopic); + KTable users = builder.table(usersInputTopic); + KStream rankings = builder.stream(rankingInputTopic); rankings - .join(users, (rankingJson, userJson) -> - { - try - { - Ranking ranking = mapper.readValue(rankingJson, Ranking.class); - User user = mapper.readValue(userJson, User.class); - - return mapper.writeValueAsString( - UserRanking.of( - user.getFirstName(), - user.getLastName(), - ranking.getEntries())); - } - catch (JsonProcessingException e) - { - throw new RuntimeException(e); - } - }) - .toTable(Materialized.as(storeSupplier)); + .join(users, (ranking, user) -> UserRanking.of( + user.getFirstName(), + user.getLastName(), + ranking.getEntries())) + .toTable( + Materialized + .as(storeSupplier) + .withValueSerde(new JsonSerde().copyWithType(UserRanking.class))); Topology topology = builder.build(); log.info("\n\n{}", topology.describe()); @@ -111,20 +95,7 @@ public class QueryStreamProcessor public Optional getUserRanking(String username) { - return - Optional - .ofNullable(getStore().get(username)) - .map(json -> - { - try - { - return mapper.readValue(json, UserRanking.class); - } - catch (JsonProcessingException e) - { - throw new RuntimeException(e); - } - }); + return Optional.ofNullable(getStore().get(username)); } @PostConstruct diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java index eef1eec..845792c 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java @@ -1,6 +1,5 @@ package de.juplo.kafka.wordcount.query; -import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.wordcount.top10.TestRanking; import de.juplo.kafka.wordcount.users.TestUserData; import lombok.extern.slf4j.Slf4j; @@ -39,8 +38,7 @@ public class QueryStreamProcessorTopologyTest Topology topology = QueryStreamProcessor.buildTopology( USERS_IN, TOP10_IN, - Stores.inMemoryKeyValueStore(STORE_NAME), - new ObjectMapper()); + Stores.inMemoryKeyValueStore(STORE_NAME)); testDriver = new TopologyTestDriver(topology, serializationConfig()); -- 2.20.1 From 2f5828ee2bbd662c3c81c76961d00b871468c8b9 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 11 Jun 2024 20:36:58 +0200 Subject: [PATCH 07/16] query: 2.0.0 - Values are serialized as JSON -- works, but is very confusing * The default-type is specified as a consumption-parameter in the command, that reads the input topic into the `KTable` via ``Consumed.with(..)``. * The resulting code is confusing, because the ``Consumed``-parameter is used for both, the consumption of the input topic _and_ the consumption of stored values, if read from the state-store. * Because of this, one might only think of the consumption of the stored values from the state-store, when looking at the ``Consumed.with()``- statement, and argue, why the type-mappings have to be specified here. --- .../query/QueryApplicationConfiguration.java | 1 - .../wordcount/query/QueryStreamProcessor.java | 15 ++++++++++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java index 7da1712..07b78e4 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java @@ -80,7 +80,6 @@ public class QueryApplicationConfiguration props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); - props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, User.class.getName()); // << Does not work without this! props.put( JsonDeserializer.TYPE_MAPPINGS, "user:" + Key.class.getName() + "," + diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index cc65fce..0692652 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -13,9 +13,11 @@ import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerde; import java.net.URI; +import java.util.Map; import java.util.Optional; import java.util.Properties; @@ -53,7 +55,18 @@ public class QueryStreamProcessor { StreamsBuilder builder = new StreamsBuilder(); - KTable users = builder.table(usersInputTopic); + JsonSerde valueSerde = new JsonSerde(); + valueSerde.configure(Map.of( + JsonDeserializer.TYPE_MAPPINGS, + "user:" + Key.class.getName() + "," + + "ranking:" + Ranking.class.getName() + "," + + "userdata:" + User.class.getName() + "," + + "userranking:" + UserRanking.class.getName() + ), false); + KTable users = builder.table( + usersInputTopic, + Consumed.with(null, valueSerde.copyWithType(User.class)) + ); KStream rankings = builder.stream(rankingInputTopic); rankings -- 2.20.1 From 0648885ec026d7434561060dc7edb703efea6853 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 11 Jun 2024 20:41:30 +0200 Subject: [PATCH 08/16] query: 2.0.0 - Values are serialized as JSON -- works, still startling, but explainable * Splitting the command ``table()``, that reads the input-topic and materializes it as ``KTable()``, into the two statements ``stream()``, that reads the input-topic into a ``KStream``, and ``toTable()``, that turns the `KStream` into a ``KTable``, the ``JsonSerde``, that is specified via ``Consumed.with(..)``, is only used for the serialization and deserialization concerning the ``KTable`` -- not the deserialization of the values, that are read from the input-topic. * Hence, the type-mappings does not have to be specified for the ``JsonSerde``, resulting in better understandable code. * __Note__, that the resulting topology does not differe, because the DSL is able to combine the effects of the two statements. --- .../wordcount/query/QueryStreamProcessor.java | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index 0692652..7dacd4b 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -5,7 +5,6 @@ import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; @@ -13,11 +12,9 @@ import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; -import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerde; import java.net.URI; -import java.util.Map; import java.util.Optional; import java.util.Properties; @@ -55,18 +52,9 @@ public class QueryStreamProcessor { StreamsBuilder builder = new StreamsBuilder(); - JsonSerde valueSerde = new JsonSerde(); - valueSerde.configure(Map.of( - JsonDeserializer.TYPE_MAPPINGS, - "user:" + Key.class.getName() + "," + - "ranking:" + Ranking.class.getName() + "," + - "userdata:" + User.class.getName() + "," + - "userranking:" + UserRanking.class.getName() - ), false); - KTable users = builder.table( - usersInputTopic, - Consumed.with(null, valueSerde.copyWithType(User.class)) - ); + KTable users = builder + .stream(usersInputTopic) + .toTable(Materialized.with(null, new JsonSerde().copyWithType(User.class))); KStream rankings = builder.stream(rankingInputTopic); rankings -- 2.20.1 From fc5d6c6ee08a4b2e29a045bf4071dd0a4d86bc0d Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 12 Jun 2024 22:46:24 +0200 Subject: [PATCH 09/16] query: 2.0.0 - Defined 2 state-stores (all state in-memory in tests) * Introduced a second state-store to store the incomming users-table. * Without the explicit definition of the state-store, it is _not_ possible, to reconfigure the integration-test in such a way, taht it does not store its state locally on disk. --- .../query/QueryApplicationConfiguration.java | 19 +++++++++++---- .../wordcount/query/QueryStreamProcessor.java | 24 ++++++++++++------- .../wordcount/query/QueryApplicationIT.java | 16 +++++++++---- .../QueryStreamProcessorTopologyTest.java | 8 ++++--- 4 files changed, 46 insertions(+), 21 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java index 07b78e4..3bf8326 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java @@ -22,7 +22,8 @@ import java.net.Socket; import java.util.Properties; import java.util.concurrent.CompletableFuture; -import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STORE_NAME; +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME; +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME; import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; @@ -95,7 +96,8 @@ public class QueryApplicationConfiguration Properties streamProcessorProperties, HostInfo applicationServer, QueryApplicationProperties applicationProperties, - KeyValueBytesStoreSupplier storeSupplier, + KeyValueBytesStoreSupplier userStoreSupplier, + KeyValueBytesStoreSupplier rankingStoreSupplier, ConfigurableApplicationContext context) { QueryStreamProcessor streamProcessor = new QueryStreamProcessor( @@ -103,7 +105,8 @@ public class QueryApplicationConfiguration applicationServer, applicationProperties.getUsersInputTopic(), applicationProperties.getRankingInputTopic(), - storeSupplier); + userStoreSupplier, + rankingStoreSupplier); streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> { @@ -120,8 +123,14 @@ public class QueryApplicationConfiguration } @Bean - public KeyValueBytesStoreSupplier storeSupplier() + public KeyValueBytesStoreSupplier userStoreSupplier() { - return Stores.persistentKeyValueStore(STORE_NAME); + return Stores.persistentKeyValueStore(USER_STORE_NAME); + } + + @Bean + public KeyValueBytesStoreSupplier rankingStoreSupplier() + { + return Stores.persistentKeyValueStore(RANKING_STORE_NAME); } } diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index 7dacd4b..4749264 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -22,7 +22,8 @@ import java.util.Properties; @Slf4j public class QueryStreamProcessor { - public static final String STORE_NAME = "rankings-by-username"; + public static final String USER_STORE_NAME = "users"; + public static final String RANKING_STORE_NAME = "rankings"; public final KafkaStreams streams; public final HostInfo hostInfo; @@ -34,27 +35,34 @@ public class QueryStreamProcessor HostInfo applicationServer, String usersInputTopic, String rankingInputTopic, - KeyValueBytesStoreSupplier storeSupplier) + KeyValueBytesStoreSupplier userStoreSupplier, + KeyValueBytesStoreSupplier rankingStoreSupplier) { Topology topology = buildTopology( usersInputTopic, rankingInputTopic, - storeSupplier); + userStoreSupplier, + rankingStoreSupplier); streams = new KafkaStreams(topology, props); hostInfo = applicationServer; - storeParameters = StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore());; + storeParameters = StoreQueryParameters.fromNameAndType(RANKING_STORE_NAME, QueryableStoreTypes.keyValueStore());; } static Topology buildTopology( String usersInputTopic, String rankingInputTopic, - KeyValueBytesStoreSupplier storeSupplier) + KeyValueBytesStoreSupplier userStoreSupplier, + KeyValueBytesStoreSupplier rankingStoreSupplier) { StreamsBuilder builder = new StreamsBuilder(); KTable users = builder .stream(usersInputTopic) - .toTable(Materialized.with(null, new JsonSerde().copyWithType(User.class))); + .toTable( + Materialized + .as(userStoreSupplier) + .withKeySerde(Serdes.String()) + .withValueSerde(new JsonSerde().copyWithType(User.class))); KStream rankings = builder.stream(rankingInputTopic); rankings @@ -64,7 +72,7 @@ public class QueryStreamProcessor ranking.getEntries())) .toTable( Materialized - .as(storeSupplier) + .as(rankingStoreSupplier) .withValueSerde(new JsonSerde().copyWithType(UserRanking.class))); Topology topology = builder.build(); @@ -80,7 +88,7 @@ public class QueryStreamProcessor public Optional getRedirect(String username) { - KeyQueryMetadata metadata = streams.queryMetadataForKey(STORE_NAME, username, Serdes.String().serializer()); + KeyQueryMetadata metadata = streams.queryMetadataForKey(RANKING_STORE_NAME, username, Serdes.String().serializer()); HostInfo activeHost = metadata.activeHost(); log.debug("Local store for {}: {}, {}:{}", username, metadata.partition(), activeHost.host(), activeHost.port()); diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java index a9cca10..d800fbd 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java @@ -12,7 +12,6 @@ import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMock import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Primary; import org.springframework.http.MediaType; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; @@ -24,13 +23,15 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.concurrent.CompletableFuture; -import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STORE_NAME; +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME; +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME; import static org.awaitility.Awaitility.await; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; @SpringBootTest( properties = { + "spring.main.allow-bean-definition-overriding=true", "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer", "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", "spring.kafka.producer.properties.spring.json.type.mapping=userdata:de.juplo.kafka.wordcount.users.TestUserData,ranking:de.juplo.kafka.wordcount.top10.TestRanking", @@ -129,11 +130,16 @@ public class QueryApplicationIT @TestConfiguration static class Configuration { - @Primary @Bean - KeyValueBytesStoreSupplier inMemoryStoreSupplier() + KeyValueBytesStoreSupplier userStoreSupplier() { - return Stores.inMemoryKeyValueStore(STORE_NAME); + return Stores.inMemoryKeyValueStore(USER_STORE_NAME); + } + + @Bean + KeyValueBytesStoreSupplier rankingStoreSupplier() + { + return Stores.inMemoryKeyValueStore(RANKING_STORE_NAME); } } } diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java index 845792c..1a857b7 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java @@ -24,7 +24,8 @@ public class QueryStreamProcessorTopologyTest { public static final String TOP10_IN = "TOP10-IN"; public static final String USERS_IN = "USERS-IN"; - public static final String STORE_NAME = "TOPOLOGY-TEST"; + public static final String RANKING_STORE_NAME = "TOPOLOGY-TEST-RANKINGS"; + public static final String USERS_STORE_NAME = "TOPOLOGY-TEST-USERS"; TopologyTestDriver testDriver; @@ -38,7 +39,8 @@ public class QueryStreamProcessorTopologyTest Topology topology = QueryStreamProcessor.buildTopology( USERS_IN, TOP10_IN, - Stores.inMemoryKeyValueStore(STORE_NAME)); + Stores.inMemoryKeyValueStore(USERS_STORE_NAME), + Stores.inMemoryKeyValueStore(RANKING_STORE_NAME)); testDriver = new TopologyTestDriver(topology, serializationConfig()); @@ -64,7 +66,7 @@ public class QueryStreamProcessorTopologyTest .getTop10Messages() .forEach(kv -> top10In.pipeInput(kv.key, kv.value)); - KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); + KeyValueStore store = testDriver.getKeyValueStore(RANKING_STORE_NAME); TestData.assertExpectedState(user -> store.get(user)); } -- 2.20.1 From f18423d411650c6f08c9b698b92c33c42bdd670f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 12 Jun 2024 22:46:24 +0200 Subject: [PATCH 10/16] query: 2.0.0 - Configured caching & commit-interval in integration-test * Introduced configuration-parameters for caching and the commit-interval. * Explicitly turned of caching in the integration-test. * Explicitly set the commit-interval to a very short period (100ms) in the integration-test. --- .../wordcount/query/QueryApplicationConfiguration.java | 7 +++++++ .../kafka/wordcount/query/QueryApplicationProperties.java | 2 ++ .../de/juplo/kafka/wordcount/query/QueryApplicationIT.java | 2 ++ 3 files changed, 11 insertions(+) diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java index 3bf8326..2ece744 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java @@ -70,6 +70,13 @@ public class QueryApplicationConfiguration props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServer.host() + ":" + applicationServer.port()); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); + + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, applicationProperties.getBootstrapServer()); + if (applicationProperties.getCommitInterval() != null) + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, applicationProperties.getCommitInterval()); + if (applicationProperties.getCacheMaxBytes() != null) + props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, applicationProperties.getCacheMaxBytes()); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return props; diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java index df5f41e..4a9eeca 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationProperties.java @@ -17,4 +17,6 @@ public class QueryApplicationProperties private String applicationId = "query"; private String rankingInputTopic = "top10"; private String usersInputTopic = "users"; + private Integer commitInterval; + private Integer cacheMaxBytes; } diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java index d800fbd..58a1206 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java @@ -40,6 +40,8 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. "logging.level.org.apache.kafka.clients=INFO", "logging.level.org.apache.kafka.streams=INFO", "juplo.wordcount.query.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.wordcount.query.commit-interval=100", + "juplo.wordcount.query.cache-max-bytes=0", "juplo.wordcount.query.users-input-topic=" + QueryApplicationIT.TOPIC_USERS, "juplo.wordcount.query.ranking-input-topic=" + QueryApplicationIT.TOPIC_TOP10 }) @AutoConfigureMockMvc -- 2.20.1 From 2cc9929cfdccc6974a4c66b27b939fc6c74905e4 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 11 Jun 2024 22:53:11 +0200 Subject: [PATCH 11/16] query: 2.0.0 - Refined the Lombok-Annotations for the model-classes --- .../java/de/juplo/kafka/wordcount/query/Ranking.java | 10 ++-------- src/main/java/de/juplo/kafka/wordcount/query/User.java | 8 ++------ 2 files changed, 4 insertions(+), 14 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/query/Ranking.java b/src/main/java/de/juplo/kafka/wordcount/query/Ranking.java index 69ae3aa..8966be6 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/Ranking.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/Ranking.java @@ -1,15 +1,9 @@ package de.juplo.kafka.wordcount.query; -import lombok.Getter; -import lombok.Setter; +import lombok.Data; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; - -@Getter -@Setter +@Data public class Ranking { private Entry[] entries; diff --git a/src/main/java/de/juplo/kafka/wordcount/query/User.java b/src/main/java/de/juplo/kafka/wordcount/query/User.java index fdc0a33..f62b475 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/User.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/User.java @@ -1,14 +1,10 @@ package de.juplo.kafka.wordcount.query; +import lombok.Data; import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; -@Getter -@Setter -@ToString +@Data @EqualsAndHashCode(of = "username") public class User { -- 2.20.1 From cc8f4fcf721bcdd4129fa1c3dde6c168d90ac183 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 12 Jun 2024 23:08:57 +0200 Subject: [PATCH 12/16] query: 2.0.0 - `QueryApplicationIT` uses two ``KafkaTemplate``s * Switched the setup of the `QueryApplicationIT` to use two different instances of `KafkaTemplate` for the two input-topics. * This is a preparation for the introduction of the typed JSON-keys. --- .../wordcount/query/QueryApplicationIT.java | 41 +++++++++++++++---- 1 file changed, 33 insertions(+), 8 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java index 58a1206..5eb4706 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java @@ -1,7 +1,11 @@ package de.juplo.kafka.wordcount.query; import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.wordcount.top10.TestRanking; +import de.juplo.kafka.wordcount.users.TestUserData; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.Stores; import org.junit.jupiter.api.BeforeAll; @@ -14,13 +18,16 @@ import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.http.MediaType; import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.support.SendResult; +import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.web.servlet.MockMvc; import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.Map; import java.util.concurrent.CompletableFuture; import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME; @@ -32,9 +39,6 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. @SpringBootTest( properties = { "spring.main.allow-bean-definition-overriding=true", - "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer", - "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", - "spring.kafka.producer.properties.spring.json.type.mapping=userdata:de.juplo.kafka.wordcount.users.TestUserData,ranking:de.juplo.kafka.wordcount.top10.TestRanking", "logging.level.root=WARN", "logging.level.de.juplo=DEBUG", "logging.level.org.apache.kafka.clients=INFO", @@ -63,21 +67,22 @@ public class QueryApplicationIT @BeforeAll public static void testSendMessage( - @Autowired KafkaTemplate kafkaTemplate) + @Autowired KafkaTemplate usersKafkaTemplate, + @Autowired KafkaTemplate top10KafkaTemplate) { TestData .getUsersMessages() - .forEach(kv -> flush(kafkaTemplate.send(TOPIC_USERS, kv.key, kv.value))); + .forEach(kv -> flush(usersKafkaTemplate.send(TOPIC_USERS, kv.key, kv.value))); TestData .getTop10Messages() - .forEach(kv -> flush(kafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value))); + .forEach(kv -> flush(top10KafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value))); } - private static void flush(CompletableFuture> future) + private static void flush(CompletableFuture future) { try { - SendResult result = future.get(); + SendResult result = future.get(); log.info( "Sent: {}={}, partition={}, offset={}", result.getProducerRecord().key(), @@ -132,6 +137,26 @@ public class QueryApplicationIT @TestConfiguration static class Configuration { + @Bean + KafkaTemplate usersKafkaTemplate(ProducerFactory producerFactory) + { + Map properties = Map.of( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), + JsonSerializer.TYPE_MAPPINGS, "userdata:" + TestUserData.class.getName()); + return new KafkaTemplate(producerFactory, properties); + } + + @Bean + KafkaTemplate top10KafkaTemplate(ProducerFactory producerFactory) + { + Map properties = Map.of( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), + JsonSerializer.TYPE_MAPPINGS, "ranking:" + TestRanking.class.getName()); + return new KafkaTemplate(producerFactory, properties); + } + @Bean KeyValueBytesStoreSupplier userStoreSupplier() { -- 2.20.1 From a07e20011b1a22d120920e84c36683a9d42c3ac5 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 12 Jun 2024 23:08:57 +0200 Subject: [PATCH 13/16] query: 2.0.0 - (RED) Corrected expectations for the ``users``-input-topic * The messages that are written by the `users` service doese _not_ contain any type-information. * This commits corrects the corresponding expectations in the test-cases. * *RED:* The tests fail, because the implementation was not yet fixed! --- .../de/juplo/kafka/wordcount/query/QueryApplicationIT.java | 3 +-- .../wordcount/query/QueryStreamProcessorTopologyTest.java | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java index 5eb4706..19ada51 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java @@ -2,7 +2,6 @@ package de.juplo.kafka.wordcount.query; import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.wordcount.top10.TestRanking; -import de.juplo.kafka.wordcount.users.TestUserData; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; @@ -143,7 +142,7 @@ public class QueryApplicationIT Map properties = Map.of( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), - JsonSerializer.TYPE_MAPPINGS, "userdata:" + TestUserData.class.getName()); + JsonSerializer.ADD_TYPE_INFO_HEADERS, false); return new KafkaTemplate(producerFactory, properties); } diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java index 1a857b7..fda7408 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java @@ -52,7 +52,7 @@ public class QueryStreamProcessorTopologyTest userIn = testDriver.createInputTopic( USERS_IN, new StringSerializer(), - jsonSerializer(TestUserData.class)); + jsonSerializer(TestUserData.class).noTypeInfo()); } @@ -82,7 +82,6 @@ public class QueryStreamProcessorTopologyTest jsonSerializer.configure( Map.of( JsonSerializer.TYPE_MAPPINGS, - "userdata:" + TestUserData.class.getName() + "," + "ranking:" + TestRanking.class.getName()), false); return jsonSerializer; -- 2.20.1 From 913c2c4ec0a3584f6be27c8341898d17c4260501 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 12 Jun 2024 23:08:57 +0200 Subject: [PATCH 14/16] query: 2.0.0 - (GREEN) Adjusted the implementation to the new expectations * Messages from the incomming topic, that is written by the `users` service can be serialized, although no type-information is conveyed via the headers. --- .../kafka/wordcount/query/QueryApplicationConfiguration.java | 1 - .../de/juplo/kafka/wordcount/query/QueryStreamProcessor.java | 5 ++++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java index 2ece744..6c7844d 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java @@ -92,7 +92,6 @@ public class QueryApplicationConfiguration JsonDeserializer.TYPE_MAPPINGS, "user:" + Key.class.getName() + "," + "ranking:" + Ranking.class.getName() + "," + - "userdata:" + User.class.getName() + "," + "userranking:" + UserRanking.class.getName()); return props; diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index 4749264..dcb1234 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -5,6 +5,7 @@ import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; @@ -57,7 +58,9 @@ public class QueryStreamProcessor StreamsBuilder builder = new StreamsBuilder(); KTable users = builder - .stream(usersInputTopic) + .stream( + usersInputTopic, + Consumed.with(null, new JsonSerde().copyWithType(User.class))) .toTable( Materialized .as(userStoreSupplier) -- 2.20.1 From 96c727206f954f313976e9e5947a56c888b5f990 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 14 Jun 2024 20:39:41 +0200 Subject: [PATCH 15/16] query: 2.0.0 - (RED) The keys of the top10-topic are deserialized as JSON * The seemingly straightforward change leds to a very strange and inconsisten error-situation. * Only the integration-test fails, while the topology-test works as originally expected. * The cause of the error is a missing serde-config for the key of the ``rankings``-``KStream``, which defies easy explanation. * The best explanation is, that the ``map()``-operation - despite possibly changing the type of the key and/or value - does not by itself define a parameter for specifing a corresponding serialization-config. * The reason for this is, that the operation does not define the complete operation by itself. * In order to take effect, it has to be combined with a second operation, that actually creates the outgoing topic. * Without that second DSL-operation, `map()` simply would yield no action. * And that is, why the serialization has to be defined on that second operation and cannot be defined on `map()` itself. * But the really strange thing about the error is, that it _only_ shows up in `QueryApplicationIT`. * It does not show in `QueryStreamProcessorTopologyTest` _and_ it does _not_ show up, if the application is compiled and started in the docker-setup. * One possible explanation for this wired behaviour might be a bug or misconception in the interpretation of the beforehand build topology, that leads to a non-deterministic behaviour. * Another possible explanation might be subtle differences in the internal caching behaviour -- but that seems unlikely, because tests, that are based on the `TopologyTestDriver` do not cache and are very (on the oposit) very handy if one wants to reveal bugs concerning the serialization and and running the application with the caching settings from the IT does not show the error. --- .../de/juplo/kafka/wordcount/query/Key.java | 3 +-- .../query/QueryApplicationConfiguration.java | 3 +-- .../wordcount/query/QueryStreamProcessor.java | 7 ++++-- .../wordcount/query/QueryApplicationIT.java | 5 ++-- .../QueryStreamProcessorTopologyTest.java | 14 ++++++----- .../juplo/kafka/wordcount/query/TestData.java | 24 +++++++++---------- .../juplo/kafka/wordcount/top10/TestUser.java | 14 +++++++++++ 7 files changed, 44 insertions(+), 26 deletions(-) create mode 100644 src/test/java/de/juplo/kafka/wordcount/top10/TestUser.java diff --git a/src/main/java/de/juplo/kafka/wordcount/query/Key.java b/src/main/java/de/juplo/kafka/wordcount/query/Key.java index afeac4a..57d095a 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/Key.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/Key.java @@ -6,6 +6,5 @@ import lombok.Data; @Data public class Key { - private String username; - private String word; + private String user; } diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java index 6c7844d..440d5c4 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java @@ -2,7 +2,6 @@ package de.juplo.kafka.wordcount.query; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; @@ -86,7 +85,7 @@ public class QueryApplicationConfiguration { Properties props = new Properties(); - props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); props.put( JsonDeserializer.TYPE_MAPPINGS, diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index dcb1234..0898e88 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -60,13 +60,15 @@ public class QueryStreamProcessor KTable users = builder .stream( usersInputTopic, - Consumed.with(null, new JsonSerde().copyWithType(User.class))) + Consumed.with(Serdes.String(), new JsonSerde().copyWithType(User.class))) .toTable( Materialized .as(userStoreSupplier) .withKeySerde(Serdes.String()) .withValueSerde(new JsonSerde().copyWithType(User.class))); - KStream rankings = builder.stream(rankingInputTopic); + KStream rankings = builder + .stream(rankingInputTopic) + .map((key, value) -> new KeyValue<>(key.getUser(), value)); rankings .join(users, (ranking, user) -> UserRanking.of( @@ -76,6 +78,7 @@ public class QueryStreamProcessor .toTable( Materialized .as(rankingStoreSupplier) + .withKeySerde(Serdes.String()) .withValueSerde(new JsonSerde().copyWithType(UserRanking.class))); Topology topology = builder.build(); diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java index 19ada51..1315eae 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java @@ -2,6 +2,7 @@ package de.juplo.kafka.wordcount.query; import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.wordcount.top10.TestRanking; +import de.juplo.kafka.wordcount.top10.TestUser; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; @@ -150,9 +151,9 @@ public class QueryApplicationIT KafkaTemplate top10KafkaTemplate(ProducerFactory producerFactory) { Map properties = Map.of( - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), - JsonSerializer.TYPE_MAPPINGS, "ranking:" + TestRanking.class.getName()); + JsonSerializer.TYPE_MAPPINGS, "user:" + TestUser.class.getName() + ",ranking:" + TestRanking.class.getName()); return new KafkaTemplate(producerFactory, properties); } diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java index fda7408..203c813 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java @@ -1,6 +1,7 @@ package de.juplo.kafka.wordcount.query; import de.juplo.kafka.wordcount.top10.TestRanking; +import de.juplo.kafka.wordcount.top10.TestUser; import de.juplo.kafka.wordcount.users.TestUserData; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.StringSerializer; @@ -29,7 +30,7 @@ public class QueryStreamProcessorTopologyTest TopologyTestDriver testDriver; - TestInputTopic top10In; + TestInputTopic top10In; TestInputTopic userIn; @@ -46,13 +47,13 @@ public class QueryStreamProcessorTopologyTest top10In = testDriver.createInputTopic( TOP10_IN, - new StringSerializer(), - jsonSerializer(TestRanking.class)); + jsonSerializer(TestUser.class, true), + jsonSerializer(TestRanking.class,false)); userIn = testDriver.createInputTopic( USERS_IN, new StringSerializer(), - jsonSerializer(TestUserData.class).noTypeInfo()); + jsonSerializer(TestUserData.class, false).noTypeInfo()); } @@ -76,14 +77,15 @@ public class QueryStreamProcessorTopologyTest testDriver.close(); } - private JsonSerializer jsonSerializer(Class type) + private JsonSerializer jsonSerializer(Class type, boolean isKey) { JsonSerializer jsonSerializer = new JsonSerializer<>(); jsonSerializer.configure( Map.of( JsonSerializer.TYPE_MAPPINGS, + "user:" + TestUser.class.getName() + "," + "ranking:" + TestRanking.class.getName()), - false); + isKey); return jsonSerializer; } } diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java index 1fe34d9..7c8b0b4 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java @@ -2,6 +2,7 @@ package de.juplo.kafka.wordcount.query; import de.juplo.kafka.wordcount.top10.TestEntry; import de.juplo.kafka.wordcount.top10.TestRanking; +import de.juplo.kafka.wordcount.top10.TestUser; import de.juplo.kafka.wordcount.users.TestUserData; import org.apache.kafka.streams.KeyValue; @@ -14,10 +15,10 @@ import static org.assertj.core.api.Assertions.assertThat; class TestData { - static final String PETER = "peter"; - static final String KLAUS = "klaus"; + static final TestUser PETER = TestUser.of("peter"); + static final TestUser KLAUS = TestUser.of("klaus"); - static final Stream> getTop10Messages() + static final Stream> getTop10Messages() { return Stream.of(TOP10_MESSAGES); } @@ -29,8 +30,8 @@ class TestData static void assertExpectedState(Function function) { - assertRankingEqualsRankingFromLastMessage(PETER, function.apply(PETER)); - assertRankingEqualsRankingFromLastMessage(KLAUS, function.apply(KLAUS)); + assertRankingEqualsRankingFromLastMessage(PETER.getUser(), function.apply(PETER.getUser())); + assertRankingEqualsRankingFromLastMessage(KLAUS.getUser(), function.apply(KLAUS.getUser())); } private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson) @@ -41,7 +42,7 @@ class TestData private static UserRanking getLastMessageFor(String user) { return getTop10Messages() - .filter(kv -> kv.key.equals(user)) + .filter(kv -> kv.key.getUser().equals(user)) .map(kv -> kv.value) .map(testRanking -> userRankingFor(user, testRanking)) .reduce(null, (left, right) -> right); @@ -72,8 +73,7 @@ class TestData entry.setCount(testEntry.getCount()); return entry; } - - private static KeyValue[] TOP10_MESSAGES = new KeyValue[] + private static KeyValue[] TOP10_MESSAGES = new KeyValue[] { KeyValue.pair( // 0 PETER, @@ -136,10 +136,10 @@ class TestData private static KeyValue[] USERS_MESSAGES = new KeyValue[] { KeyValue.pair( - PETER, - TestUserData.of(PETER, "Peter", "Pan", TestUserData.Sex.MALE)), + PETER.getUser(), + TestUserData.of(PETER.getUser(), "Peter", "Pan", TestUserData.Sex.MALE)), KeyValue.pair( - KLAUS, - TestUserData.of(KLAUS, "Klaus", "Klüse", TestUserData.Sex.OTHER)), + KLAUS.getUser(), + TestUserData.of(KLAUS.getUser(), "Klaus", "Klüse", TestUserData.Sex.OTHER)), }; } diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestUser.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestUser.java new file mode 100644 index 0000000..cc63c34 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestUser.java @@ -0,0 +1,14 @@ +package de.juplo.kafka.wordcount.top10; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@AllArgsConstructor(staticName = "of") +@NoArgsConstructor +@Data +public class TestUser +{ + String user; +} -- 2.20.1 From 3ba64ce6decf9343fe689bfb687a9b234cd47ebf Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 15 Jun 2024 07:34:30 +0200 Subject: [PATCH 16/16] query: 2.0.0 - (GREEN) Explicitly specifed the missing serde-config --- .../juplo/kafka/wordcount/query/QueryStreamProcessor.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index 0898e88..e075eb7 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -5,10 +5,7 @@ import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.Consumed; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.*; import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.QueryableStoreTypes; @@ -74,7 +71,8 @@ public class QueryStreamProcessor .join(users, (ranking, user) -> UserRanking.of( user.getFirstName(), user.getLastName(), - ranking.getEntries())) + ranking.getEntries()), + Joined.keySerde(Serdes.String())) .toTable( Materialized .as(rankingStoreSupplier) -- 2.20.1