import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
class TestData
{
+ static final User PETER = User.of("peter");
+ static final User KLAUS = User.of("klaus");
+
static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
{
new KeyValue<>(
- TestWord.of("peter","Hallo"),
- TestCounter.of("peter","Hallo",1)),
+ TestWord.of(PETER.getUser(),"Hallo"),
+ TestCounter.of(PETER.getUser(),"Hallo",1)),
new KeyValue<>(
- TestWord.of("klaus","Müsch"),
- TestCounter.of("klaus","Müsch",1)),
+ TestWord.of(KLAUS.getUser(),"Müsch"),
+ TestCounter.of(KLAUS.getUser(),"Müsch",1)),
new KeyValue<>(
- TestWord.of("peter","Welt"),
- TestCounter.of("peter","Welt",1)),
+ TestWord.of(PETER.getUser(),"Welt"),
+ TestCounter.of(PETER.getUser(),"Welt",1)),
new KeyValue<>(
- TestWord.of("klaus","Müsch"),
- TestCounter.of("klaus","Müsch",2)),
+ TestWord.of(KLAUS.getUser(),"Müsch"),
+ TestCounter.of(KLAUS.getUser(),"Müsch",2)),
new KeyValue<>(
- TestWord.of("klaus","s"),
- TestCounter.of("klaus","s",1)),
+ TestWord.of(KLAUS.getUser(),"s"),
+ TestCounter.of(KLAUS.getUser(),"s",1)),
new KeyValue<>(
- TestWord.of("peter","Boäh"),
- TestCounter.of("peter","Boäh",1)),
+ TestWord.of(PETER.getUser(),"Boäh"),
+ TestCounter.of(PETER.getUser(),"Boäh",1)),
new KeyValue<>(
- TestWord.of("peter","Welt"),
- TestCounter.of("peter","Welt",2)),
+ TestWord.of(PETER.getUser(),"Welt"),
+ TestCounter.of(PETER.getUser(),"Welt",2)),
new KeyValue<>(
- TestWord.of("peter","Boäh"),
- TestCounter.of("peter","Boäh",2)),
+ TestWord.of(PETER.getUser(),"Boäh"),
+ TestCounter.of(PETER.getUser(),"Boäh",2)),
new KeyValue<>(
- TestWord.of("klaus","s"),
- TestCounter.of("klaus","s",2)),
+ TestWord.of(KLAUS.getUser(),"s"),
+ TestCounter.of(KLAUS.getUser(),"s",2)),
new KeyValue<>(
- TestWord.of("peter","Boäh"),
- TestCounter.of("peter","Boäh",3)),
+ TestWord.of(PETER.getUser(),"Boäh"),
+ TestCounter.of(PETER.getUser(),"Boäh",3)),
new KeyValue<>(
- TestWord.of("klaus","s"),
- TestCounter.of("klaus","s",3)),
+ TestWord.of(KLAUS.getUser(),"s"),
+ TestCounter.of(KLAUS.getUser(),"s",3)),
};
- static void assertExpectedMessages(MultiValueMap<String, Ranking> receivedMessages)
+ static void assertExpectedMessages(MultiValueMap<User, Ranking> receivedMessages)
{
expectedMessages().forEach(
(user, rankings) ->
.containsExactlyElementsOf(rankings));
}
- static KeyValue<String, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
+ static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
+ {
+ assertRankingEqualsRankingFromLastMessage(PETER, store.get(PETER));
+ assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(KLAUS));
+ }
+
+ static void assertExpectedLastMessagesForUsers(MultiValueMap<User, Ranking> receivedMessages)
+ {
+ assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages));
+ assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages));
+ }
+
+ static void assertRankingEqualsRankingFromLastMessage(User user, Ranking ranking)
+ {
+ assertThat(ranking).isEqualTo(getLastMessageFor(user));
+ }
+
+ static Ranking getLastMessageFor(User user)
+ {
+ return getLastMessageFor(user, expectedMessages());
+ }
+
+ static Ranking getLastMessageFor(User user, MultiValueMap<User, Ranking> messagesForUsers)
+ {
+ return messagesForUsers
+ .get(user)
+ .stream()
+ .reduce(null, (left, right) -> right);
+ }
+
+ static KeyValue<User, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
{
KeyValue.pair( // 0
- "peter",
+ PETER,
Ranking.of(
Entry.of("Hallo", 1l))),
KeyValue.pair( // 1
- "klaus",
+ KLAUS,
Ranking.of(
Entry.of("Müsch", 1l))),
KeyValue.pair( // 2
- "peter",
+ PETER,
Ranking.of(
Entry.of("Hallo", 1l),
Entry.of("Welt", 1l))),
KeyValue.pair( // 3
- "klaus",
+ KLAUS,
Ranking.of(
Entry.of("Müsch", 2l))),
KeyValue.pair( // 4
- "klaus",
+ KLAUS,
Ranking.of(
Entry.of("Müsch", 2l),
Entry.of("s", 1l))),
KeyValue.pair( // 5
- "peter",
+ PETER,
Ranking.of(
Entry.of("Hallo", 1l),
Entry.of("Welt", 1l),
Entry.of("Boäh", 1l))),
KeyValue.pair( // 6
- "peter",
+ PETER,
Ranking.of(
Entry.of("Welt", 2l),
Entry.of("Hallo", 1l),
Entry.of("Boäh", 1l))),
KeyValue.pair( // 7
- "peter",
+ PETER,
Ranking.of(
Entry.of("Welt", 2l),
Entry.of("Boäh", 2l),
Entry.of("Hallo", 1l))),
KeyValue.pair( // 8
- "klaus",
+ KLAUS,
Ranking.of(
Entry.of("Müsch", 2l),
Entry.of("s", 2l))),
KeyValue.pair( // 9
- "peter",
+ PETER,
Ranking.of(
Entry.of("Boäh", 3l),
Entry.of("Welt", 2l),
Entry.of("Hallo", 1l))),
KeyValue.pair( // 10
- "klaus",
+ KLAUS,
Ranking.of(
Entry.of("s", 3l),
Entry.of("Müsch", 2l))),
};
- static MultiValueMap<String, Ranking> expectedMessages()
+ static MultiValueMap<User, Ranking> expectedMessages()
{
- MultiValueMap<String, Ranking> expectedMessages = new LinkedMultiValueMap<>();
+ MultiValueMap<User, Ranking> expectedMessages = new LinkedMultiValueMap<>();
Stream
.of(EXPECTED_MESSAGES)
.forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));