import de.juplo.kafka.wordcount.counter.TestCounter;
import de.juplo.kafka.wordcount.counter.TestWord;
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.header.Headers;
+import de.juplo.kafka.wordcount.query.TestEntry;
+import de.juplo.kafka.wordcount.query.TestRanking;
+import de.juplo.kafka.wordcount.query.TestStats;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
-import java.util.Map;
-import java.util.Properties;
-import java.util.stream.Collectors;
+import java.util.Arrays;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
class TestData
{
- static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
+ static final String TYPE_COUNTER = "COUNTER";
+
+ static final TestStats PETER = TestStats.of(StatsType.COUNTER.name(), "peter");
+ static final TestStats KLAUS = TestStats.of(StatsType.COUNTER.name(), "klaus");
+
+ static final Stream<KeyValue<TestWord, TestCounter>> getInputMessages()
+ {
+ return Stream.of(INPUT_MESSAGES);
+ }
+
+ private static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
{
new KeyValue<>(
- TestWord.of("peter","Hallo"),
- TestCounter.of("peter","Hallo",1)),
+ TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Hallo"),
+ TestCounter.of("Hallo",1)),
new KeyValue<>(
- TestWord.of("klaus","Müsch"),
- TestCounter.of("klaus","Müsch",1)),
+ TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"Müsch"),
+ TestCounter.of("Müsch",1)),
new KeyValue<>(
- TestWord.of("peter","Welt"),
- TestCounter.of("peter","Welt",1)),
+ TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Welt"),
+ TestCounter.of("Welt",1)),
new KeyValue<>(
- TestWord.of("klaus","Müsch"),
- TestCounter.of("klaus","Müsch",2)),
+ TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"Müsch"),
+ TestCounter.of("Müsch",2)),
new KeyValue<>(
- TestWord.of("klaus","s"),
- TestCounter.of("klaus","s",1)),
+ TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"s"),
+ TestCounter.of("s",1)),
new KeyValue<>(
- TestWord.of("peter","Boäh"),
- TestCounter.of("peter","Boäh",1)),
+ TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Boäh"),
+ TestCounter.of("Boäh",1)),
new KeyValue<>(
- TestWord.of("peter","Welt"),
- TestCounter.of("peter","Welt",2)),
+ TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Welt"),
+ TestCounter.of("Welt",2)),
new KeyValue<>(
- TestWord.of("peter","Boäh"),
- TestCounter.of("peter","Boäh",2)),
+ TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Boäh"),
+ TestCounter.of("Boäh",2)),
new KeyValue<>(
- TestWord.of("klaus","s"),
- TestCounter.of("klaus","s",2)),
+ TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"s"),
+ TestCounter.of("s",2)),
new KeyValue<>(
- TestWord.of("peter","Boäh"),
- TestCounter.of("peter","Boäh",3)),
+ TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Boäh"),
+ TestCounter.of("Boäh",3)),
new KeyValue<>(
- TestWord.of("klaus","s"),
- TestCounter.of("klaus","s",3)),
+ TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"s"),
+ TestCounter.of("s",3)),
};
- static void assertExpectedMessages(MultiValueMap<User, Ranking> receivedMessages)
+ static void assertExpectedMessages(MultiValueMap<TestStats, TestRanking> receivedMessages)
{
expectedMessages().forEach(
- (user, rankings) ->
- assertThat(receivedMessages.get(user))
+ (stats, rankings) ->
+ assertThat(receivedMessages.get(stats))
.containsExactlyElementsOf(rankings));
}
- static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
+ static void assertExpectedState(ReadOnlyKeyValueStore<Stats, Ranking> store)
+ {
+ assertRankingEqualsRankingFromLastMessage(PETER, store.get(statsOf(PETER)));
+ assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(statsOf(KLAUS)));
+ }
+
+ private static Stats statsOf(TestStats stats)
+ {
+ return Stats.of(
+ StatsType.valueOf(stats.getType()),
+ stats.getChannel());
+ }
+
+ static void assertExpectedNumberOfMessages(MultiValueMap<TestStats, TestRanking> receivedMessages)
+ {
+ assertThat(countMessages(PETER, receivedMessages));
+ assertThat(countMessages(KLAUS, receivedMessages));
+ }
+
+ private static int countMessages(TestStats stats, MultiValueMap<TestStats, TestRanking> messagesFor)
+ {
+ return messagesFor.get(stats) == null
+ ? 0
+ : messagesFor.get(stats).size();
+ }
+
+
+ static void assertExpectedLastMessages(MultiValueMap<TestStats, TestRanking> receivedMessages)
+ {
+ assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages));
+ assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages));
+ }
+
+ private static void assertRankingEqualsRankingFromLastMessage(TestStats stats, Ranking ranking)
{
- assertThat(store.get(EXPECTED_MESSAGES[9].key)).isEqualTo(EXPECTED_MESSAGES[9].value);
- assertThat(store.get(EXPECTED_MESSAGES[10].key)).isEqualTo(EXPECTED_MESSAGES[10].value);
+ TestRanking testRanking = TestRanking.of(testEntriesOf(ranking.getEntries()));
+ assertRankingEqualsRankingFromLastMessage(stats, testRanking);
}
- static KeyValue<User, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
+ private static TestEntry[] testEntriesOf(Entry... entries)
+ {
+ return Arrays
+ .stream(entries)
+ .map(entry -> TestEntry.of(
+ entry.getKey(),
+ entry.getCounter() == null
+ ? -1l
+ : entry.getCounter()))
+ .toArray(size -> new TestEntry[size]);
+ }
+
+ private static void assertRankingEqualsRankingFromLastMessage(TestStats stats, TestRanking ranking)
+ {
+ assertThat(ranking).isEqualTo(getLastMessageFor(stats));
+ }
+
+ private static TestRanking getLastMessageFor(TestStats stats)
+ {
+ return getLastMessageFor(stats, expectedMessages());
+ }
+
+ private static TestRanking getLastMessageFor(TestStats stats, MultiValueMap<TestStats, TestRanking> messagesFor)
+ {
+ return messagesFor
+ .get(stats)
+ .stream()
+ .reduce(null, (left, right) -> right);
+ }
+
+ private static KeyValue<TestStats, TestRanking>[] EXPECTED_MESSAGES = new KeyValue[]
{
KeyValue.pair( // 0
- User.of("peter"),
- Ranking.of(
- Entry.of("Hallo", 1l))),
+ PETER,
+ TestRanking.of(
+ TestEntry.of("Hallo", 1l))),
KeyValue.pair( // 1
- User.of("klaus"),
- Ranking.of(
- Entry.of("Müsch", 1l))),
+ KLAUS,
+ TestRanking.of(
+ TestEntry.of("Müsch", 1l))),
KeyValue.pair( // 2
- User.of("peter"),
- Ranking.of(
- Entry.of("Hallo", 1l),
- Entry.of("Welt", 1l))),
+ PETER,
+ TestRanking.of(
+ TestEntry.of("Hallo", 1l),
+ TestEntry.of("Welt", 1l))),
KeyValue.pair( // 3
- User.of("klaus"),
- Ranking.of(
- Entry.of("Müsch", 2l))),
+ KLAUS,
+ TestRanking.of(
+ TestEntry.of("Müsch", 2l))),
KeyValue.pair( // 4
- User.of("klaus"),
- Ranking.of(
- Entry.of("Müsch", 2l),
- Entry.of("s", 1l))),
+ KLAUS,
+ TestRanking.of(
+ TestEntry.of("Müsch", 2l),
+ TestEntry.of("s", 1l))),
KeyValue.pair( // 5
- User.of("peter"),
- Ranking.of(
- Entry.of("Hallo", 1l),
- Entry.of("Welt", 1l),
- Entry.of("Boäh", 1l))),
+ PETER,
+ TestRanking.of(
+ TestEntry.of("Hallo", 1l),
+ TestEntry.of("Welt", 1l),
+ TestEntry.of("Boäh", 1l))),
KeyValue.pair( // 6
- User.of("peter"),
- Ranking.of(
- Entry.of("Welt", 2l),
- Entry.of("Hallo", 1l),
- Entry.of("Boäh", 1l))),
+ PETER,
+ TestRanking.of(
+ TestEntry.of("Welt", 2l),
+ TestEntry.of("Hallo", 1l),
+ TestEntry.of("Boäh", 1l))),
KeyValue.pair( // 7
- User.of("peter"),
- Ranking.of(
- Entry.of("Welt", 2l),
- Entry.of("Boäh", 2l),
- Entry.of("Hallo", 1l))),
+ PETER,
+ TestRanking.of(
+ TestEntry.of("Welt", 2l),
+ TestEntry.of("Boäh", 2l),
+ TestEntry.of("Hallo", 1l))),
KeyValue.pair( // 8
- User.of("klaus"),
- Ranking.of(
- Entry.of("Müsch", 2l),
- Entry.of("s", 2l))),
+ KLAUS,
+ TestRanking.of(
+ TestEntry.of("Müsch", 2l),
+ TestEntry.of("s", 2l))),
KeyValue.pair( // 9
- User.of("peter"),
- Ranking.of(
- Entry.of("Boäh", 3l),
- Entry.of("Welt", 2l),
- Entry.of("Hallo", 1l))),
+ PETER,
+ TestRanking.of(
+ TestEntry.of("Boäh", 3l),
+ TestEntry.of("Welt", 2l),
+ TestEntry.of("Hallo", 1l))),
KeyValue.pair( // 10
- User.of("klaus"),
- Ranking.of(
- Entry.of("s", 3l),
- Entry.of("Müsch", 2l))),
+ KLAUS,
+ TestRanking.of(
+ TestEntry.of("s", 3l),
+ TestEntry.of("Müsch", 2l))),
};
- static MultiValueMap<User, Ranking> expectedMessages()
+ private static MultiValueMap<TestStats, TestRanking> expectedMessages()
{
- MultiValueMap<User, Ranking> expectedMessages = new LinkedMultiValueMap<>();
+ MultiValueMap<TestStats, TestRanking> expectedMessages = new LinkedMultiValueMap<>();
Stream
.of(EXPECTED_MESSAGES)
.forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
return expectedMessages;
}
-
- static Map<String, Object> convertToMap(Properties properties)
- {
- return properties
- .entrySet()
- .stream()
- .collect(
- Collectors.toMap(
- entry -> (String)entry.getKey(),
- entry -> entry.getValue()
- ));
- }
-
- static String parseHeader(Headers headers, String key)
- {
- Header header = headers.lastHeader(key);
- if (header == null)
- {
- return key + "=null";
- }
- else
- {
- return key + "=" + new String(header.value());
- }
- }
}