1 package de.juplo.kafka.wordcount.query;
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import de.juplo.kafka.wordcount.top10.TestEntry;
5 import de.juplo.kafka.wordcount.top10.TestRanking;
6 import de.juplo.kafka.wordcount.users.TestUserData;
7 import org.apache.kafka.streams.KeyValue;
8 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
10 import java.util.Arrays;
11 import java.util.stream.Stream;
13 import static org.assertj.core.api.Assertions.assertThat;
// Shared Jackson mapper; userRankingOf uses it to parse the JSON strings read from the store.
18 static final ObjectMapper objectMapper = new ObjectMapper();
// User keys: looked up in the store by assertExpectedState and passed as the
// first argument of TestUserData.of in USERS_MESSAGES.
19 static final String PETER = "peter";
20 static final String KLAUS = "klaus";
/**
 * Provides the top-10 test messages as a stream.
 *
 * @return a new stream over the elements of TOP10_MESSAGES, in array order
 */
22 static final Stream<KeyValue<String, TestRanking>> getTop10Messages()
24 return Stream.of(TOP10_MESSAGES);
/**
 * Provides the user-data test messages as a stream.
 *
 * @return a new stream over the elements of USERS_MESSAGES, in array order
 */
27 static final Stream<KeyValue<String, TestUserData>> getUsersMessages()
29 return Stream.of(USERS_MESSAGES);
/**
 * Asserts that the store holds the expected ranking for both test users.
 * For PETER and KLAUS, the JSON string stored under the user's key is compared
 * with the ranking derived from that user's last top-10 test message.
 *
 * @param store the key/value store to inspect; values are JSON strings
 */
32 static void assertExpectedState(ReadOnlyKeyValueStore<String, String> store)
34 assertRankingEqualsRankingFromLastMessage(PETER, store.get(PETER));
35 assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(KLAUS));
/**
 * Asserts that the given JSON, once parsed, equals the ranking built from the
 * user's last top-10 test message.
 *
 * @param user the key of the user whose ranking is checked
 * @param userRankingJson the JSON string found for that user
 */
38 private static void assertRankingEqualsRankingFromLastMessage(String user, String userRankingJson)
// NOTE(review): isEqualTo presumably relies on UserRanking.equals — confirm that it is value-based.
40 assertThat(userRankingOf(userRankingJson)).isEqualTo(getLastMessageFor(user));
/**
 * Deserializes a JSON string into a UserRanking using the shared objectMapper.
 *
 * NOTE(review): the lines between the signature and the readValue call, and
 * between readValue and the throw, are not visible here; presumably a
 * try/catch surrounds the call — confirm against the full file.
 *
 * @param json the JSON text to parse
 * @return the parsed UserRanking
 * @throws RuntimeException wrapping the caught exception e as its cause
 */
43 private static UserRanking userRankingOf(String json)
52 return objectMapper.readValue(json, UserRanking.class);
// Rethrown unchecked with the original exception kept as the cause.
56 throw new RuntimeException(e);
/**
 * Builds the expected UserRanking from the last top-10 test message whose key
 * equals the given user.
 *
 * @param user the key to select messages by
 * @return the ranking for the last matching message, or null if none matches
 */
60 private static UserRanking getLastMessageFor(String user)
62 return getTop10Messages()
63 .filter(kv -> kv.key.equals(user))
// NOTE(review): a step between filter and this map is not visible here;
// presumably it extracts the TestRanking value from the KeyValue — confirm.
65 .map(testRanking -> userRankingFor(user, testRanking))
// Keeps only the last element; the identity null is the result for an empty stream.
66 .reduce(null, (left, right) -> right);
/**
 * Combines the user's data with a test ranking into a UserRanking.
 *
 * @param user the key used to select the user's data from the users test messages
 * @param testRanking the ranking whose entries are converted via entryOf
 * @return the UserRanking created by UserRanking.of
 */
69 private static UserRanking userRankingFor(String user, TestRanking testRanking)
// Selects the last users message whose key equals the given user.
// NOTE(review): a step before reduce is not visible here; presumably it maps
// the KeyValue to its TestUserData value — confirm.
71 TestUserData testUserData = getUsersMessages()
72 .filter(kv -> kv.key.equals(user))
74 .reduce(null, (left, right) -> right);
// Converts each TestEntry of the ranking into an Entry, keeping the order.
76 Entry[] entries = Arrays
77 .stream(testRanking.getEntries())
78 .map(testEntry -> entryOf(testEntry))
79 .toArray(size -> new Entry[size]);
// NOTE(review): testUserData is null when no users message matches, so the
// getters below would throw a NullPointerException in that case.
// The remaining arguments of UserRanking.of are not visible here.
81 return UserRanking.of(
82 testUserData.getFirstName(),
83 testUserData.getLastName(),
/**
 * Creates an Entry and copies the word and the count of the given TestEntry into it.
 *
 * NOTE(review): the return statement is not visible here; presumably the new
 * entry is returned — confirm.
 *
 * @param testEntry the source of word and count
 */
87 private static Entry entryOf(TestEntry testEntry)
89 Entry entry = new Entry();
90 entry.setWord(testEntry.getWord());
91 entry.setCount(testEntry.getCount());
// Test input exposed through getTop10Messages: a sequence of rankings whose
// entries are built with TestEntry.of(word, count).
// NOTE(review): the key of each message and the calls wrapping the entries are
// not visible here — check the full file to see which user each ranking belongs to.
// NOTE(review): the array is created with the raw type KeyValue[] (unchecked
// assignment), and the field is not final.
// NOTE(review): the long literals use a lowercase 'l' suffix, which is easy to
// misread as the digit 1; an uppercase 'L' is the usual recommendation.
95 private static KeyValue<String, TestRanking>[] TOP10_MESSAGES = new KeyValue[]
100 TestEntry.of("Hallo", 1l))),
104 TestEntry.of("Müsch", 1l))),
108 TestEntry.of("Hallo", 1l),
109 TestEntry.of("Welt", 1l))),
113 TestEntry.of("Müsch", 2l))),
117 TestEntry.of("Müsch", 2l),
118 TestEntry.of("s", 1l))),
122 TestEntry.of("Hallo", 1l),
123 TestEntry.of("Welt", 1l),
124 TestEntry.of("Boäh", 1l))),
128 TestEntry.of("Welt", 2l),
129 TestEntry.of("Hallo", 1l),
130 TestEntry.of("Boäh", 1l))),
134 TestEntry.of("Welt", 2l),
135 TestEntry.of("Boäh", 2l),
136 TestEntry.of("Hallo", 1l))),
140 TestEntry.of("Müsch", 2l),
141 TestEntry.of("s", 2l))),
145 TestEntry.of("Boäh", 3l),
146 TestEntry.of("Welt", 2l),
147 TestEntry.of("Hallo", 1l))),
151 TestEntry.of("s", 3l),
152 TestEntry.of("Müsch", 2l))),
// Test input exposed through getUsersMessages: one TestUserData record for each
// of the two test users PETER and KLAUS.
// NOTE(review): the key of each message and the wrapping calls are not visible
// here; presumably each record is keyed by its user — confirm.
// NOTE(review): the array is created with the raw type KeyValue[] (unchecked
// assignment), and the field is not final.
155 private static KeyValue<String, TestUserData>[] USERS_MESSAGES = new KeyValue[]
159 TestUserData.of(PETER, "Peter", "Pan", TestUserData.Sex.MALE)),
162 TestUserData.of(KLAUS, "Klaus", "Klüse", TestUserData.Sex.OTHER)),