WIP:KafkaProducer query-with-kafkaproducer
authorKai Moritz <kai@juplo.de>
Fri, 14 Jun 2024 17:24:21 +0000 (19:24 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 14 Jun 2024 17:26:06 +0000 (19:26 +0200)
src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java

index 5eb4706..dc517e9 100644 (file)
@@ -4,7 +4,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import de.juplo.kafka.wordcount.top10.TestRanking;
 import de.juplo.kafka.wordcount.users.TestUserData;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
@@ -17,9 +21,6 @@ import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
 import org.springframework.http.MediaType;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.core.ProducerFactory;
-import org.springframework.kafka.support.SendResult;
 import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.test.web.servlet.MockMvc;
@@ -28,7 +29,8 @@ import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.function.Function;
 
 import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME;
 import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME;
@@ -67,28 +69,33 @@ public class QueryApplicationIT
 
        @BeforeAll
        public static void testSendMessage(
-                       @Autowired KafkaTemplate usersKafkaTemplate,
-                       @Autowired KafkaTemplate top10KafkaTemplate)
+                       @Autowired KafkaProducer usersProducer,
+                       @Autowired KafkaProducer top10Producer)
        {
                TestData
                                .getUsersMessages()
-                               .forEach(kv -> flush(usersKafkaTemplate.send(TOPIC_USERS, kv.key, kv.value)));
+                               .forEach(kv -> sendAndFlush(asRecord(TOPIC_USERS, kv.key, kv.value), record -> usersProducer.send(record)));
                TestData
                                .getTop10Messages()
-                               .forEach(kv -> flush(top10KafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value)));
+                               .forEach(kv -> sendAndFlush(asRecord(TOPIC_TOP10, kv.key, kv.value), record -> top10Producer.send(record)));
        }
 
-       private static void flush(CompletableFuture<SendResult> future)
+       private static ProducerRecord asRecord(String topic, Object key, Object value)
+       {
+               return new ProducerRecord(topic, key, value);
+       }
+
+       private static void sendAndFlush(ProducerRecord record, Function<ProducerRecord, Future<RecordMetadata>> send)
        {
                try
                {
-                       SendResult result = future.get();
+                       RecordMetadata metadata = send.apply(record).get();
                        log.info(
                                        "Sent: {}={}, partition={}, offset={}",
-                                       result.getProducerRecord().key(),
-                                       result.getProducerRecord().value(),
-                                       result.getRecordMetadata().partition(),
-                                       result.getRecordMetadata().offset());
+                                       record.key(),
+                                       record.value(),
+                                       metadata.partition(),
+                                       metadata.offset());
                }
                catch (Exception e)
                {
@@ -138,23 +145,25 @@ public class QueryApplicationIT
        static class Configuration
        {
                @Bean
-               KafkaTemplate usersKafkaTemplate(ProducerFactory producerFactory)
+               KafkaProducer usersProducer(QueryApplicationProperties applicationProperties)
                {
                        Map<String, Object> properties = Map.of(
+                                       CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, applicationProperties.getBootstrapServer(),
                                        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
                                        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
                                        JsonSerializer.TYPE_MAPPINGS, "userdata:" + TestUserData.class.getName());
-                       return new KafkaTemplate(producerFactory, properties);
+                       return new KafkaProducer(properties);
                }
 
                @Bean
-               KafkaTemplate top10KafkaTemplate(ProducerFactory producerFactory)
+               KafkaProducer top10Producer(QueryApplicationProperties applicationProperties)
                {
                        Map<String, Object> properties = Map.of(
+                                       CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, applicationProperties.getBootstrapServer(),
                                        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
                                        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
                                        JsonSerializer.TYPE_MAPPINGS, "ranking:" + TestRanking.class.getName());
-                       return new KafkaTemplate(producerFactory, properties);
+                       return new KafkaProducer(properties);
                }
 
                @Bean