-package de.juplo.kafka.wordcount.query;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import de.juplo.kafka.wordcount.top10.TestRanking;
-import de.juplo.kafka.wordcount.top10.TestUser;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.Stores;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.http.MediaType;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.core.ProducerFactory;
-import org.springframework.kafka.support.SendResult;
-import org.springframework.kafka.support.serializer.JsonSerializer;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.test.web.servlet.MockMvc;
-import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
-
-import java.nio.charset.StandardCharsets;
-import java.time.Duration;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME;
-import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME;
-import static org.awaitility.Awaitility.await;
-import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
-
-
-@SpringBootTest(
- properties = {
- "spring.main.allow-bean-definition-overriding=true",
- "logging.level.root=WARN",
- "logging.level.de.juplo=DEBUG",
- "logging.level.org.apache.kafka.clients=INFO",
- "logging.level.org.apache.kafka.streams=INFO",
- "juplo.wordcount.query.bootstrap-server=${spring.embedded.kafka.brokers}",
- "juplo.wordcount.query.commit-interval=100",
- "juplo.wordcount.query.cache-max-bytes=0",
- "juplo.wordcount.query.users-input-topic=" + QueryApplicationIT.TOPIC_USERS,
- "juplo.wordcount.query.ranking-input-topic=" + QueryApplicationIT.TOPIC_TOP10 })
-@AutoConfigureMockMvc
-@EmbeddedKafka(topics = { QueryApplicationIT.TOPIC_TOP10, QueryApplicationIT.TOPIC_USERS})
-@Slf4j
-public class QueryApplicationIT
-{
- public static final String TOPIC_TOP10 = "top10";
- public static final String TOPIC_USERS = "users";
-
-
- @Autowired
- MockMvc mockMvc;
- @Autowired
- ObjectMapper objectMapper;
- @Autowired
- QueryStreamProcessor streamProcessor;
-
-
- @BeforeAll
- public static void testSendMessage(
- @Autowired KafkaTemplate usersKafkaTemplate,
- @Autowired KafkaTemplate top10KafkaTemplate)
- {
- TestData
- .getUsersMessages()
- .forEach(kv -> flush(usersKafkaTemplate.send(TOPIC_USERS, kv.key, kv.value)));
- TestData
- .getTop10Messages()
- .forEach(kv -> flush(top10KafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value)));
- }
-
- private static void flush(CompletableFuture<SendResult> future)
- {
- try
- {
- SendResult result = future.get();
- log.info(
- "Sent: {}={}, partition={}, offset={}",
- result.getProducerRecord().key(),
- result.getProducerRecord().value(),
- result.getRecordMetadata().partition(),
- result.getRecordMetadata().offset());
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- }
-
- @DisplayName("Await, that the expected state is visible in the state-store")
- @Test
- public void testAwaitExpectedStateInStore()
- {
- await("The expected state is visible in the state-store")
- .atMost(Duration.ofSeconds(5))
- .untilAsserted(() -> TestData.assertExpectedState(user -> streamProcessor.getStore().get(user)));
- }
-
- @DisplayName("Await, that the expected state is queryable")
- @Test
- public void testAwaitExpectedStateIsQueryable()
- {
- await("The expected state is queryable")
- .atMost(Duration.ofSeconds(5))
- .untilAsserted(() -> TestData.assertExpectedState(user -> requestUserRankingFor(user)));
- }
-
- private UserRanking requestUserRankingFor(String user)
- {
- try
- {
- return objectMapper.readValue(
- mockMvc
- .perform(MockMvcRequestBuilders.get("/{user}", user)
- .contentType(MediaType.APPLICATION_JSON))
- .andExpect(status().isOk())
- .andReturn()
- .getResponse()
- .getContentAsString(StandardCharsets.UTF_8),
- UserRanking.class);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- }
-
- @TestConfiguration
- static class Configuration
- {
- @Bean
- KafkaTemplate usersKafkaTemplate(ProducerFactory producerFactory)
- {
- Map<String, Object> properties = Map.of(
- ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
- ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
- JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
- return new KafkaTemplate(producerFactory, properties);
- }
-
- @Bean
- KafkaTemplate top10KafkaTemplate(ProducerFactory producerFactory)
- {
- Map<String, Object> properties = Map.of(
- ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
- ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
- JsonSerializer.TYPE_MAPPINGS, "stats:" + TestUser.class.getName() + ",ranking:" + TestRanking.class.getName());
- return new KafkaTemplate(producerFactory, properties);
- }
-
- @Bean
- KeyValueBytesStoreSupplier userStoreSupplier()
- {
- return Stores.inMemoryKeyValueStore(USER_STORE_NAME);
- }
-
- @Bean
- KeyValueBytesStoreSupplier rankingStoreSupplier()
- {
- return Stores.inMemoryKeyValueStore(RANKING_STORE_NAME);
- }
- }
-}