import de.juplo.kafka.wordcount.top10.TestRanking;
import de.juplo.kafka.wordcount.users.TestUserData;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.http.MediaType;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.core.ProducerFactory;
-import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.web.servlet.MockMvc;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.function.Function;
import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.RANKING_STORE_NAME;
import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.USER_STORE_NAME;
@BeforeAll
public static void testSendMessage(
- @Autowired KafkaTemplate usersKafkaTemplate,
- @Autowired KafkaTemplate top10KafkaTemplate)
+ @Autowired KafkaProducer usersProducer,
+ @Autowired KafkaProducer top10Producer)
{
TestData
.getUsersMessages()
- .forEach(kv -> flush(usersKafkaTemplate.send(TOPIC_USERS, kv.key, kv.value)));
+ .forEach(kv -> sendAndFlush(asRecord(TOPIC_USERS, kv.key, kv.value), record -> usersProducer.send(record)));
TestData
.getTop10Messages()
- .forEach(kv -> flush(top10KafkaTemplate.send(TOPIC_TOP10, kv.key, kv.value)));
+ .forEach(kv -> sendAndFlush(asRecord(TOPIC_TOP10, kv.key, kv.value), record -> top10Producer.send(record)));
}
- private static void flush(CompletableFuture<SendResult> future)
+ private static ProducerRecord asRecord(String topic, Object key, Object value)
+ {
+ return new ProducerRecord(topic, key, value);
+ }
+
+ private static void sendAndFlush(ProducerRecord record, Function<ProducerRecord, Future<RecordMetadata>> send)
{
try
{
- SendResult result = future.get();
+ RecordMetadata metadata = send.apply(record).get();
log.info(
"Sent: {}={}, partition={}, offset={}",
- result.getProducerRecord().key(),
- result.getProducerRecord().value(),
- result.getRecordMetadata().partition(),
- result.getRecordMetadata().offset());
+ record.key(),
+ record.value(),
+ metadata.partition(),
+ metadata.offset());
}
catch (Exception e)
{
static class Configuration
{
@Bean
- KafkaTemplate usersKafkaTemplate(ProducerFactory producerFactory)
+ KafkaProducer usersProducer(QueryApplicationProperties applicationProperties)
{
Map<String, Object> properties = Map.of(
+ CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, applicationProperties.getBootstrapServer(),
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
JsonSerializer.TYPE_MAPPINGS, "userdata:" + TestUserData.class.getName());
- return new KafkaTemplate(producerFactory, properties);
+ return new KafkaProducer(properties);
}
@Bean
- KafkaTemplate top10KafkaTemplate(ProducerFactory producerFactory)
+ KafkaProducer top10Producer(QueryApplicationProperties applicationProperties)
{
Map<String, Object> properties = Map.of(
+ CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, applicationProperties.getBootstrapServer(),
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
JsonSerializer.TYPE_MAPPINGS, "ranking:" + TestRanking.class.getName());
- return new KafkaTemplate(producerFactory, properties);
+ return new KafkaProducer(properties);
}
@Bean