query: 2.0.0 - `QueryApplicationIT` uses two ``KafkaTemplate``s
authorKai Moritz <kai@juplo.de>
Wed, 12 Jun 2024 21:08:57 +0000 (23:08 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 15 Jun 2024 06:22:59 +0000 (08:22 +0200)
* Switched the setup of the `QueryApplicationIT` to use two different
  instances  of `KafkaTemplate` for the two input-topics.
* This is a preparation for the introduction of the typed JSON-keys.

src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java

index 58a1206..5eb4706 100644 (file)
@@ -1,7 +1,11 @@
 package de.juplo.kafka.wordcount.query;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.wordcount.top10.TestRanking;
+import de.juplo.kafka.wordcount.users.TestUserData;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.jupiter.api.BeforeAll;
@@ -14,13 +18,16 @@ import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
 import org.springframework.http.MediaType;
 import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
 import org.springframework.kafka.support.SendResult;
+import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.test.web.servlet.MockMvc;
 import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
 
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME;
@@ -32,9 +39,6 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
 @SpringBootTest(
                properties = {
                                "spring.main.allow-bean-definition-overriding=true",
-                               "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer",
-                               "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
-                               "spring.kafka.producer.properties.spring.json.type.mapping=userdata:de.juplo.kafka.wordcount.users.TestUserData,ranking:de.juplo.kafka.wordcount.top10.TestRanking",
                                "logging.level.root=WARN",
                                "logging.level.de.juplo=DEBUG",
                                "logging.level.org.apache.kafka.clients=INFO",
@@ -63,21 +67,22 @@ public class QueryApplicationIT
 
        @BeforeAll
        public static void testSendMessage(
-                       @Autowired KafkaTemplate<String, Object> kafkaTemplate)
+                       @Autowired KafkaTemplate usersKafkaTemplate,
+                       @Autowired KafkaTemplate top10KafkaTemplate)
        {
                TestData
                                .getUsersMessages()
-                               .forEach(kv -> flush(kafkaTemplate.send(TOPIC_USERS, kv.key, kv.value)));
+                               .forEach(kv -> flush(usersKafkaTemplate.send(TOPIC_USERS, kv.key, kv.value)));
                TestData
                                .getTop10Messages()
-                               .forEach(kv -> flush(kafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value)));
+                               .forEach(kv -> flush(top10KafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value)));
        }
 
-       private static void flush(CompletableFuture<SendResult<String, Object>> future)
+       private static void flush(CompletableFuture<SendResult> future)
        {
                try
                {
-                       SendResult<String, Object> result = future.get();
+                       SendResult result = future.get();
                        log.info(
                                        "Sent: {}={}, partition={}, offset={}",
                                        result.getProducerRecord().key(),
@@ -132,6 +137,26 @@ public class QueryApplicationIT
        @TestConfiguration
        static class Configuration
        {
+               @Bean
+               KafkaTemplate usersKafkaTemplate(ProducerFactory producerFactory)
+               {
+                       Map<String, Object> properties = Map.of(
+                                       ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
+                                       ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
+                                       JsonSerializer.TYPE_MAPPINGS, "userdata:" + TestUserData.class.getName());
+                       return new KafkaTemplate(producerFactory, properties);
+               }
+
+               @Bean
+               KafkaTemplate top10KafkaTemplate(ProducerFactory producerFactory)
+               {
+                       Map<String, Object> properties = Map.of(
+                                       ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
+                                       ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
+                                       JsonSerializer.TYPE_MAPPINGS, "ranking:" + TestRanking.class.getName());
+                       return new KafkaTemplate(producerFactory, properties);
+               }
+
                @Bean
                KeyValueBytesStoreSupplier userStoreSupplier()
                {