From: Kai Moritz Date: Wed, 12 Jun 2024 21:08:57 +0000 (+0200) Subject: query: 2.0.0 - `QueryApplicationIT` uses two ``KafkaTemplate``s X-Git-Tag: query-with-kafkaproducer~1 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=a0d314f7304de50eba56a59d9859e544354f2f7d;p=demos%2Fkafka%2Fwordcount query: 2.0.0 - `QueryApplicationIT` uses two ``KafkaTemplate``s * Switched the setup of the `QueryApplicationIT` to use two different instances of `KafkaTemplate` for the two input-topics. * This is a preparation for the introduction of the typed JSON-keys. --- diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java index 58a1206..5eb4706 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java @@ -1,7 +1,11 @@ package de.juplo.kafka.wordcount.query; import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.wordcount.top10.TestRanking; +import de.juplo.kafka.wordcount.users.TestUserData; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.Stores; import org.junit.jupiter.api.BeforeAll; @@ -14,13 +18,16 @@ import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.http.MediaType; import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.support.SendResult; +import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.web.servlet.MockMvc; import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.Map; import java.util.concurrent.CompletableFuture; import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME; @@ -32,9 +39,6 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. @SpringBootTest( properties = { "spring.main.allow-bean-definition-overriding=true", - "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer", - "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", - "spring.kafka.producer.properties.spring.json.type.mapping=userdata:de.juplo.kafka.wordcount.users.TestUserData,ranking:de.juplo.kafka.wordcount.top10.TestRanking", "logging.level.root=WARN", "logging.level.de.juplo=DEBUG", "logging.level.org.apache.kafka.clients=INFO", @@ -63,21 +67,22 @@ public class QueryApplicationIT @BeforeAll public static void testSendMessage( - @Autowired KafkaTemplate kafkaTemplate) + @Autowired KafkaTemplate usersKafkaTemplate, + @Autowired KafkaTemplate top10KafkaTemplate) { TestData .getUsersMessages() - .forEach(kv -> flush(kafkaTemplate.send(TOPIC_USERS, kv.key, kv.value))); + .forEach(kv -> flush(usersKafkaTemplate.send(TOPIC_USERS, kv.key, kv.value))); TestData .getTop10Messages() - .forEach(kv -> flush(kafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value))); + .forEach(kv -> flush(top10KafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value))); } - private static void flush(CompletableFuture> future) + private static void flush(CompletableFuture future) { try { - SendResult result = future.get(); + SendResult result = future.get(); log.info( "Sent: {}={}, partition={}, offset={}", result.getProducerRecord().key(), @@ -132,6 +137,26 @@ public class QueryApplicationIT @TestConfiguration static class Configuration { + @Bean + KafkaTemplate usersKafkaTemplate(ProducerFactory producerFactory) + { + Map properties = Map.of( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), + JsonSerializer.TYPE_MAPPINGS, "userdata:" + TestUserData.class.getName()); + return new KafkaTemplate(producerFactory, properties); + } + + @Bean + KafkaTemplate top10KafkaTemplate(ProducerFactory producerFactory) + { + Map properties = Map.of( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), + JsonSerializer.TYPE_MAPPINGS, "ranking:" + TestRanking.class.getName()); + return new KafkaTemplate(producerFactory, properties); + } + @Bean KeyValueBytesStoreSupplier userStoreSupplier() {