import de.juplo.kafka.wordcount.counter.TestWord;
import de.juplo.kafka.wordcount.query.TestEntry;
import de.juplo.kafka.wordcount.query.TestRanking;
-import de.juplo.kafka.wordcount.query.TestUser;
+import de.juplo.kafka.wordcount.query.TestStats;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.util.LinkedMultiValueMap;
{
static final String TYPE_COUNTER = "COUNTER";
- static final TestUser PETER = TestUser.of("peter");
- static final TestUser KLAUS = TestUser.of("klaus");
+ static final TestStats PETER = TestStats.of(StatsType.COUNTER.name(), "peter");
+ static final TestStats KLAUS = TestStats.of(StatsType.COUNTER.name(), "klaus");
static final Stream<KeyValue<TestWord, TestCounter>> getInputMessages()
{
private static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
{
new KeyValue<>(
- TestWord.of(TYPE_COUNTER, PETER.getUser(),"Hallo"),
+ TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Hallo"),
TestCounter.of("Hallo",1)),
new KeyValue<>(
- TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"Müsch"),
+ TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"Müsch"),
TestCounter.of("Müsch",1)),
new KeyValue<>(
- TestWord.of(TYPE_COUNTER, PETER.getUser(),"Welt"),
+ TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Welt"),
TestCounter.of("Welt",1)),
new KeyValue<>(
- TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"Müsch"),
+ TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"Müsch"),
TestCounter.of("Müsch",2)),
new KeyValue<>(
- TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"s"),
+ TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"s"),
TestCounter.of("s",1)),
new KeyValue<>(
- TestWord.of(TYPE_COUNTER, PETER.getUser(),"Boäh"),
+ TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Boäh"),
TestCounter.of("Boäh",1)),
new KeyValue<>(
- TestWord.of(TYPE_COUNTER, PETER.getUser(),"Welt"),
+ TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Welt"),
TestCounter.of("Welt",2)),
new KeyValue<>(
- TestWord.of(TYPE_COUNTER, PETER.getUser(),"Boäh"),
+ TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Boäh"),
TestCounter.of("Boäh",2)),
new KeyValue<>(
- TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"s"),
+ TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"s"),
TestCounter.of("s",2)),
new KeyValue<>(
- TestWord.of(TYPE_COUNTER, PETER.getUser(),"Boäh"),
+ TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Boäh"),
TestCounter.of("Boäh",3)),
new KeyValue<>(
- TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"s"),
+ TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"s"),
TestCounter.of("s",3)),
};
- static void assertExpectedMessages(MultiValueMap<TestUser, TestRanking> receivedMessages)
+ static void assertExpectedMessages(MultiValueMap<TestStats, TestRanking> receivedMessages)
{
expectedMessages().forEach(
- (user, rankings) ->
- assertThat(receivedMessages.get(user))
+ (stats, rankings) ->
+ assertThat(receivedMessages.get(stats))
.containsExactlyElementsOf(rankings));
}
- static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
+ static void assertExpectedState(ReadOnlyKeyValueStore<Stats, Ranking> store)
{
- assertRankingEqualsRankingFromLastMessage(PETER, store.get(userOf(PETER)));
- assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(userOf(KLAUS)));
+ assertRankingEqualsRankingFromLastMessage(PETER, store.get(statsOf(PETER)));
+ assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(statsOf(KLAUS)));
}
- private static User userOf(TestUser user)
+ private static Stats statsOf(TestStats stats)
{
- return User.of(user.getUser());
+ return Stats.of(
+ StatsType.valueOf(stats.getType()),
+ stats.getChannel());
}
- static void assertExpectedNumberOfMessagesForUsers(MultiValueMap<TestUser, TestRanking> receivedMessages)
+ static void assertExpectedNumberOfMessages(MultiValueMap<TestStats, TestRanking> receivedMessages)
{
- assertThat(countMessagesForUser(PETER, receivedMessages));
- assertThat(countMessagesForUser(KLAUS, receivedMessages));
+ assertThat(countMessages(PETER, receivedMessages));
+ assertThat(countMessages(KLAUS, receivedMessages));
}
- private static int countMessagesForUser(TestUser user, MultiValueMap<TestUser, TestRanking> messagesForUsers)
+ private static int countMessages(TestStats stats, MultiValueMap<TestStats, TestRanking> messagesFor)
{
- return messagesForUsers.get(user) == null
+ return messagesFor.get(stats) == null
? 0
- : messagesForUsers.get(user).size();
+ : messagesFor.get(stats).size();
}
- static void assertExpectedLastMessagesForUsers(MultiValueMap<TestUser, TestRanking> receivedMessages)
+ static void assertExpectedLastMessages(MultiValueMap<TestStats, TestRanking> receivedMessages)
{
assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages));
assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages));
}
- private static void assertRankingEqualsRankingFromLastMessage(TestUser user, Ranking ranking)
+ private static void assertRankingEqualsRankingFromLastMessage(TestStats stats, Ranking ranking)
{
TestRanking testRanking = TestRanking.of(testEntriesOf(ranking.getEntries()));
- assertRankingEqualsRankingFromLastMessage(user, testRanking);
+ assertRankingEqualsRankingFromLastMessage(stats, testRanking);
}
private static TestEntry[] testEntriesOf(Entry... entries)
.toArray(size -> new TestEntry[size]);
}
- private static void assertRankingEqualsRankingFromLastMessage(TestUser user, TestRanking ranking)
+ private static void assertRankingEqualsRankingFromLastMessage(TestStats stats, TestRanking ranking)
{
- assertThat(ranking).isEqualTo(getLastMessageFor(user));
+ assertThat(ranking).isEqualTo(getLastMessageFor(stats));
}
- private static TestRanking getLastMessageFor(TestUser user)
+ private static TestRanking getLastMessageFor(TestStats stats)
{
- return getLastMessageFor(user, expectedMessages());
+ return getLastMessageFor(stats, expectedMessages());
}
- private static TestRanking getLastMessageFor(TestUser user, MultiValueMap<TestUser, TestRanking> messagesForUsers)
+ private static TestRanking getLastMessageFor(TestStats stats, MultiValueMap<TestStats, TestRanking> messagesFor)
{
- return messagesForUsers
- .get(user)
+ return messagesFor
+ .get(stats)
.stream()
.reduce(null, (left, right) -> right);
}
- private static KeyValue<TestUser, TestRanking>[] EXPECTED_MESSAGES = new KeyValue[]
+ private static KeyValue<TestStats, TestRanking>[] EXPECTED_MESSAGES = new KeyValue[]
{
KeyValue.pair( // 0
PETER,
TestEntry.of("Müsch", 2l))),
};
- private static MultiValueMap<TestUser, TestRanking> expectedMessages()
+ private static MultiValueMap<TestStats, TestRanking> expectedMessages()
{
- MultiValueMap<TestUser, TestRanking> expectedMessages = new LinkedMultiValueMap<>();
+ MultiValueMap<TestStats, TestRanking> expectedMessages = new LinkedMultiValueMap<>();
Stream
.of(EXPECTED_MESSAGES)
.forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
import de.juplo.kafka.wordcount.counter.TestCounter;
import de.juplo.kafka.wordcount.counter.TestWord;
import de.juplo.kafka.wordcount.query.TestRanking;
-import de.juplo.kafka.wordcount.query.TestUser;
+import de.juplo.kafka.wordcount.query.TestStats;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
"spring.kafka.consumer.auto-offset-reset=earliest",
"spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
"spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
- "spring.kafka.consumer.properties.spring.json.type.mapping=user:de.juplo.kafka.wordcount.query.TestUser,ranking:de.juplo.kafka.wordcount.query.TestRanking",
+ "spring.kafka.consumer.properties.spring.json.type.mapping=stats:de.juplo.kafka.wordcount.query.TestStats,ranking:de.juplo.kafka.wordcount.query.TestRanking",
"logging.level.root=WARN",
"logging.level.de.juplo=DEBUG",
"logging.level.org.apache.kafka.clients=INFO",
@DisplayName("Await the expected number of messages")
@Test
- public void testAwaitExpectedNumberOfMessagesForUsers()
+ public void testAwaitExpectedNumberOfMessages()
{
await("Expected number of messages")
.atMost(Duration.ofSeconds(5))
.untilAsserted(() -> consumer.enforceAssertion(
- receivedMessages -> TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages)));
+ receivedMessages -> TestData.assertExpectedNumberOfMessages(receivedMessages)));
}
@DisplayName("Await the expected final output messages")
@Test
- public void testAwaitExpectedLastMessagesForUsers()
+ public void testAwaitExpectedLastMessages()
{
await("Expected final output messages")
.atMost(Duration.ofSeconds(5))
.untilAsserted(() -> consumer.enforceAssertion(
- receivedMessages -> TestData.assertExpectedLastMessagesForUsers(receivedMessages)));
+ receivedMessages -> TestData.assertExpectedLastMessages(receivedMessages)));
}
static class Consumer
{
- private final MultiValueMap<TestUser, TestRanking> received = new LinkedMultiValueMap<>();
+ private final MultiValueMap<TestStats, TestRanking> received = new LinkedMultiValueMap<>();
@KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
public synchronized void receive(
- @Header(KafkaHeaders.RECEIVED_KEY) TestUser user,
+ @Header(KafkaHeaders.RECEIVED_KEY) TestStats user,
@Payload TestRanking ranking)
{
log.debug("Received message: {} -> {}", user, ranking);
}
synchronized void enforceAssertion(
- java.util.function.Consumer<MultiValueMap<TestUser, TestRanking>> assertion)
+ java.util.function.Consumer<MultiValueMap<TestStats, TestRanking>> assertion)
{
assertion.accept(received);
}
import de.juplo.kafka.wordcount.counter.TestCounter;
import de.juplo.kafka.wordcount.counter.TestWord;
import de.juplo.kafka.wordcount.query.TestRanking;
-import de.juplo.kafka.wordcount.query.TestUser;
+import de.juplo.kafka.wordcount.query.TestStats;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
public static final String OUT = "TEST-OUT";
static TopologyTestDriver testDriver;
- static MultiValueMap<TestUser, TestRanking> receivedMessages = new LinkedMultiValueMap<>();
+ static MultiValueMap<TestStats, TestRanking> receivedMessages = new LinkedMultiValueMap<>();
@BeforeAll
jsonSerializer(TestWord.class, true),
jsonSerializer(TestCounter.class,false));
- TestOutputTopic<TestUser, TestRanking> out = testDriver.createOutputTopic(
+ TestOutputTopic<TestStats, TestRanking> out = testDriver.createOutputTopic(
OUT,
new JsonDeserializer()
- .copyWithType(TestUser.class)
+ .copyWithType(TestStats.class)
.ignoreTypeHeaders(),
new JsonDeserializer()
.copyWithType(TestRanking.class)
@DisplayName("Assert the expected number of messages")
@Test
- public void testExpectedNumberOfMessagesForUsers()
+ public void testExpectedNumberOfMessages()
{
- TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages);
+ TestData.assertExpectedNumberOfMessages(receivedMessages);
}
@DisplayName("Assert the expected final output messages")
@Test
- public void testExpectedLastMessagesForUSers()
+ public void testExpectedLastMessages()
{
- TestData.assertExpectedLastMessagesForUsers(receivedMessages);
+ TestData.assertExpectedLastMessages(receivedMessages);
}
@DisplayName("Assert the expected state in the state-store")
@Test
public void testExpectedState()
{
- KeyValueStore<User, Ranking> store = testDriver.getKeyValueStore(STORE_NAME);
+ KeyValueStore<Stats, Ranking> store = testDriver.getKeyValueStore(STORE_NAME);
TestData.assertExpectedState(store);
}