--- /dev/null
+package de.juplo.kafka.wordcount.query;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Primary;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+
+import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STORE_NAME;
+import static org.awaitility.Awaitility.await;
+
+
+@SpringBootTest(
+ properties = {
+ "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer",
+ "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
+ "spring.kafka.producer.properties.spring.json.add.type.headers=false",
+ "logging.level.root=WARN",
+ "logging.level.de.juplo=DEBUG",
+ "logging.level.org.apache.kafka.clients=INFO",
+ "logging.level.org.apache.kafka.streams=INFO",
+ "juplo.wordcount.query.bootstrap-server=${spring.embedded.kafka.brokers}",
+ "juplo.wordcount.query.users-input-topic=" + QueryApplicationIT.TOPIC_USERS,
+ "juplo.wordcount.query.ranking-input-topic=" + QueryApplicationIT.TOPIC_TOP10 })
+@EmbeddedKafka(topics = { QueryApplicationIT.TOPIC_TOP10, QueryApplicationIT.TOPIC_USERS})
+@Slf4j
+public class QueryApplicationIT
+{
+ public static final String TOPIC_TOP10 = "top10";
+ public static final String TOPIC_USERS = "users";
+
+ @Autowired
+ QueryStreamProcessor streamProcessor;
+
+
+ @BeforeAll
+ public static void testSendMessage(
+ @Autowired KafkaTemplate<String, Object> kafkaTemplate)
+ {
+ TestData
+ .getUsersMessages()
+ .forEach(kv -> flush(kafkaTemplate.send(TOPIC_USERS, kv.key, kv.value)));
+ TestData
+ .getTop10Messages()
+ .forEach(kv -> flush(kafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value)));
+ }
+
+ private static void flush(CompletableFuture<SendResult<String, Object>> future)
+ {
+ try
+ {
+ SendResult<String, Object> result = future.get();
+ log.info(
+ "Sent: {}={}, partition={}, offset={}",
+ result.getProducerRecord().key(),
+ result.getProducerRecord().value(),
+ result.getRecordMetadata().partition(),
+ result.getRecordMetadata().offset());
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @DisplayName("Await the expected state in the state-store")
+ @Test
+ public void testAwaitExpectedState()
+ {
+ await("Expected state")
+ .atMost(Duration.ofSeconds(5))
+ .catchUncaughtExceptions()
+ .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
+ }
+
+ @TestConfiguration
+ static class Configuration
+ {
+ @Primary
+ @Bean
+ KeyValueBytesStoreSupplier inMemoryStoreSupplier()
+ {
+ return Stores.inMemoryKeyValueStore(STORE_NAME);
+ }
+ }
+}