1 package de.juplo.kafka.wordcount.query;
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import de.juplo.kafka.wordcount.top10.TestEntry;
5 import de.juplo.kafka.wordcount.top10.TestRanking;
6 import de.juplo.kafka.wordcount.users.TestUserData;
7 import org.apache.kafka.streams.KeyValue;
8 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
10 import java.util.Arrays;
11 import java.util.stream.Stream;
13 import static org.assertj.core.api.Assertions.assertThat;
// Jackson mapper; userRankingOf uses it to parse the JSON strings handed to it.
18 static final ObjectMapper objectMapper = new ObjectMapper();
// User keys: looked up in the store by assertExpectedState and passed as the
// first argument to TestUserData.of in the USERS_MESSAGES fixture.
19 static final String PETER = "peter";
20 static final String KLAUS = "klaus";
/**
 * Returns a new stream over the elements of the TOP10_MESSAGES fixture array.
 * A fresh stream is created on every call, so callers may consume it freely.
 *
 * @return stream of key / TestRanking pairs in array order
 */
22 static final Stream<KeyValue<String, TestRanking>> getTop10Messages()
24 return Stream.of(TOP10_MESSAGES);
/**
 * Returns a new stream over the elements of the USERS_MESSAGES fixture array.
 * A fresh stream is created on every call, so callers may consume it freely.
 *
 * @return stream of key / TestUserData pairs in array order
 */
27 static final Stream<KeyValue<String, TestUserData>> getUsersMessages()
29 return Stream.of(USERS_MESSAGES);
/**
 * Asserts that the values stored under the keys PETER and KLAUS each match
 * the ranking derived from the last Top10 fixture message for that user.
 *
 * @param store key-value store queried with the user key; each value is
 *              parsed as the JSON form of a UserRanking
 */
32 static void assertExpectedState(ReadOnlyKeyValueStore<String, String> store)
34 assertRankingEqualsRankingFromLastMessage(PETER, store.get(PETER));
35 assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(KLAUS));
/**
 * Parses the given JSON into a UserRanking and asserts that it equals the
 * ranking computed by getLastMessageFor for the same user.
 *
 * NOTE(review): the comparison goes through AssertJ's isEqualTo, so it
 * presumably relies on UserRanking (and Entry) defining equals - confirm.
 *
 * @param user            key of the user whose ranking is checked
 * @param userRankingJson JSON string to compare against the expected ranking
 */
38 private static void assertRankingEqualsRankingFromLastMessage(String user, String userRankingJson)
40 assertThat(userRankingOf(userRankingJson)).isEqualTo(getLastMessageFor(user));
/**
 * Deserializes a JSON string into a UserRanking using the shared objectMapper.
 *
 * NOTE(review): the lines that bind the exception variable e (presumably a
 * try / catch around readValue) are not visible in this view - confirm which
 * exception types are caught before relying on this description.
 *
 * @param json JSON text to parse
 * @return the parsed UserRanking
 */
43 private static UserRanking userRankingOf(String json)
47 return objectMapper.readValue(json, UserRanking.class);
// Rethrown unchecked, keeping the original exception as the cause.
51 throw new RuntimeException(e);
/**
 * Computes the expected UserRanking for a user from the Top10 fixture:
 * keeps only the messages whose key equals the user, converts each one with
 * userRankingFor and returns the last converted element.
 *
 * NOTE(review): a step between the filter and the map (original line 59) is
 * not visible in this view; presumably it extracts the TestRanking value from
 * the KeyValue - confirm against the full file.
 *
 * @param user key of the user to look up
 * @return the ranking built from the last matching message, or null when no
 *         message matches (null is the identity passed to reduce)
 */
55 private static UserRanking getLastMessageFor(String user)
57 return getTop10Messages()
58 .filter(kv -> kv.key.equals(user))
60 .map(testRanking -> userRankingFor(user, testRanking))
// The accumulator always keeps the right-hand element, i.e. the last one seen.
61 .reduce(null, (left, right) -> right);
/**
 * Builds a UserRanking for the given user from a TestRanking: looks up the
 * user's TestUserData in the users fixture, converts every TestEntry of the
 * ranking into an Entry via entryOf, and passes first name, last name and
 * further arguments to UserRanking.of.
 *
 * NOTE(review): a step of the lookup pipeline (original line 68) and the
 * remaining arguments of UserRanking.of are not visible in this view -
 * confirm against the full file.
 * NOTE(review): the reduce identity is null, so testUserData looks like it is
 * null when no users message matches, and getFirstName() would then throw a
 * NullPointerException - confirm whether that can occur.
 *
 * @param user        key of the user whose data is looked up
 * @param testRanking ranking whose entries are converted
 * @return the UserRanking produced by UserRanking.of
 */
64 private static UserRanking userRankingFor(String user, TestRanking testRanking)
66 TestUserData testUserData = getUsersMessages()
67 .filter(kv -> kv.key.equals(user))
// Keeps the last matching element of the stream.
69 .reduce(null, (left, right) -> right);
// Convert each TestEntry of the ranking into an Entry, preserving order.
71 Entry[] entries = Arrays
72 .stream(testRanking.getEntries())
73 .map(testEntry -> entryOf(testEntry))
74 .toArray(size -> new Entry[size]);
76 return UserRanking.of(
77 testUserData.getFirstName(),
78 testUserData.getLastName(),
/**
 * Creates a new Entry and copies word and count over from the given TestEntry.
 *
 * NOTE(review): the return statement is not visible in this view; presumably
 * the populated entry is returned - confirm.
 *
 * @param testEntry source of the word and count values
 * @return an Entry (see note above)
 */
82 private static Entry entryOf(TestEntry testEntry)
84 Entry entry = new Entry();
85 entry.setWord(testEntry.getWord());
86 entry.setCount(testEntry.getCount());
// Fixture of Top10 messages served by getTop10Messages().
// Element order matters: getLastMessageFor() keeps the last element whose key
// matches the user.
// The array is created with the raw type KeyValue[] because Java does not
// allow generic array creation.
// NOTE(review): only the TestEntry.of(...) lines are visible in this view; the
// key and the wrapper around each group of entries are not shown, so which
// user each group belongs to cannot be told from here.
90 private static KeyValue<String, TestRanking>[] TOP10_MESSAGES = new KeyValue[]
95 TestEntry.of("Hallo", 1l))),
99 TestEntry.of("Müsch", 1l))),
103 TestEntry.of("Hallo", 1l),
104 TestEntry.of("Welt", 1l))),
108 TestEntry.of("Müsch", 2l))),
112 TestEntry.of("Müsch", 2l),
113 TestEntry.of("s", 1l))),
117 TestEntry.of("Hallo", 1l),
118 TestEntry.of("Welt", 1l),
119 TestEntry.of("Boäh", 1l))),
123 TestEntry.of("Welt", 2l),
124 TestEntry.of("Hallo", 1l),
125 TestEntry.of("Boäh", 1l))),
129 TestEntry.of("Welt", 2l),
130 TestEntry.of("Boäh", 2l),
131 TestEntry.of("Hallo", 1l))),
135 TestEntry.of("Müsch", 2l),
136 TestEntry.of("s", 2l))),
140 TestEntry.of("Boäh", 3l),
141 TestEntry.of("Welt", 2l),
142 TestEntry.of("Hallo", 1l))),
146 TestEntry.of("s", 3l),
147 TestEntry.of("Müsch", 2l))),
// Fixture of user-data messages served by getUsersMessages(); userRankingFor()
// reads getFirstName() / getLastName() from the matching element.
// The array is created with the raw type KeyValue[] because Java does not
// allow generic array creation.
// NOTE(review): the wrapper and key lines of each element are not visible in
// this view; the TestUserData.of arguments are presumably
// (user key, first name, last name, sex) - confirm against TestUserData.
150 private static KeyValue<String, TestUserData>[] USERS_MESSAGES = new KeyValue[]
154 TestUserData.of(PETER, "Peter", "Pan", TestUserData.Sex.MALE)),
157 TestUserData.of(KLAUS, "Klaus", "Klüse", TestUserData.Sex.OTHER)),