import de.juplo.kafka.wordcount.counter.TestCounter;
import de.juplo.kafka.wordcount.counter.TestWord;
+import de.juplo.kafka.wordcount.query.TestEntry;
+import de.juplo.kafka.wordcount.query.TestRanking;
+import de.juplo.kafka.wordcount.query.TestUser;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
+import java.util.Arrays;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
class TestData
{
- static final User PETER = User.of("peter");
- static final User KLAUS = User.of("klaus");
+ static final TestUser PETER = TestUser.of("peter");
+ static final TestUser KLAUS = TestUser.of("klaus");
static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
{
TestCounter.of(KLAUS.getUser(),"s",3)),
};
- static void assertExpectedMessages(MultiValueMap<User, Ranking> receivedMessages)
+ static void assertExpectedMessages(MultiValueMap<TestUser, TestRanking> receivedMessages)
{
expectedMessages().forEach(
(user, rankings) ->
static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
{
- assertRankingEqualsRankingFromLastMessage(PETER, store.get(PETER));
- assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(KLAUS));
+ assertRankingEqualsRankingFromLastMessage(PETER, store.get(userOf(PETER)));
+ assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(userOf(KLAUS)));
}
- static void assertExpectedNumberOfMessagesForUsers(MultiValueMap<User, Ranking> receivedMessages)
+ static User userOf(TestUser user)
+ {
+ return User.of(user.getUser());
+ }
+
+ static void assertExpectedNumberOfMessagesForUsers(MultiValueMap<TestUser, TestRanking> receivedMessages)
{
assertThat(countMessagesForUser(PETER, receivedMessages));
assertThat(countMessagesForUser(KLAUS, receivedMessages));
}
- static int countMessagesForUser(User user, MultiValueMap<User, Ranking> messagesForUsers)
+ static int countMessagesForUser(TestUser user, MultiValueMap<TestUser, TestRanking> messagesForUsers)
{
return messagesForUsers.get(user).size();
}
- static void assertExpectedLastMessagesForUsers(MultiValueMap<User, Ranking> receivedMessages)
+ static void assertExpectedLastMessagesForUsers(MultiValueMap<TestUser, TestRanking> receivedMessages)
{
assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages));
assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages));
}
- static void assertRankingEqualsRankingFromLastMessage(User user, Ranking ranking)
+ static void assertRankingEqualsRankingFromLastMessage(TestUser user, Ranking ranking)
+ {
+ TestRanking testRanking = TestRanking.of(testEntriesOf(ranking.getEntries()));
+ assertRankingEqualsRankingFromLastMessage(user, testRanking);
+ }
+
+ static TestEntry[] testEntriesOf(Entry... entries)
+ {
+ return Arrays
+ .stream(entries)
+ .map(entry -> TestEntry.of(
+ entry.getWord(),
+ entry.getCounter() == null
+ ? -1l
+ : entry.getCounter()))
+ .toArray(size -> new TestEntry[size]);
+ }
+
+ static void assertRankingEqualsRankingFromLastMessage(TestUser user, TestRanking ranking)
{
assertThat(ranking).isEqualTo(getLastMessageFor(user));
}
- static Ranking getLastMessageFor(User user)
+ static TestRanking getLastMessageFor(TestUser user)
{
return getLastMessageFor(user, expectedMessages());
}
- static Ranking getLastMessageFor(User user, MultiValueMap<User, Ranking> messagesForUsers)
+ static TestRanking getLastMessageFor(TestUser user, MultiValueMap<TestUser, TestRanking> messagesForUsers)
{
return messagesForUsers
.get(user)
.reduce(null, (left, right) -> right);
}
- static KeyValue<User, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
+ static KeyValue<TestUser, TestRanking>[] EXPECTED_MESSAGES = new KeyValue[]
{
KeyValue.pair( // 0
PETER,
- Ranking.of(
- Entry.of("Hallo", 1l))),
+ TestRanking.of(
+ TestEntry.of("Hallo", 1l))),
KeyValue.pair( // 1
KLAUS,
- Ranking.of(
- Entry.of("Müsch", 1l))),
+ TestRanking.of(
+ TestEntry.of("Müsch", 1l))),
KeyValue.pair( // 2
PETER,
- Ranking.of(
- Entry.of("Hallo", 1l),
- Entry.of("Welt", 1l))),
+ TestRanking.of(
+ TestEntry.of("Hallo", 1l),
+ TestEntry.of("Welt", 1l))),
KeyValue.pair( // 3
KLAUS,
- Ranking.of(
- Entry.of("Müsch", 2l))),
+ TestRanking.of(
+ TestEntry.of("Müsch", 2l))),
KeyValue.pair( // 4
KLAUS,
- Ranking.of(
- Entry.of("Müsch", 2l),
- Entry.of("s", 1l))),
+ TestRanking.of(
+ TestEntry.of("Müsch", 2l),
+ TestEntry.of("s", 1l))),
KeyValue.pair( // 5
PETER,
- Ranking.of(
- Entry.of("Hallo", 1l),
- Entry.of("Welt", 1l),
- Entry.of("Boäh", 1l))),
+ TestRanking.of(
+ TestEntry.of("Hallo", 1l),
+ TestEntry.of("Welt", 1l),
+ TestEntry.of("Boäh", 1l))),
KeyValue.pair( // 6
PETER,
- Ranking.of(
- Entry.of("Welt", 2l),
- Entry.of("Hallo", 1l),
- Entry.of("Boäh", 1l))),
+ TestRanking.of(
+ TestEntry.of("Welt", 2l),
+ TestEntry.of("Hallo", 1l),
+ TestEntry.of("Boäh", 1l))),
KeyValue.pair( // 7
PETER,
- Ranking.of(
- Entry.of("Welt", 2l),
- Entry.of("Boäh", 2l),
- Entry.of("Hallo", 1l))),
+ TestRanking.of(
+ TestEntry.of("Welt", 2l),
+ TestEntry.of("Boäh", 2l),
+ TestEntry.of("Hallo", 1l))),
KeyValue.pair( // 8
KLAUS,
- Ranking.of(
- Entry.of("Müsch", 2l),
- Entry.of("s", 2l))),
+ TestRanking.of(
+ TestEntry.of("Müsch", 2l),
+ TestEntry.of("s", 2l))),
KeyValue.pair( // 9
PETER,
- Ranking.of(
- Entry.of("Boäh", 3l),
- Entry.of("Welt", 2l),
- Entry.of("Hallo", 1l))),
+ TestRanking.of(
+ TestEntry.of("Boäh", 3l),
+ TestEntry.of("Welt", 2l),
+ TestEntry.of("Hallo", 1l))),
KeyValue.pair( // 10
KLAUS,
- Ranking.of(
- Entry.of("s", 3l),
- Entry.of("Müsch", 2l))),
+ TestRanking.of(
+ TestEntry.of("s", 3l),
+ TestEntry.of("Müsch", 2l))),
};
- static MultiValueMap<User, Ranking> expectedMessages()
+ static MultiValueMap<TestUser, TestRanking> expectedMessages()
{
- MultiValueMap<User, Ranking> expectedMessages = new LinkedMultiValueMap<>();
+ MultiValueMap<TestUser, TestRanking> expectedMessages = new LinkedMultiValueMap<>();
Stream
.of(EXPECTED_MESSAGES)
.forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
import de.juplo.kafka.wordcount.counter.TestCounter;
import de.juplo.kafka.wordcount.counter.TestWord;
+import de.juplo.kafka.wordcount.query.TestRanking;
+import de.juplo.kafka.wordcount.query.TestUser;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
TopologyTestDriver testDriver;
- TestInputTopic<Key, Entry> in;
- TestOutputTopic<User, Ranking> out;
+ TestInputTopic<TestWord, TestCounter> in;
+ TestOutputTopic<TestUser, TestRanking> out;
@BeforeEach
in = testDriver.createInputTopic(
IN,
- jsonSerializer(Key.class, true),
- jsonSerializer(Entry.class,false));
+ jsonSerializer(TestWord.class, true),
+ jsonSerializer(TestCounter.class,false));
out = testDriver.createOutputTopic(
OUT,
new JsonDeserializer()
- .copyWithType(User.class)
+ .copyWithType(TestUser.class)
.ignoreTypeHeaders(),
new JsonDeserializer()
- .copyWithType(Ranking.class)
+ .copyWithType(TestRanking.class)
.ignoreTypeHeaders());
}
{
Stream
.of(TestData.INPUT_MESSAGES)
- .forEach(kv -> in.pipeInput(
- Key.of(kv.key.getUser(), kv.key.getWord()),
- Entry.of(kv.value.getWord(), kv.value.getCounter())));
+ .forEach(kv -> in.pipeInput(kv.key, kv.value));
- MultiValueMap<User, Ranking> receivedMessages = new LinkedMultiValueMap<>();
+ MultiValueMap<TestUser, TestRanking> receivedMessages = new LinkedMultiValueMap<>();
out
.readRecordsToList()
.forEach(record -> receivedMessages.add(record.key(), record.value()));