X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fquery%2FQueryApplicationIT.java;h=1315eae04e0f67da8ec7294e68346d44d29e8083;hb=700f80444d14b201f7b696fb5b7bcab0d767f007;hp=e38871ff4488bdc69e351af093aabeb86237e0da;hpb=dcb383aef4f4f454e3a1aed9cf1f2a599c7b1bb9;p=demos%2Fkafka%2Fwordcount diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java index e38871f..1315eae 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java @@ -1,6 +1,11 @@ package de.juplo.kafka.wordcount.query; +import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.wordcount.top10.TestRanking; +import de.juplo.kafka.wordcount.top10.TestUser; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.Stores; import org.junit.jupiter.api.BeforeAll; @@ -11,33 +16,36 @@ import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMock import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Primary; import org.springframework.http.MediaType; import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.support.SendResult; +import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.web.servlet.MockMvc; import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.Map; import java.util.concurrent.CompletableFuture; -import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STORE_NAME; +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME; +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME; import static org.awaitility.Awaitility.await; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; @SpringBootTest( properties = { - "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer", - "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", - "spring.kafka.producer.properties.spring.json.add.type.headers=false", + "spring.main.allow-bean-definition-overriding=true", "logging.level.root=WARN", "logging.level.de.juplo=DEBUG", "logging.level.org.apache.kafka.clients=INFO", "logging.level.org.apache.kafka.streams=INFO", "juplo.wordcount.query.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.wordcount.query.commit-interval=100", + "juplo.wordcount.query.cache-max-bytes=0", "juplo.wordcount.query.users-input-topic=" + QueryApplicationIT.TOPIC_USERS, "juplo.wordcount.query.ranking-input-topic=" + QueryApplicationIT.TOPIC_TOP10 }) @AutoConfigureMockMvc @@ -52,26 +60,29 @@ public class QueryApplicationIT @Autowired MockMvc mockMvc; @Autowired + ObjectMapper objectMapper; + @Autowired QueryStreamProcessor streamProcessor; @BeforeAll public static void testSendMessage( - @Autowired KafkaTemplate kafkaTemplate) + @Autowired KafkaTemplate usersKafkaTemplate, + @Autowired KafkaTemplate top10KafkaTemplate) { TestData .getUsersMessages() - .forEach(kv -> flush(kafkaTemplate.send(TOPIC_USERS, kv.key, kv.value))); + .forEach(kv -> flush(usersKafkaTemplate.send(TOPIC_USERS, kv.key, kv.value))); TestData .getTop10Messages() - .forEach(kv -> flush(kafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value))); + .forEach(kv -> flush(top10KafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value))); } - private static void flush(CompletableFuture> future) + private static void flush(CompletableFuture future) { try { - SendResult result = future.get(); + SendResult result = future.get(); log.info( "Sent: {}={}, partition={}, offset={}", result.getProducerRecord().key(), @@ -103,17 +114,19 @@ public class QueryApplicationIT .untilAsserted(() -> TestData.assertExpectedState(user -> requestUserRankingFor(user))); } - private String requestUserRankingFor(String user) + private UserRanking requestUserRankingFor(String user) { try { - return mockMvc - .perform(MockMvcRequestBuilders.get("/{user}", user) - .contentType(MediaType.APPLICATION_JSON)) - .andExpect(status().isOk()) - .andReturn() - .getResponse() - .getContentAsString(StandardCharsets.UTF_8); + return objectMapper.readValue( + mockMvc + .perform(MockMvcRequestBuilders.get("/{user}", user) + .contentType(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + .andReturn() + .getResponse() + .getContentAsString(StandardCharsets.UTF_8), + UserRanking.class); } catch (Exception e) { @@ -124,11 +137,36 @@ public class QueryApplicationIT @TestConfiguration static class Configuration { - @Primary @Bean - KeyValueBytesStoreSupplier inMemoryStoreSupplier() + KafkaTemplate usersKafkaTemplate(ProducerFactory producerFactory) + { + Map properties = Map.of( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), + JsonSerializer.ADD_TYPE_INFO_HEADERS, false); + return new KafkaTemplate(producerFactory, properties); + } + + @Bean + KafkaTemplate top10KafkaTemplate(ProducerFactory producerFactory) + { + Map properties = Map.of( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), + JsonSerializer.TYPE_MAPPINGS, "user:" + TestUser.class.getName() + ",ranking:" + TestRanking.class.getName()); + return new KafkaTemplate(producerFactory, properties); + } + + @Bean + KeyValueBytesStoreSupplier userStoreSupplier() + { + return Stores.inMemoryKeyValueStore(USER_STORE_NAME); + } + + @Bean + KeyValueBytesStoreSupplier rankingStoreSupplier() { - return Stores.inMemoryKeyValueStore(STORE_NAME); + return Stores.inMemoryKeyValueStore(RANKING_STORE_NAME); } } }