static final TestUser PETER = TestUser.of("peter");
static final TestUser KLAUS = TestUser.of("klaus");
- static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
+ static final Stream<KeyValue<TestWord, TestCounter>> getInputMessages()
+ {
+ return Stream.of(INPUT_MESSAGES);
+ }
+
+ private static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
{
new KeyValue<>(
TestWord.of(PETER.getUser(),"Hallo"),
assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(userOf(KLAUS)));
}
- static User userOf(TestUser user)
+ private static User userOf(TestUser user)
{
return User.of(user.getUser());
}
assertThat(countMessagesForUser(KLAUS, receivedMessages));
}
- static int countMessagesForUser(TestUser user, MultiValueMap<TestUser, TestRanking> messagesForUsers)
+ private static int countMessagesForUser(TestUser user, MultiValueMap<TestUser, TestRanking> messagesForUsers)
{
return messagesForUsers.get(user) == null
? 0
assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages));
}
- static void assertRankingEqualsRankingFromLastMessage(TestUser user, Ranking ranking)
+ private static void assertRankingEqualsRankingFromLastMessage(TestUser user, Ranking ranking)
{
TestRanking testRanking = TestRanking.of(testEntriesOf(ranking.getEntries()));
assertRankingEqualsRankingFromLastMessage(user, testRanking);
}
- static TestEntry[] testEntriesOf(Entry... entries)
+ private static TestEntry[] testEntriesOf(Entry... entries)
{
return Arrays
.stream(entries)
.toArray(size -> new TestEntry[size]);
}
- static void assertRankingEqualsRankingFromLastMessage(TestUser user, TestRanking ranking)
+ private static void assertRankingEqualsRankingFromLastMessage(TestUser user, TestRanking ranking)
{
assertThat(ranking).isEqualTo(getLastMessageFor(user));
}
- static TestRanking getLastMessageFor(TestUser user)
+ private static TestRanking getLastMessageFor(TestUser user)
{
return getLastMessageFor(user, expectedMessages());
}
- static TestRanking getLastMessageFor(TestUser user, MultiValueMap<TestUser, TestRanking> messagesForUsers)
+ private static TestRanking getLastMessageFor(TestUser user, MultiValueMap<TestUser, TestRanking> messagesForUsers)
{
return messagesForUsers
.get(user)
.reduce(null, (left, right) -> right);
}
- static KeyValue<TestUser, TestRanking>[] EXPECTED_MESSAGES = new KeyValue[]
+ private static KeyValue<TestUser, TestRanking>[] EXPECTED_MESSAGES = new KeyValue[]
{
KeyValue.pair( // 0
PETER,
TestEntry.of("Müsch", 2l))),
};
- static MultiValueMap<TestUser, TestRanking> expectedMessages()
+ private static MultiValueMap<TestUser, TestRanking> expectedMessages()
{
MultiValueMap<TestUser, TestRanking> expectedMessages = new LinkedMultiValueMap<>();
Stream
package de.juplo.kafka.wordcount.top10;
-import de.juplo.kafka.wordcount.counter.TestWord;
import de.juplo.kafka.wordcount.counter.TestCounter;
+import de.juplo.kafka.wordcount.counter.TestWord;
import de.juplo.kafka.wordcount.query.TestRanking;
import de.juplo.kafka.wordcount.query.TestUser;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
-import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.util.MultiValueMap;
import java.time.Duration;
-import java.util.stream.Stream;
import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME;
import static org.awaitility.Awaitility.await;
public static void testSendMessage(
@Autowired KafkaTemplate<TestWord, TestCounter> kafkaTemplate)
{
- Stream
- .of(TestData.INPUT_MESSAGES)
+ TestData
+ .getInputMessages()
.forEach(kv ->
{
try