</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
<artifactId>top10</artifactId>
- <version>1.1.3</version>
+ <version>1.2.0</version>
<name>Wordcount-Top-10</name>
<description>Top-10 stream-processor of the multi-user wordcount-example</description>
<properties>
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
props.put(JsonDeserializer.TRUSTED_PACKAGES, Top10Application.class.getPackageName());
- props.put(JsonDeserializer.KEY_DEFAULT_TYPE, String.class.getName());
+ props.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName());
props.put(
JsonDeserializer.TYPE_MAPPINGS,
props.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, Boolean.FALSE);
props.put(
JsonSerializer.TYPE_MAPPINGS,
+ "user:" + User.class.getName() + "," +
"ranking:" + Ranking.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
builder
.<Key, Entry>stream(inputTopic)
- .map((key, entry) -> new KeyValue<>(key.getUser(), entry))
+ .map((key, entry) -> new KeyValue<>(User.of(key.getUser()), entry))
.groupByKey()
.aggregate(
() -> new Ranking(),
--- /dev/null
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@AllArgsConstructor(staticName = "of")
+@NoArgsConstructor
+@Data
+public class User
+{
+ String user;
+}
TestCounter.of("klaus","s",3)),
};
- static void assertExpectedMessages(MultiValueMap<String, Ranking> receivedMessages)
+ static void assertExpectedMessages(MultiValueMap<User, Ranking> receivedMessages)
{
expectedMessages().forEach(
(user, rankings) ->
.containsExactlyElementsOf(rankings));
}
- static KeyValue<String, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
+ static KeyValue<User, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
{
KeyValue.pair( // 0
- "peter",
+ User.of("peter"),
Ranking.of(
Entry.of("Hallo", 1l))),
KeyValue.pair( // 1
- "klaus",
+ User.of("klaus"),
Ranking.of(
Entry.of("Müsch", 1l))),
KeyValue.pair( // 2
- "peter",
+ User.of("peter"),
Ranking.of(
Entry.of("Hallo", 1l),
Entry.of("Welt", 1l))),
KeyValue.pair( // 3
- "klaus",
+ User.of("klaus"),
Ranking.of(
Entry.of("Müsch", 2l))),
KeyValue.pair( // 4
- "klaus",
+ User.of("klaus"),
Ranking.of(
Entry.of("Müsch", 2l),
Entry.of("s", 1l))),
KeyValue.pair( // 5
- "peter",
+ User.of("peter"),
Ranking.of(
Entry.of("Hallo", 1l),
Entry.of("Welt", 1l),
Entry.of("Boäh", 1l))),
KeyValue.pair( // 6
- "peter",
+ User.of("peter"),
Ranking.of(
Entry.of("Welt", 2l),
Entry.of("Hallo", 1l),
Entry.of("Boäh", 1l))),
KeyValue.pair( // 7
- "peter",
+ User.of("peter"),
Ranking.of(
Entry.of("Welt", 2l),
Entry.of("Boäh", 2l),
Entry.of("Hallo", 1l))),
KeyValue.pair( // 8
- "klaus",
+ User.of("klaus"),
Ranking.of(
Entry.of("Müsch", 2l),
Entry.of("s", 2l))),
KeyValue.pair( // 9
- "peter",
+ User.of("peter"),
Ranking.of(
Entry.of("Boäh", 3l),
Entry.of("Welt", 2l),
Entry.of("Hallo", 1l))),
KeyValue.pair( // 10
- "klaus",
+ User.of("klaus"),
Ranking.of(
Entry.of("s", 3l),
Entry.of("Müsch", 2l))),
};
- static MultiValueMap<String, Ranking> expectedMessages()
+ static MultiValueMap<User, Ranking> expectedMessages()
{
- MultiValueMap<String, Ranking> expectedMessages = new LinkedMultiValueMap<>();
+ MultiValueMap<User, Ranking> expectedMessages = new LinkedMultiValueMap<>();
Stream
.of(EXPECTED_MESSAGES)
.forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
TopologyTestDriver testDriver;
TestInputTopic<Key, Entry> in;
- TestOutputTopic<String, Ranking> out;
+ TestOutputTopic<User, Ranking> out;
@BeforeEach
out = testDriver.createOutputTopic(
OUT,
- (JsonDeserializer<String>)keySerde.deserializer(),
+ (JsonDeserializer<User>)keySerde.deserializer(),
(JsonDeserializer<Ranking>)valueSerde.deserializer());
}
Key.of(kv.key.getUser(), kv.key.getWord()),
Entry.of(kv.value.getWord(), kv.value.getCounter())));
- MultiValueMap<String, Ranking> receivedMessages = new LinkedMultiValueMap<>();
+ MultiValueMap<User, Ranking> receivedMessages = new LinkedMultiValueMap<>();
out
.readRecordsToList()
.forEach(record ->