projects
/
demos
/
kafka
/
wordcount
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (from parent 1:
1293103
)
top10: 1.2.0 - Switched the message-key from `String` to `User`
top10-1.2.0
author
Kai Moritz
<kai@juplo.de>
Tue, 28 May 2024 20:59:38 +0000
(22:59 +0200)
committer
Kai Moritz
<kai@juplo.de>
Thu, 30 May 2024 10:12:39 +0000
(12:12 +0200)
pom.xml
patch
|
blob
|
history
src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java
patch
|
blob
|
history
src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java
patch
|
blob
|
history
src/main/java/de/juplo/kafka/wordcount/top10/User.java
[new file with mode: 0644]
patch
|
blob
src/test/java/de/juplo/kafka/wordcount/top10/TestData.java
patch
|
blob
|
history
src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java
patch
|
blob
|
history
diff --git
a/pom.xml
b/pom.xml
index
a8abbc8
..
fd71ccd
100644
(file)
--- a/
pom.xml
+++ b/
pom.xml
@@
-10,7
+10,7
@@
</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
<artifactId>top10</artifactId>
</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
<artifactId>top10</artifactId>
- <version>1.
1.3
</version>
+ <version>1.
2.0
</version>
<name>Wordcount-Top-10</name>
<description>Top-10 stream-processor of the multi-user wordcount-example</description>
<properties>
<name>Wordcount-Top-10</name>
<description>Top-10 stream-processor of the multi-user wordcount-example</description>
<properties>
diff --git
a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java
b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java
index
7749917
..
6f18339
100644
(file)
--- a/
src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java
+++ b/
src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java
@@
-33,7
+33,7
@@
public class Top10ApplicationConfiguration
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
props.put(JsonDeserializer.TRUSTED_PACKAGES, Top10Application.class.getPackageName());
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
props.put(JsonDeserializer.TRUSTED_PACKAGES, Top10Application.class.getPackageName());
- props.put(JsonDeserializer.KEY_DEFAULT_TYPE,
String
.class.getName());
+ props.put(JsonDeserializer.KEY_DEFAULT_TYPE,
User
.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName());
props.put(
JsonDeserializer.TYPE_MAPPINGS,
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName());
props.put(
JsonDeserializer.TYPE_MAPPINGS,
@@
-42,6
+42,7
@@
public class Top10ApplicationConfiguration
props.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, Boolean.FALSE);
props.put(
JsonSerializer.TYPE_MAPPINGS,
props.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, Boolean.FALSE);
props.put(
JsonSerializer.TYPE_MAPPINGS,
+ "user:" + User.class.getName() + "," +
"ranking:" + Ranking.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
"ranking:" + Ranking.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
diff --git
a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java
b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java
index
a3900bf
..
d3846d8
100644
(file)
--- a/
src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java
+++ b/
src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java
@@
-35,7
+35,7
@@
public class Top10StreamProcessor
builder
.<Key, Entry>stream(inputTopic)
builder
.<Key, Entry>stream(inputTopic)
- .map((key, entry) -> new KeyValue<>(
key.getUser(
), entry))
+ .map((key, entry) -> new KeyValue<>(
User.of(key.getUser()
), entry))
.groupByKey()
.aggregate(
() -> new Ranking(),
.groupByKey()
.aggregate(
() -> new Ranking(),
diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/User.java
b/src/main/java/de/juplo/kafka/wordcount/top10/User.java
new file mode 100644
(file)
index 0000000..
53c258d
--- /dev/null
+++ b/
src/main/java/de/juplo/kafka/wordcount/top10/User.java
@@ -0,0
+1,14
@@
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@AllArgsConstructor(staticName = "of")
+@NoArgsConstructor
+@Data
+public class User
+{
+ String user;
+}
diff --git
a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java
b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java
index
73a405e
..
3bb6b54
100644
(file)
--- a/
src/test/java/de/juplo/kafka/wordcount/top10/TestData.java
+++ b/
src/test/java/de/juplo/kafka/wordcount/top10/TestData.java
@@
-55,7
+55,7
@@
class TestData
TestCounter.of("klaus","s",3)),
};
TestCounter.of("klaus","s",3)),
};
- static void assertExpectedMessages(MultiValueMap<
String
, Ranking> receivedMessages)
+ static void assertExpectedMessages(MultiValueMap<
User
, Ranking> receivedMessages)
{
expectedMessages().forEach(
(user, rankings) ->
{
expectedMessages().forEach(
(user, rankings) ->
@@
-63,69
+63,69
@@
class TestData
.containsExactlyElementsOf(rankings));
}
.containsExactlyElementsOf(rankings));
}
- static KeyValue<
String
, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
+ static KeyValue<
User
, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
{
KeyValue.pair( // 0
{
KeyValue.pair( // 0
-
"peter"
,
+
User.of("peter")
,
Ranking.of(
Entry.of("Hallo", 1l))),
KeyValue.pair( // 1
Ranking.of(
Entry.of("Hallo", 1l))),
KeyValue.pair( // 1
-
"klaus"
,
+
User.of("klaus")
,
Ranking.of(
Entry.of("Müsch", 1l))),
KeyValue.pair( // 2
Ranking.of(
Entry.of("Müsch", 1l))),
KeyValue.pair( // 2
-
"peter"
,
+
User.of("peter")
,
Ranking.of(
Entry.of("Hallo", 1l),
Entry.of("Welt", 1l))),
KeyValue.pair( // 3
Ranking.of(
Entry.of("Hallo", 1l),
Entry.of("Welt", 1l))),
KeyValue.pair( // 3
-
"klaus"
,
+
User.of("klaus")
,
Ranking.of(
Entry.of("Müsch", 2l))),
KeyValue.pair( // 4
Ranking.of(
Entry.of("Müsch", 2l))),
KeyValue.pair( // 4
-
"klaus"
,
+
User.of("klaus")
,
Ranking.of(
Entry.of("Müsch", 2l),
Entry.of("s", 1l))),
KeyValue.pair( // 5
Ranking.of(
Entry.of("Müsch", 2l),
Entry.of("s", 1l))),
KeyValue.pair( // 5
-
"peter"
,
+
User.of("peter")
,
Ranking.of(
Entry.of("Hallo", 1l),
Entry.of("Welt", 1l),
Entry.of("Boäh", 1l))),
KeyValue.pair( // 6
Ranking.of(
Entry.of("Hallo", 1l),
Entry.of("Welt", 1l),
Entry.of("Boäh", 1l))),
KeyValue.pair( // 6
-
"peter"
,
+
User.of("peter")
,
Ranking.of(
Entry.of("Welt", 2l),
Entry.of("Hallo", 1l),
Entry.of("Boäh", 1l))),
KeyValue.pair( // 7
Ranking.of(
Entry.of("Welt", 2l),
Entry.of("Hallo", 1l),
Entry.of("Boäh", 1l))),
KeyValue.pair( // 7
-
"peter"
,
+
User.of("peter")
,
Ranking.of(
Entry.of("Welt", 2l),
Entry.of("Boäh", 2l),
Entry.of("Hallo", 1l))),
KeyValue.pair( // 8
Ranking.of(
Entry.of("Welt", 2l),
Entry.of("Boäh", 2l),
Entry.of("Hallo", 1l))),
KeyValue.pair( // 8
-
"klaus"
,
+
User.of("klaus")
,
Ranking.of(
Entry.of("Müsch", 2l),
Entry.of("s", 2l))),
KeyValue.pair( // 9
Ranking.of(
Entry.of("Müsch", 2l),
Entry.of("s", 2l))),
KeyValue.pair( // 9
-
"peter"
,
+
User.of("peter")
,
Ranking.of(
Entry.of("Boäh", 3l),
Entry.of("Welt", 2l),
Entry.of("Hallo", 1l))),
KeyValue.pair( // 10
Ranking.of(
Entry.of("Boäh", 3l),
Entry.of("Welt", 2l),
Entry.of("Hallo", 1l))),
KeyValue.pair( // 10
-
"klaus"
,
+
User.of("klaus")
,
Ranking.of(
Entry.of("s", 3l),
Entry.of("Müsch", 2l))),
};
Ranking.of(
Entry.of("s", 3l),
Entry.of("Müsch", 2l))),
};
- static MultiValueMap<
String
, Ranking> expectedMessages()
+ static MultiValueMap<
User
, Ranking> expectedMessages()
{
{
- MultiValueMap<
String
, Ranking> expectedMessages = new LinkedMultiValueMap<>();
+ MultiValueMap<
User
, Ranking> expectedMessages = new LinkedMultiValueMap<>();
Stream
.of(EXPECTED_MESSAGES)
.forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
Stream
.of(EXPECTED_MESSAGES)
.forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
diff --git
a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java
b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java
index
86314e5
..
01c1cf6
100644
(file)
--- a/
src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java
+++ b/
src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java
@@
-33,7
+33,7
@@
public class Top10StreamProcessorTopologyTest
TopologyTestDriver testDriver;
TestInputTopic<Key, Entry> in;
TopologyTestDriver testDriver;
TestInputTopic<Key, Entry> in;
- TestOutputTopic<
String
, Ranking> out;
+ TestOutputTopic<
User
, Ranking> out;
@BeforeEach
@BeforeEach
@@
-61,7
+61,7
@@
public class Top10StreamProcessorTopologyTest
out = testDriver.createOutputTopic(
OUT,
out = testDriver.createOutputTopic(
OUT,
- (JsonDeserializer<
String
>)keySerde.deserializer(),
+ (JsonDeserializer<
User
>)keySerde.deserializer(),
(JsonDeserializer<Ranking>)valueSerde.deserializer());
}
(JsonDeserializer<Ranking>)valueSerde.deserializer());
}
@@
-76,7
+76,7
@@
public class Top10StreamProcessorTopologyTest
Key.of(kv.key.getUser(), kv.key.getWord()),
Entry.of(kv.value.getWord(), kv.value.getCounter())));
Key.of(kv.key.getUser(), kv.key.getWord()),
Entry.of(kv.value.getWord(), kv.value.getCounter())));
- MultiValueMap<
String
, Ranking> receivedMessages = new LinkedMultiValueMap<>();
+ MultiValueMap<
User
, Ranking> receivedMessages = new LinkedMultiValueMap<>();
out
.readRecordsToList()
.forEach(record ->
out
.readRecordsToList()
.forEach(record ->