import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
{
Properties props = new Properties();
- props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
props.put(
JsonDeserializer.TYPE_MAPPINGS,
KTable<String, User> users = builder
.stream(
usersInputTopic,
- Consumed.with(null, new JsonSerde().copyWithType(User.class)))
+ Consumed.with(Serdes.String(), new JsonSerde().copyWithType(User.class)))
.toTable(
Materialized
.<String, User>as(userStoreSupplier)
.withKeySerde(Serdes.String())
.withValueSerde(new JsonSerde().copyWithType(User.class)));
- KStream<String, Ranking> rankings = builder.stream(rankingInputTopic);
+ KStream<String, Ranking> rankings = builder
+ .<Key, Ranking>stream(rankingInputTopic)
+ .map((key, value) -> new KeyValue<>(key.getUsername(), value));
rankings
.join(users, (ranking, user) -> UserRanking.of(
.toTable(
Materialized
.<String, UserRanking>as(rankingStoreSupplier)
+ .withKeySerde(Serdes.String())
.withValueSerde(new JsonSerde().copyWithType(UserRanking.class)));
Topology topology = builder.build();
import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.wordcount.top10.TestRanking;
+import de.juplo.kafka.wordcount.top10.TestUser;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
KafkaTemplate top10KafkaTemplate(ProducerFactory producerFactory)
{
Map<String, Object> properties = Map.of(
- ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
- JsonSerializer.TYPE_MAPPINGS, "ranking:" + TestRanking.class.getName());
+ JsonSerializer.TYPE_MAPPINGS, "user:" + TestUser.class.getName() + ",ranking:" + TestRanking.class.getName());
return new KafkaTemplate(producerFactory, properties);
}
package de.juplo.kafka.wordcount.query;
import de.juplo.kafka.wordcount.top10.TestRanking;
+import de.juplo.kafka.wordcount.top10.TestUser;
import de.juplo.kafka.wordcount.users.TestUserData;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.StringSerializer;
TopologyTestDriver testDriver;
- TestInputTopic<String, TestRanking> top10In;
+ TestInputTopic<TestUser, TestRanking> top10In;
TestInputTopic<String, TestUserData> userIn;
top10In = testDriver.createInputTopic(
TOP10_IN,
- new StringSerializer(),
- jsonSerializer(TestRanking.class));
+ jsonSerializer(TestUser.class, true),
+ jsonSerializer(TestRanking.class,false));
userIn = testDriver.createInputTopic(
USERS_IN,
new StringSerializer(),
- jsonSerializer(TestUserData.class).noTypeInfo());
+ jsonSerializer(TestUserData.class, false).noTypeInfo());
}
testDriver.close();
}
- private <T> JsonSerializer<T> jsonSerializer(Class<T> type)
+ private <T> JsonSerializer<T> jsonSerializer(Class<T> type, boolean isKey)
{
JsonSerializer<T> jsonSerializer = new JsonSerializer<>();
jsonSerializer.configure(
Map.of(
JsonSerializer.TYPE_MAPPINGS,
+ "user:" + TestUser.class.getName() + "," +
"ranking:" + TestRanking.class.getName()),
- false);
+ isKey);
return jsonSerializer;
}
}
import de.juplo.kafka.wordcount.top10.TestEntry;
import de.juplo.kafka.wordcount.top10.TestRanking;
+import de.juplo.kafka.wordcount.top10.TestUser;
import de.juplo.kafka.wordcount.users.TestUserData;
import org.apache.kafka.streams.KeyValue;
class TestData
{
- static final String PETER = "peter";
- static final String KLAUS = "klaus";
+ static final TestUser PETER = TestUser.of("peter");
+ static final TestUser KLAUS = TestUser.of("klaus");
- static final Stream<KeyValue<String, TestRanking>> getTop10Messages()
+ static final Stream<KeyValue<TestUser, TestRanking>> getTop10Messages()
{
return Stream.of(TOP10_MESSAGES);
}
static void assertExpectedState(Function<String, UserRanking> function)
{
- assertRankingEqualsRankingFromLastMessage(PETER, function.apply(PETER));
- assertRankingEqualsRankingFromLastMessage(KLAUS, function.apply(KLAUS));
+ assertRankingEqualsRankingFromLastMessage(PETER.getUsername(), function.apply(PETER.getUsername()));
+ assertRankingEqualsRankingFromLastMessage(KLAUS.getUsername(), function.apply(KLAUS.getUsername()));
}
private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson)
private static UserRanking getLastMessageFor(String user)
{
return getTop10Messages()
- .filter(kv -> kv.key.equals(user))
+ .filter(kv -> kv.key.getUsername().equals(user))
.map(kv -> kv.value)
.map(testRanking -> userRankingFor(user, testRanking))
.reduce(null, (left, right) -> right);
entry.setCount(testEntry.getCount());
return entry;
}
-
- private static KeyValue<String, TestRanking>[] TOP10_MESSAGES = new KeyValue[]
+ private static KeyValue<TestUser, TestRanking>[] TOP10_MESSAGES = new KeyValue[]
{
KeyValue.pair( // 0
PETER,
private static KeyValue<String, TestUserData>[] USERS_MESSAGES = new KeyValue[]
{
KeyValue.pair(
- PETER,
- TestUserData.of(PETER, "Peter", "Pan", TestUserData.Sex.MALE)),
+ PETER.getUsername(),
+ TestUserData.of(PETER.getUsername(), "Peter", "Pan", TestUserData.Sex.MALE)),
KeyValue.pair(
- KLAUS,
- TestUserData.of(KLAUS, "Klaus", "Klüse", TestUserData.Sex.OTHER)),
+ KLAUS.getUsername(),
+ TestUserData.of(KLAUS.getUsername(), "Klaus", "Klüse", TestUserData.Sex.OTHER)),
};
}
--- /dev/null
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@AllArgsConstructor(staticName = "of")
+@NoArgsConstructor
+@Data
+public class TestUser
+{
+ String username;
+}