query: 2.0.0 - Defined 2 state-stores (all state in-memory in tests)
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / query / QueryApplicationIT.java
index 4e44cda..d800fbd 100644 (file)
@@ -1,5 +1,6 @@
 package de.juplo.kafka.wordcount.query;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
@@ -7,26 +8,33 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Primary;
+import org.springframework.http.MediaType;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.support.SendResult;
 import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.test.web.servlet.MockMvc;
+import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
 
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
 
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME;
+import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME;
 import static org.awaitility.Awaitility.await;
+import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
 
 
 @SpringBootTest(
                properties = {
+                               "spring.main.allow-bean-definition-overriding=true",
                                "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer",
                                "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
-                               "spring.kafka.producer.properties.spring.json.add.type.headers=false",
+                               "spring.kafka.producer.properties.spring.json.type.mapping=userdata:de.juplo.kafka.wordcount.users.TestUserData,ranking:de.juplo.kafka.wordcount.top10.TestRanking",
                                "logging.level.root=WARN",
                                "logging.level.de.juplo=DEBUG",
                                "logging.level.org.apache.kafka.clients=INFO",
@@ -34,6 +42,7 @@ import static org.awaitility.Awaitility.await;
                                "juplo.wordcount.query.bootstrap-server=${spring.embedded.kafka.brokers}",
                                "juplo.wordcount.query.users-input-topic=" + QueryApplicationIT.TOPIC_USERS,
                                "juplo.wordcount.query.ranking-input-topic=" + QueryApplicationIT.TOPIC_TOP10 })
+@AutoConfigureMockMvc
 @EmbeddedKafka(topics = { QueryApplicationIT.TOPIC_TOP10, QueryApplicationIT.TOPIC_USERS})
 @Slf4j
 public class QueryApplicationIT
@@ -41,6 +50,11 @@ public class QueryApplicationIT
        public static final String TOPIC_TOP10 = "top10";
        public static final String TOPIC_USERS = "users";
 
+
+       @Autowired
+       MockMvc mockMvc;
+       @Autowired
+       ObjectMapper objectMapper;
        @Autowired
        QueryStreamProcessor streamProcessor;
 
@@ -75,24 +89,57 @@ public class QueryApplicationIT
                }
        }
 
-       @DisplayName("Await the expected state in the state-store")
+       @DisplayName("Await, that the expected state is visible in the state-store")
        @Test
-       public void testAwaitExpectedState()
+       public void testAwaitExpectedStateInStore()
        {
-               await("Expected state")
+               await("The expected state is visible in the state-store")
                                .atMost(Duration.ofSeconds(5))
-                               .catchUncaughtExceptions()
-                               .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
+                               .untilAsserted(() -> TestData.assertExpectedState(user -> streamProcessor.getStore().get(user)));
+       }
+
+       @DisplayName("Await, that the expected state is queryable")
+       @Test
+       public void testAwaitExpectedStateIsQueryable()
+       {
+               await("The expected state is queryable")
+                               .atMost(Duration.ofSeconds(5))
+                               .untilAsserted(() -> TestData.assertExpectedState(user -> requestUserRankingFor(user)));
+       }
+
+       private UserRanking requestUserRankingFor(String user)
+       {
+               try
+               {
+                       return objectMapper.readValue(
+                                       mockMvc
+                                                       .perform(MockMvcRequestBuilders.get("/{user}", user)
+                                                                       .contentType(MediaType.APPLICATION_JSON))
+                                                       .andExpect(status().isOk())
+                                                       .andReturn()
+                                                       .getResponse()
+                                                       .getContentAsString(StandardCharsets.UTF_8),
+                                       UserRanking.class);
+               }
+               catch (Exception e)
+               {
+                       throw new RuntimeException(e);
+               }
        }
 
        @TestConfiguration
        static class Configuration
        {
-               @Primary
                @Bean
-               KeyValueBytesStoreSupplier inMemoryStoreSupplier()
+               KeyValueBytesStoreSupplier userStoreSupplier()
+               {
+                       return Stores.inMemoryKeyValueStore(USER_STORE_NAME);
+               }
+
+               @Bean
+               KeyValueBytesStoreSupplier rankingStoreSupplier()
                {
-                       return Stores.inMemoryKeyValueStore(STORE_NAME);
+                       return Stores.inMemoryKeyValueStore(RANKING_STORE_NAME);
                }
        }
 }