package de.juplo.kafka.wordcount.query;
import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.wordcount.top10.TestRanking;
+import de.juplo.kafka.wordcount.users.TestUserData;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.junit.jupiter.api.BeforeAll;
import org.springframework.context.annotation.Bean;
import org.springframework.http.MediaType;
import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.SendResult;
+import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME;
@SpringBootTest(
properties = {
"spring.main.allow-bean-definition-overriding=true",
- "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer",
- "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
- "spring.kafka.producer.properties.spring.json.type.mapping=userdata:de.juplo.kafka.wordcount.users.TestUserData,ranking:de.juplo.kafka.wordcount.top10.TestRanking",
"logging.level.root=WARN",
"logging.level.de.juplo=DEBUG",
"logging.level.org.apache.kafka.clients=INFO",
@BeforeAll
public static void testSendMessage(
- @Autowired KafkaTemplate<String, Object> kafkaTemplate)
+ @Autowired KafkaTemplate usersKafkaTemplate,
+ @Autowired KafkaTemplate top10KafkaTemplate)
{
TestData
.getUsersMessages()
- .forEach(kv -> flush(kafkaTemplate.send(TOPIC_USERS, kv.key, kv.value)));
+ .forEach(kv -> flush(usersKafkaTemplate.send(TOPIC_USERS, kv.key, kv.value)));
TestData
.getTop10Messages()
- .forEach(kv -> flush(kafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value)));
+ .forEach(kv -> flush(top10KafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value)));
}
- private static void flush(CompletableFuture<SendResult<String, Object>> future)
+ private static void flush(CompletableFuture<SendResult> future)
{
try
{
- SendResult<String, Object> result = future.get();
+ SendResult result = future.get();
log.info(
"Sent: {}={}, partition={}, offset={}",
result.getProducerRecord().key(),
@TestConfiguration
static class Configuration
{
+ @Bean
+ KafkaTemplate usersKafkaTemplate(ProducerFactory producerFactory)
+ {
+ Map<String, Object> properties = Map.of(
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
+ JsonSerializer.TYPE_MAPPINGS, "userdata:" + TestUserData.class.getName());
+ return new KafkaTemplate(producerFactory, properties);
+ }
+
+ @Bean
+ KafkaTemplate top10KafkaTemplate(ProducerFactory producerFactory)
+ {
+ Map<String, Object> properties = Map.of(
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
+ JsonSerializer.TYPE_MAPPINGS, "ranking:" + TestRanking.class.getName());
+ return new KafkaTemplate(producerFactory, properties);
+ }
+
@Bean
KeyValueBytesStoreSupplier userStoreSupplier()
{