package de.juplo.kafka.wordcount.top10;
-import de.juplo.kafka.wordcount.counter.TestWord;
import de.juplo.kafka.wordcount.counter.TestCounter;
+import de.juplo.kafka.wordcount.counter.TestWord;
import de.juplo.kafka.wordcount.query.TestRanking;
import de.juplo.kafka.wordcount.query.TestUser;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
-import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.util.MultiValueMap;
import java.time.Duration;
-import java.util.stream.Stream;
import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME;
import static org.awaitility.Awaitility.await;
properties = {
"spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
"spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
- "spring.kafka.producer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.counter.TestWord,counter:de.juplo.kafka.wordcount.counter.TestCounter",
+ "spring.kafka.producer.properties.spring.json.type.mapping=key:de.juplo.kafka.wordcount.counter.TestWord,counter:de.juplo.kafka.wordcount.counter.TestCounter",
"spring.kafka.consumer.auto-offset-reset=earliest",
"spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
"spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
public static void testSendMessage(
@Autowired KafkaTemplate<TestWord, TestCounter> kafkaTemplate)
{
- Stream
- .of(TestData.INPUT_MESSAGES)
+ TestData
+ .getInputMessages()
.forEach(kv ->
{
try