projects
/
demos
/
kafka
/
wordcount
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
top10: 1.2.1 - `TestData` uses faked foreign classes for input-/output data
[demos/kafka/wordcount]
/
src
/
test
/
java
/
de
/
juplo
/
kafka
/
wordcount
/
top10
/
Top10StreamProcessorTopologyTest.java
diff --git
a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java
b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java
index
80fc0df
..
cd09c06
100644
(file)
--- a/
src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java
+++ b/
src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java
@@
-2,6
+2,8
@@
package de.juplo.kafka.wordcount.top10;
import de.juplo.kafka.wordcount.counter.TestCounter;
import de.juplo.kafka.wordcount.counter.TestWord;
import de.juplo.kafka.wordcount.counter.TestCounter;
import de.juplo.kafka.wordcount.counter.TestWord;
+import de.juplo.kafka.wordcount.query.TestRanking;
+import de.juplo.kafka.wordcount.query.TestUser;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
@@
-32,8
+34,8
@@
public class Top10StreamProcessorTopologyTest
TopologyTestDriver testDriver;
TopologyTestDriver testDriver;
- TestInputTopic<
Key, Entry
> in;
- TestOutputTopic<
User,
Ranking> out;
+ TestInputTopic<
TestWord, TestCounter
> in;
+ TestOutputTopic<
TestUser, Test
Ranking> out;
@BeforeEach
@BeforeEach
@@
-48,16
+50,16
@@
public class Top10StreamProcessorTopologyTest
in = testDriver.createInputTopic(
IN,
in = testDriver.createInputTopic(
IN,
- jsonSerializer(
Key
.class, true),
- jsonSerializer(
Entry
.class,false));
+ jsonSerializer(
TestWord
.class, true),
+ jsonSerializer(
TestCounter
.class,false));
out = testDriver.createOutputTopic(
OUT,
new JsonDeserializer()
out = testDriver.createOutputTopic(
OUT,
new JsonDeserializer()
- .copyWithType(User.class)
+ .copyWithType(
Test
User.class)
.ignoreTypeHeaders(),
new JsonDeserializer()
.ignoreTypeHeaders(),
new JsonDeserializer()
- .copyWithType(Ranking.class)
+ .copyWithType(
Test
Ranking.class)
.ignoreTypeHeaders());
}
.ignoreTypeHeaders());
}
@@
-68,11
+70,9
@@
public class Top10StreamProcessorTopologyTest
{
Stream
.of(TestData.INPUT_MESSAGES)
{
Stream
.of(TestData.INPUT_MESSAGES)
- .forEach(kv -> in.pipeInput(
- Key.of(kv.key.getUser(), kv.key.getWord()),
- Entry.of(kv.value.getWord(), kv.value.getCounter())));
+ .forEach(kv -> in.pipeInput(kv.key, kv.value));
- MultiValueMap<
User,
Ranking> receivedMessages = new LinkedMultiValueMap<>();
+ MultiValueMap<
TestUser, Test
Ranking> receivedMessages = new LinkedMultiValueMap<>();
out
.readRecordsToList()
.forEach(record -> receivedMessages.add(record.key(), record.value()));
out
.readRecordsToList()
.forEach(record -> receivedMessages.add(record.key(), record.value()));