From 8915bf9e60d2b08c36a2da4af4fd57a9dcf4bf55 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 29 Aug 2023 18:44:46 +0200 Subject: [PATCH] WIP:KafkaChatHomeTest git commit -a -mWIP:KafkaChatHomeTest --- .../persistence/kafka/KafkaChatHomeTest.java | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeTest.java index 1298122a..cd3b858c 100644 --- a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeTest.java +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeTest.java @@ -5,22 +5,33 @@ import de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTestBase; import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringSerializer; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.support.SendResult; +import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.time.Clock; import java.time.ZoneId; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import java.util.Properties; import java.util.concurrent.CompletableFuture; import static de.juplo.kafka.chat.backend.persistence.kafka.KafkaChatHomeTest.TOPIC; @@ -61,6 +72,29 @@ public class KafkaChatHomeTest extends ChatHomeWithShardsTestBase clock); } + @Bean + KafkaTemplate kafkaTemplate(EmbeddedKafkaBroker embeddedKafka) + { + Map producerProps = KafkaTestUtils.producerProps(embeddedKafka); + ProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(producerProps); + return new KafkaTemplate<>(producerFactory); + } + + @Bean + Producer chatRoomChannelProducer( + StringSerializer stringSerializer, + JsonSerializer messageSerializer) + { + Map properties = new HashMap<>(); + properties.put( + ProducerConfig.CLIENT_ID_CONFIG, + "KAFKACHATHOMETEST_CHATROOM_CHANNEL_PRODUCER"); + return new KafkaProducer<>( + properties, + stringSerializer, + messageSerializer); + } + Integer numShards() { return 10; -- 2.20.1