WIP
authorKai Moritz <kai@juplo.de>
Sun, 5 May 2024 11:26:28 +0000 (13:26 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 5 May 2024 11:26:28 +0000 (13:26 +0200)
pom.xml
src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java
src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java

diff --git a/pom.xml b/pom.xml
index 4e87c6e..126ed07 100644 (file)
--- a/pom.xml
+++ b/pom.xml
                        <artifactId>spring-boot-starter-test</artifactId>
                        <scope>test</scope>
                </dependency>
+               <dependency>
+                       <groupId>org.springframework.kafka</groupId>
+                       <artifactId>spring-kafka-test</artifactId>
+                       <scope>test</scope>
+               </dependency>
        </dependencies>
 
        <build>
index 11248d8..36a11fe 100644 (file)
@@ -7,6 +7,7 @@ import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.util.Assert;
 
 import java.util.Properties;
@@ -24,7 +25,7 @@ public class RecorderApplication
                Properties props = new Properties();
                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
                props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-               props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+               props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
 
                return new KafkaProducer<>(props);
        }
index 885a408..fe51d21 100644 (file)
@@ -1,13 +1,76 @@
 package de.juplo.kafka.wordcount.recorder;
 
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.MediaType;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.test.web.reactive.server.WebTestClient;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MimeType;
+import org.springframework.util.MimeTypeUtils;
+import org.springframework.util.MultiValueMap;
 
-@SpringBootTest
+import static de.juplo.kafka.wordcount.recorder.ApplicationTests.PARTITIONS;
+import static de.juplo.kafka.wordcount.recorder.ApplicationTests.TOPIC_OUT;
+
+
+@SpringBootTest(
+               properties = {
+                               "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+                               "juplo.wordcount.recorder.bootstrap-server=${spring.embedded.kafka.brokers}",
+                               "juplo.wordcount.recorder.topic=" + TOPIC_OUT })
+@EmbeddedKafka(topics = { TOPIC_OUT }, partitions = PARTITIONS)
+@Slf4j
 class ApplicationTests
 {
+       public final static String TOPIC_OUT = "out";
+       static final int PARTITIONS = 2;
+
+       @Autowired
+       WebTestClient client;
+
        @Test
        void contextLoads()
        {
        }
+
+       @Test
+       void userEventsAreSent()
+       {
+               client
+                               .post()
+                               .uri("peter")
+                               .contentType(MediaType.TEXT_PLAIN)
+                               .bodyValue("Hallö Wält*&%€!")
+                               .
+       }
+
+       static class Consumer
+       {
+               final MultiValueMap<String, String> received = new LinkedMultiValueMap<>();
+
+               @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
+               public void receive(ConsumerRecord<String, String> record)
+               {
+                       log.debug("Received message: {}", record);
+                       received.add(record.key(), record.value());
+               }
+       }
+
+       @TestConfiguration
+       static class Configuration
+       {
+               @Bean
+               Consumer consumer()
+               {
+                       return new Consumer();
+               }
+       }
 }