+ static void assertExpectedState(ReadOnlyKeyValueStore<Stats, Ranking> store)
+ {
+ assertRankingEqualsRankingFromLastMessage(PETER, store.get(statsOf(PETER)));
+ assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(statsOf(KLAUS)));
+ }
+
+ private static Stats statsOf(TestStats stats)
+ {
+ return Stats.of(
+ StatsType.valueOf(stats.getType()),
+ stats.getChannel());
+ }
+
+ static void assertExpectedNumberOfMessages(MultiValueMap<TestStats, TestRanking> receivedMessages)
+ {
+ assertThat(countMessages(PETER, receivedMessages));
+ assertThat(countMessages(KLAUS, receivedMessages));
+ }
+
+ private static int countMessages(TestStats stats, MultiValueMap<TestStats, TestRanking> messagesFor)
+ {
+ return messagesFor.get(stats) == null
+ ? 0
+ : messagesFor.get(stats).size();
+ }
+
+
+ static void assertExpectedLastMessages(MultiValueMap<TestStats, TestRanking> receivedMessages)
+ {
+ assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages));
+ assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages));
+ }
+
+ private static void assertRankingEqualsRankingFromLastMessage(TestStats stats, Ranking ranking)
+ {
+ TestRanking testRanking = TestRanking.of(testEntriesOf(ranking.getEntries()));
+ assertRankingEqualsRankingFromLastMessage(stats, testRanking);
+ }
+
+ private static TestEntry[] testEntriesOf(Entry... entries)
+ {
+ return Arrays
+ .stream(entries)
+ .map(entry -> TestEntry.of(
+ entry.getKey(),
+ entry.getCounter() == null
+ ? -1l
+ : entry.getCounter()))
+ .toArray(size -> new TestEntry[size]);
+ }
+
+ private static void assertRankingEqualsRankingFromLastMessage(TestStats stats, TestRanking ranking)
+ {
+ assertThat(ranking).isEqualTo(getLastMessageFor(stats));
+ }
+
+ private static TestRanking getLastMessageFor(TestStats stats)
+ {
+ return getLastMessageFor(stats, expectedMessages());
+ }
+
+ private static TestRanking getLastMessageFor(TestStats stats, MultiValueMap<TestStats, TestRanking> messagesFor)
+ {
+ return messagesFor
+ .get(stats)
+ .stream()
+ .reduce(null, (left, right) -> right);
+ }
+
+ private static KeyValue<TestStats, TestRanking>[] EXPECTED_MESSAGES = new KeyValue[]