WIP:KafkaChatHomeTest tmp
authorKai Moritz <kai@juplo.de>
Tue, 29 Aug 2023 16:44:46 +0000 (18:44 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 29 Aug 2023 16:44:46 +0000 (18:44 +0200)
git commit -a -mWIP:KafkaChatHomeTest

src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeTest.java

index 1298122..cd3b858 100644 (file)
@@ -5,22 +5,33 @@ import de.juplo.kafka.chat.backend.domain.ChatHomeWithShardsTestBase;
 import de.juplo.kafka.chat.backend.persistence.kafka.messages.AbstractMessageTo;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
 import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
 import org.springframework.kafka.support.SendResult;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.kafka.test.EmbeddedKafkaBroker;
 import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.kafka.test.utils.KafkaTestUtils;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
 import java.time.Clock;
 import java.time.ZoneId;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 
 import static de.juplo.kafka.chat.backend.persistence.kafka.KafkaChatHomeTest.TOPIC;
@@ -61,6 +72,29 @@ public class KafkaChatHomeTest extends ChatHomeWithShardsTestBase
           clock);
     }
 
+    @Bean
+    KafkaTemplate<String, String> kafkaTemplate(EmbeddedKafkaBroker embeddedKafka)
+    {
+      Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
+      ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerProps);
+      return new KafkaTemplate<>(producerFactory);
+    }
+
+    @Bean
+    Producer<String, AbstractMessageTo>  chatRoomChannelProducer(
+        StringSerializer stringSerializer,
+        JsonSerializer<AbstractMessageTo> messageSerializer)
+    {
+      Map<String, Object> properties = new HashMap<>();
+      properties.put(
+          ProducerConfig.CLIENT_ID_CONFIG,
+          "KAFKACHATHOMETEST_CHATROOM_CHANNEL_PRODUCER");
+      return new KafkaProducer<>(
+          properties,
+          stringSerializer,
+          messageSerializer);
+    }
+
     Integer numShards()
     {
       return 10;