query: 1.0.6 - Added `QueryApplicationIT`
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / query / QueryApplicationIT.java
1 package de.juplo.kafka.wordcount.query;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
5 import org.apache.kafka.streams.state.Stores;
6 import org.junit.jupiter.api.BeforeAll;
7 import org.junit.jupiter.api.DisplayName;
8 import org.junit.jupiter.api.Test;
9 import org.springframework.beans.factory.annotation.Autowired;
10 import org.springframework.boot.test.context.SpringBootTest;
11 import org.springframework.boot.test.context.TestConfiguration;
12 import org.springframework.context.annotation.Bean;
13 import org.springframework.context.annotation.Primary;
14 import org.springframework.kafka.core.KafkaTemplate;
15 import org.springframework.kafka.support.SendResult;
16 import org.springframework.kafka.test.context.EmbeddedKafka;
17
18 import java.time.Duration;
19 import java.util.concurrent.CompletableFuture;
20
21 import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STORE_NAME;
22 import static org.awaitility.Awaitility.await;
23
24
25 @SpringBootTest(
26                 properties = {
27                                 "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer",
28                                 "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
29                                 "spring.kafka.producer.properties.spring.json.add.type.headers=false",
30                                 "logging.level.root=WARN",
31                                 "logging.level.de.juplo=DEBUG",
32                                 "logging.level.org.apache.kafka.clients=INFO",
33                                 "logging.level.org.apache.kafka.streams=INFO",
34                                 "juplo.wordcount.query.bootstrap-server=${spring.embedded.kafka.brokers}",
35                                 "juplo.wordcount.query.users-input-topic=" + QueryApplicationIT.TOPIC_USERS,
36                                 "juplo.wordcount.query.ranking-input-topic=" + QueryApplicationIT.TOPIC_TOP10 })
37 @EmbeddedKafka(topics = { QueryApplicationIT.TOPIC_TOP10, QueryApplicationIT.TOPIC_USERS})
38 @Slf4j
39 public class QueryApplicationIT
40 {
41         public static final String TOPIC_TOP10 = "top10";
42         public static final String TOPIC_USERS = "users";
43
44         @Autowired
45         QueryStreamProcessor streamProcessor;
46
47
48         @BeforeAll
49         public static void testSendMessage(
50                         @Autowired KafkaTemplate<String, Object> kafkaTemplate)
51         {
52                 TestData
53                                 .getUsersMessages()
54                                 .forEach(kv -> flush(kafkaTemplate.send(TOPIC_USERS, kv.key, kv.value)));
55                 TestData
56                                 .getTop10Messages()
57                                 .forEach(kv -> flush(kafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value)));
58         }
59
60         private static void flush(CompletableFuture<SendResult<String, Object>> future)
61         {
62                 try
63                 {
64                         SendResult<String, Object> result = future.get();
65                         log.info(
66                                         "Sent: {}={}, partition={}, offset={}",
67                                         result.getProducerRecord().key(),
68                                         result.getProducerRecord().value(),
69                                         result.getRecordMetadata().partition(),
70                                         result.getRecordMetadata().offset());
71                 }
72                 catch (Exception e)
73                 {
74                         throw new RuntimeException(e);
75                 }
76         }
77
78         @DisplayName("Await the expected state in the state-store")
79         @Test
80         public void testAwaitExpectedState()
81         {
82                 await("Expected state")
83                                 .atMost(Duration.ofSeconds(5))
84                                 .catchUncaughtExceptions()
85                                 .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
86         }
87
88         @TestConfiguration
89         static class Configuration
90         {
91                 @Primary
92                 @Bean
93                 KeyValueBytesStoreSupplier inMemoryStoreSupplier()
94                 {
95                         return Stores.inMemoryKeyValueStore(STORE_NAME);
96                 }
97         }
98 }