* The seemingly straightforward change leads to a very strange and
inconsistent error-situation.
* Only the integration-test fails, while the topology-test works as
originally expected.
* The cause of the error is a missing serde-config for the key of the
``rankings``-``KStream``, which defies easy explanation.
* The best explanation is, that the ``map()``-operation - despite
possibly changing the type of the key and/or value - does not by itself
define a parameter for specifying a corresponding serialization-config.
* The reason for this is, that the operation does not define the complete
operation by itself.
* In order to take effect, it has to be combined with a second operation,
that actually creates the outgoing topic.
* Without that second DSL-operation, `map()` simply would yield no action.
* And that is, why the serialization has to be defined on that second
operation and cannot be defined on `map()` itself.
* But the really strange thing about the error is, that it _only_ shows up
in `QueryApplicationIT`.
* It does not show in `QueryStreamProcessorTopologyTest` _and_ it does
_not_ show up, if the application is compiled and started in the
docker-setup.
* One possible explanation for this weird behaviour might be a bug or
misconception in the interpretation of the previously built topology,
that leads to non-deterministic behaviour.
* Another possible explanation might be subtle differences in the internal
caching behaviour -- but that seems unlikely: on the one hand, tests that
are based on the `TopologyTestDriver` do not cache and are therefore very
handy if one wants to reveal bugs concerning the serialization; on the
other hand, running the application with the caching settings from the IT
does not show the error.
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
{
Properties props = new Properties();
{
Properties props = new Properties();
- props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
props.put(
JsonDeserializer.TYPE_MAPPINGS,
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
props.put(
JsonDeserializer.TYPE_MAPPINGS,
KTable<String, User> users = builder
.stream(
usersInputTopic,
KTable<String, User> users = builder
.stream(
usersInputTopic,
- Consumed.with(null, new JsonSerde().copyWithType(User.class)))
+ Consumed.with(Serdes.String(), new JsonSerde().copyWithType(User.class)))
.toTable(
Materialized
.<String, User>as(userStoreSupplier)
.withKeySerde(Serdes.String())
.withValueSerde(new JsonSerde().copyWithType(User.class)));
.toTable(
Materialized
.<String, User>as(userStoreSupplier)
.withKeySerde(Serdes.String())
.withValueSerde(new JsonSerde().copyWithType(User.class)));
- KStream<String, Ranking> rankings = builder.stream(rankingInputTopic);
+ KStream<String, Ranking> rankings = builder
+ .<Key, Ranking>stream(rankingInputTopic)
+ .map((key, value) -> new KeyValue<>(key.getUsername(), value));
rankings
.join(users, (ranking, user) -> UserRanking.of(
rankings
.join(users, (ranking, user) -> UserRanking.of(
.toTable(
Materialized
.<String, UserRanking>as(rankingStoreSupplier)
.toTable(
Materialized
.<String, UserRanking>as(rankingStoreSupplier)
+ .withKeySerde(Serdes.String())
.withValueSerde(new JsonSerde().copyWithType(UserRanking.class)));
Topology topology = builder.build();
.withValueSerde(new JsonSerde().copyWithType(UserRanking.class)));
Topology topology = builder.build();
import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.wordcount.top10.TestRanking;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.wordcount.top10.TestRanking;
+import de.juplo.kafka.wordcount.top10.TestUser;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
KafkaTemplate top10KafkaTemplate(ProducerFactory producerFactory)
{
Map<String, Object> properties = Map.of(
KafkaTemplate top10KafkaTemplate(ProducerFactory producerFactory)
{
Map<String, Object> properties = Map.of(
- ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
- JsonSerializer.TYPE_MAPPINGS, "ranking:" + TestRanking.class.getName());
+ JsonSerializer.TYPE_MAPPINGS, "user:" + TestUser.class.getName() + ",ranking:" + TestRanking.class.getName());
return new KafkaTemplate(producerFactory, properties);
}
return new KafkaTemplate(producerFactory, properties);
}
package de.juplo.kafka.wordcount.query;
import de.juplo.kafka.wordcount.top10.TestRanking;
package de.juplo.kafka.wordcount.query;
import de.juplo.kafka.wordcount.top10.TestRanking;
+import de.juplo.kafka.wordcount.top10.TestUser;
import de.juplo.kafka.wordcount.users.TestUserData;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.StringSerializer;
import de.juplo.kafka.wordcount.users.TestUserData;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.StringSerializer;
TopologyTestDriver testDriver;
TopologyTestDriver testDriver;
- TestInputTopic<String, TestRanking> top10In;
+ TestInputTopic<TestUser, TestRanking> top10In;
TestInputTopic<String, TestUserData> userIn;
TestInputTopic<String, TestUserData> userIn;
top10In = testDriver.createInputTopic(
TOP10_IN,
top10In = testDriver.createInputTopic(
TOP10_IN,
- new StringSerializer(),
- jsonSerializer(TestRanking.class));
+ jsonSerializer(TestUser.class, true),
+ jsonSerializer(TestRanking.class,false));
userIn = testDriver.createInputTopic(
USERS_IN,
new StringSerializer(),
userIn = testDriver.createInputTopic(
USERS_IN,
new StringSerializer(),
- jsonSerializer(TestUserData.class).noTypeInfo());
+ jsonSerializer(TestUserData.class, false).noTypeInfo());
- private <T> JsonSerializer<T> jsonSerializer(Class<T> type)
+ private <T> JsonSerializer<T> jsonSerializer(Class<T> type, boolean isKey)
{
JsonSerializer<T> jsonSerializer = new JsonSerializer<>();
jsonSerializer.configure(
Map.of(
JsonSerializer.TYPE_MAPPINGS,
{
JsonSerializer<T> jsonSerializer = new JsonSerializer<>();
jsonSerializer.configure(
Map.of(
JsonSerializer.TYPE_MAPPINGS,
+ "user:" + TestUser.class.getName() + "," +
"ranking:" + TestRanking.class.getName()),
"ranking:" + TestRanking.class.getName()),
return jsonSerializer;
}
}
return jsonSerializer;
}
}
import de.juplo.kafka.wordcount.top10.TestEntry;
import de.juplo.kafka.wordcount.top10.TestRanking;
import de.juplo.kafka.wordcount.top10.TestEntry;
import de.juplo.kafka.wordcount.top10.TestRanking;
+import de.juplo.kafka.wordcount.top10.TestUser;
import de.juplo.kafka.wordcount.users.TestUserData;
import org.apache.kafka.streams.KeyValue;
import de.juplo.kafka.wordcount.users.TestUserData;
import org.apache.kafka.streams.KeyValue;
- static final String PETER = "peter";
- static final String KLAUS = "klaus";
+ static final TestUser PETER = TestUser.of("peter");
+ static final TestUser KLAUS = TestUser.of("klaus");
- static final Stream<KeyValue<String, TestRanking>> getTop10Messages()
+ static final Stream<KeyValue<TestUser, TestRanking>> getTop10Messages()
{
return Stream.of(TOP10_MESSAGES);
}
{
return Stream.of(TOP10_MESSAGES);
}
static void assertExpectedState(Function<String, UserRanking> function)
{
static void assertExpectedState(Function<String, UserRanking> function)
{
- assertRankingEqualsRankingFromLastMessage(PETER, function.apply(PETER));
- assertRankingEqualsRankingFromLastMessage(KLAUS, function.apply(KLAUS));
+ assertRankingEqualsRankingFromLastMessage(PETER.getUsername(), function.apply(PETER.getUsername()));
+ assertRankingEqualsRankingFromLastMessage(KLAUS.getUsername(), function.apply(KLAUS.getUsername()));
}
private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson)
}
private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson)
private static UserRanking getLastMessageFor(String user)
{
return getTop10Messages()
private static UserRanking getLastMessageFor(String user)
{
return getTop10Messages()
- .filter(kv -> kv.key.equals(user))
+ .filter(kv -> kv.key.getUsername().equals(user))
.map(kv -> kv.value)
.map(testRanking -> userRankingFor(user, testRanking))
.reduce(null, (left, right) -> right);
.map(kv -> kv.value)
.map(testRanking -> userRankingFor(user, testRanking))
.reduce(null, (left, right) -> right);
entry.setCount(testEntry.getCount());
return entry;
}
entry.setCount(testEntry.getCount());
return entry;
}
-
- private static KeyValue<String, TestRanking>[] TOP10_MESSAGES = new KeyValue[]
+ private static KeyValue<TestUser, TestRanking>[] TOP10_MESSAGES = new KeyValue[]
{
KeyValue.pair( // 0
PETER,
{
KeyValue.pair( // 0
PETER,
private static KeyValue<String, TestUserData>[] USERS_MESSAGES = new KeyValue[]
{
KeyValue.pair(
private static KeyValue<String, TestUserData>[] USERS_MESSAGES = new KeyValue[]
{
KeyValue.pair(
- PETER,
- TestUserData.of(PETER, "Peter", "Pan", TestUserData.Sex.MALE)),
+ PETER.getUsername(),
+ TestUserData.of(PETER.getUsername(), "Peter", "Pan", TestUserData.Sex.MALE)),
- KLAUS,
- TestUserData.of(KLAUS, "Klaus", "Klüse", TestUserData.Sex.OTHER)),
+ KLAUS.getUsername(),
+ TestUserData.of(KLAUS.getUsername(), "Klaus", "Klüse", TestUserData.Sex.OTHER)),
--- /dev/null
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+/**
+ * Test-side representation of the JSON key of the ``top10``-topic, mirroring
+ * the {@code Key} type used by the upstream top10-application.
+ * <p>
+ * Lombok generates the no-args constructor (needed for JSON deserialization),
+ * the static factory {@code TestUser.of(username)}, and via {@code @Data} the
+ * getters/setters plus {@code equals}/{@code hashCode} — the latter is what
+ * lets the test assertions compare expected and actual keys by value.
+ */
+@AllArgsConstructor(staticName = "of")
+@NoArgsConstructor
+@Data
+public class TestUser
+{
+  String username;
+}