From: Kai Moritz Date: Fri, 14 Jun 2024 18:39:41 +0000 (+0200) Subject: query: 2.0.0 - (RED) The keys of the top10-topic are deserialized as JSON X-Git-Tag: query-2.0.0~1 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=700f80444d14b201f7b696fb5b7bcab0d767f007;p=demos%2Fkafka%2Fwordcount query: 2.0.0 - (RED) The keys of the top10-topic are deserialized as JSON * The seemingly straightforward change leds to a very strange and inconsisten error-situation. * Only the integration-test fails, while the topology-test works as originally expected. * The cause of the error is a missing serde-config for the key of the ``rankings``-``KStream``, which defies easy explanation. * The best explanation is, that the ``map()``-operation - despite possibly changing the type of the key and/or value - does not by itself define a parameter for specifing a corresponding serialization-config. * The reason for this is, that the operation does not define the complete operation by itself. * In order to take effect, it has to be combined with a second operation, that actually creates the outgoing topic. * Without that second DSL-operation, `map()` simply would yield no action. * And that is, why the serialization has to be defined on that second operation and cannot be defined on `map()` itself. * But the really strange thing about the error is, that it _only_ shows up in `QueryApplicationIT`. * It does not show in `QueryStreamProcessorTopologyTest` _and_ it does _not_ show up, if the application is compiled and started in the docker-setup. * One possible explanation for this wired behaviour might be a bug or misconception in the interpretation of the beforehand build topology, that leads to a non-deterministic behaviour. * Another possible explanation might be subtle differences in the internal caching behaviour -- but that seems unlikely, because tests, that are based on the `TopologyTestDriver` do not cache and are very (on the oposit) very handy if one wants to reveal bugs concerning the serialization and and running the application with the caching settings from the IT does not show the error. --- diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java index 6c7844d..440d5c4 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java @@ -2,7 +2,6 @@ package de.juplo.kafka.wordcount.query; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; @@ -86,7 +85,7 @@ public class QueryApplicationConfiguration { Properties props = new Properties(); - props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); props.put( JsonDeserializer.TYPE_MAPPINGS, diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index dcb1234..bf27e2d 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -60,13 +60,15 @@ public class QueryStreamProcessor KTable users = builder .stream( usersInputTopic, - Consumed.with(null, new JsonSerde().copyWithType(User.class))) + Consumed.with(Serdes.String(), new JsonSerde().copyWithType(User.class))) .toTable( Materialized .as(userStoreSupplier) .withKeySerde(Serdes.String()) .withValueSerde(new JsonSerde().copyWithType(User.class))); - KStream rankings = builder.stream(rankingInputTopic); + KStream rankings = builder + .stream(rankingInputTopic) + .map((key, value) -> new KeyValue<>(key.getUsername(), value)); rankings .join(users, (ranking, user) -> UserRanking.of( @@ -76,6 +78,7 @@ public class QueryStreamProcessor .toTable( Materialized .as(rankingStoreSupplier) + .withKeySerde(Serdes.String()) .withValueSerde(new JsonSerde().copyWithType(UserRanking.class))); Topology topology = builder.build(); diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java index 19ada51..1315eae 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java @@ -2,6 +2,7 @@ package de.juplo.kafka.wordcount.query; import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.wordcount.top10.TestRanking; +import de.juplo.kafka.wordcount.top10.TestUser; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; @@ -150,9 +151,9 @@ public class QueryApplicationIT KafkaTemplate top10KafkaTemplate(ProducerFactory producerFactory) { Map properties = Map.of( - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), - JsonSerializer.TYPE_MAPPINGS, "ranking:" + TestRanking.class.getName()); + JsonSerializer.TYPE_MAPPINGS, "user:" + TestUser.class.getName() + ",ranking:" + TestRanking.class.getName()); return new KafkaTemplate(producerFactory, properties); } diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java index fda7408..203c813 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java @@ -1,6 +1,7 @@ package de.juplo.kafka.wordcount.query; import de.juplo.kafka.wordcount.top10.TestRanking; +import de.juplo.kafka.wordcount.top10.TestUser; import de.juplo.kafka.wordcount.users.TestUserData; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.StringSerializer; @@ -29,7 +30,7 @@ public class QueryStreamProcessorTopologyTest TopologyTestDriver testDriver; - TestInputTopic top10In; + TestInputTopic top10In; TestInputTopic userIn; @@ -46,13 +47,13 @@ public class QueryStreamProcessorTopologyTest top10In = testDriver.createInputTopic( TOP10_IN, - new StringSerializer(), - jsonSerializer(TestRanking.class)); + jsonSerializer(TestUser.class, true), + jsonSerializer(TestRanking.class,false)); userIn = testDriver.createInputTopic( USERS_IN, new StringSerializer(), - jsonSerializer(TestUserData.class).noTypeInfo()); + jsonSerializer(TestUserData.class, false).noTypeInfo()); } @@ -76,14 +77,15 @@ public class QueryStreamProcessorTopologyTest testDriver.close(); } - private JsonSerializer jsonSerializer(Class type) + private JsonSerializer jsonSerializer(Class type, boolean isKey) { JsonSerializer jsonSerializer = new JsonSerializer<>(); jsonSerializer.configure( Map.of( JsonSerializer.TYPE_MAPPINGS, + "user:" + TestUser.class.getName() + "," + "ranking:" + TestRanking.class.getName()), - false); + isKey); return jsonSerializer; } } diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java index 1fe34d9..c190eed 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java @@ -2,6 +2,7 @@ package de.juplo.kafka.wordcount.query; import de.juplo.kafka.wordcount.top10.TestEntry; import de.juplo.kafka.wordcount.top10.TestRanking; +import de.juplo.kafka.wordcount.top10.TestUser; import de.juplo.kafka.wordcount.users.TestUserData; import org.apache.kafka.streams.KeyValue; @@ -14,10 +15,10 @@ import static org.assertj.core.api.Assertions.assertThat; class TestData { - static final String PETER = "peter"; - static final String KLAUS = "klaus"; + static final TestUser PETER = TestUser.of("peter"); + static final TestUser KLAUS = TestUser.of("klaus"); - static final Stream> getTop10Messages() + static final Stream> getTop10Messages() { return Stream.of(TOP10_MESSAGES); } @@ -29,8 +30,8 @@ class TestData static void assertExpectedState(Function function) { - assertRankingEqualsRankingFromLastMessage(PETER, function.apply(PETER)); - assertRankingEqualsRankingFromLastMessage(KLAUS, function.apply(KLAUS)); + assertRankingEqualsRankingFromLastMessage(PETER.getUsername(), function.apply(PETER.getUsername())); + assertRankingEqualsRankingFromLastMessage(KLAUS.getUsername(), function.apply(KLAUS.getUsername())); } private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson) @@ -41,7 +42,7 @@ class TestData private static UserRanking getLastMessageFor(String user) { return getTop10Messages() - .filter(kv -> kv.key.equals(user)) + .filter(kv -> kv.key.getUsername().equals(user)) .map(kv -> kv.value) .map(testRanking -> userRankingFor(user, testRanking)) .reduce(null, (left, right) -> right); @@ -72,8 +73,7 @@ class TestData entry.setCount(testEntry.getCount()); return entry; } - - private static KeyValue[] TOP10_MESSAGES = new KeyValue[] + private static KeyValue[] TOP10_MESSAGES = new KeyValue[] { KeyValue.pair( // 0 PETER, @@ -136,10 +136,10 @@ class TestData private static KeyValue[] USERS_MESSAGES = new KeyValue[] { KeyValue.pair( - PETER, - TestUserData.of(PETER, "Peter", "Pan", TestUserData.Sex.MALE)), + PETER.getUsername(), + TestUserData.of(PETER.getUsername(), "Peter", "Pan", TestUserData.Sex.MALE)), KeyValue.pair( - KLAUS, - TestUserData.of(KLAUS, "Klaus", "Klüse", TestUserData.Sex.OTHER)), + KLAUS.getUsername(), + TestUserData.of(KLAUS.getUsername(), "Klaus", "Klüse", TestUserData.Sex.OTHER)), }; } diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestUser.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestUser.java new file mode 100644 index 0000000..cc48496 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestUser.java @@ -0,0 +1,14 @@ +package de.juplo.kafka.wordcount.top10; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@AllArgsConstructor(staticName = "of") +@NoArgsConstructor +@Data +public class TestUser +{ + String username; +}