From 1e88833dcb5b2bf1338267d340ce7574f07ae1d3 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 14 Jun 2024 19:24:21 +0200 Subject: [PATCH] WIP:KafkaProducer --- .../wordcount/query/QueryApplicationIT.java | 45 +++++++++++-------- 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java index 5eb4706..dc517e9 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java @@ -4,7 +4,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.wordcount.top10.TestRanking; import de.juplo.kafka.wordcount.users.TestUserData; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.Stores; @@ -17,9 +21,6 @@ import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.http.MediaType; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.core.ProducerFactory; -import org.springframework.kafka.support.SendResult; import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.web.servlet.MockMvc; @@ -28,7 +29,8 @@ import org.springframework.test.web.servlet.request.MockMvcRequestBuilders; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.Map; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; +import java.util.function.Function; import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME; import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME; @@ -67,28 +69,33 @@ public class QueryApplicationIT @BeforeAll public static void testSendMessage( - @Autowired KafkaTemplate usersKafkaTemplate, - @Autowired KafkaTemplate top10KafkaTemplate) + @Autowired KafkaProducer usersProducer, + @Autowired KafkaProducer top10Producer) { TestData .getUsersMessages() - .forEach(kv -> flush(usersKafkaTemplate.send(TOPIC_USERS, kv.key, kv.value))); + .forEach(kv -> sendAndFlush(asRecord(TOPIC_USERS, kv.key, kv.value), record -> usersProducer.send(record))); TestData .getTop10Messages() - .forEach(kv -> flush(top10KafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value))); + .forEach(kv -> sendAndFlush(asRecord(TOPIC_TOP10, kv.key, kv.value), record -> top10Producer.send(record))); } - private static void flush(CompletableFuture future) + private static ProducerRecord asRecord(String topic, Object key, Object value) + { + return new ProducerRecord(topic, key, value); + } + + private static void sendAndFlush(ProducerRecord record, Function> send) { try { - SendResult result = future.get(); + RecordMetadata metadata = send.apply(record).get(); log.info( "Sent: {}={}, partition={}, offset={}", - result.getProducerRecord().key(), - result.getProducerRecord().value(), - result.getRecordMetadata().partition(), - result.getRecordMetadata().offset()); + record.key(), + record.value(), + metadata.partition(), + metadata.offset()); } catch (Exception e) { @@ -138,23 +145,25 @@ public class QueryApplicationIT static class Configuration { @Bean - KafkaTemplate usersKafkaTemplate(ProducerFactory producerFactory) + KafkaProducer usersProducer(QueryApplicationProperties applicationProperties) { Map properties = Map.of( + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, applicationProperties.getBootstrapServer(), ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), JsonSerializer.TYPE_MAPPINGS, "userdata:" + TestUserData.class.getName()); - return new KafkaTemplate(producerFactory, properties); + return new KafkaProducer(properties); } @Bean - KafkaTemplate top10KafkaTemplate(ProducerFactory producerFactory) + KafkaProducer top10Producer(QueryApplicationProperties applicationProperties) { Map properties = Map.of( + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, applicationProperties.getBootstrapServer(), ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(), ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), JsonSerializer.TYPE_MAPPINGS, "ranking:" + TestRanking.class.getName()); - return new KafkaTemplate(producerFactory, properties); + return new KafkaProducer(properties); } @Bean -- 2.20.1