]> juplo.de Git - demos/kafka/wordcount/commitdiff
WIP
authorKai Moritz <kai@juplo.de>
Sun, 5 May 2024 11:26:28 +0000 (13:26 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 5 May 2024 11:26:28 +0000 (13:26 +0200)
pom.xml
src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java
src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java

diff --git a/pom.xml b/pom.xml
index 4e87c6ed22d05da6efcb8e7bc9a59161d14fcedb..126ed077291e629aae81d439d1b9e956d51d8124 100644 (file)
--- a/pom.xml
+++ b/pom.xml
                        <artifactId>spring-boot-starter-test</artifactId>
                        <scope>test</scope>
                </dependency>
+               <dependency>
+                       <groupId>org.springframework.kafka</groupId>
+                       <artifactId>spring-kafka-test</artifactId>
+                       <scope>test</scope>
+               </dependency>
        </dependencies>
 
        <build>
index 11248d8222ba96fec28d01ca43b53ad8a7749051..36a11feeb835e000d39587b0220632ffdb85b26a 100644 (file)
@@ -7,6 +7,7 @@ import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.util.Assert;
 
 import java.util.Properties;
@@ -24,7 +25,7 @@ public class RecorderApplication
                Properties props = new Properties();
                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
                props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-               props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+               props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
 
                return new KafkaProducer<>(props);
        }
index 885a4088ac3636d0907acdd0c599c45fe0b2b2d4..fe51d2164abc6036c08a84784cb8ad1e4ee8c0bb 100644 (file)
@@ -1,13 +1,76 @@
 package de.juplo.kafka.wordcount.recorder;
 
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.MediaType;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.test.web.reactive.server.WebTestClient;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MimeType;
+import org.springframework.util.MimeTypeUtils;
+import org.springframework.util.MultiValueMap;
 
-@SpringBootTest
+import static de.juplo.kafka.wordcount.recorder.ApplicationTests.PARTITIONS;
+import static de.juplo.kafka.wordcount.recorder.ApplicationTests.TOPIC_OUT;
+
+
+@SpringBootTest(
+               properties = {
+                               "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+                               "juplo.wordcount.recorder.bootstrap-server=${spring.embedded.kafka.brokers}",
+                               "juplo.wordcount.recorder.topic=" + TOPIC_OUT })
+@EmbeddedKafka(topics = { TOPIC_OUT }, partitions = PARTITIONS)
+@Slf4j
 class ApplicationTests
 {
+       public final static String TOPIC_OUT = "out";
+       static final int PARTITIONS = 2;
+
+       @Autowired
+       WebTestClient client;
+
        @Test
        void contextLoads()
        {
        }
+
+       @Test
+       void userEventsAreSent()
+       {
+               client
+                               .post()
+                               .uri("peter")
+                               .contentType(MediaType.TEXT_PLAIN)
+                               .bodyValue("Hallö Wält*&%€!")
+                               .
+       }
+
+       static class Consumer
+       {
+               final MultiValueMap<String, String> received = new LinkedMultiValueMap<>();
+
+               @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
+               public void receive(ConsumerRecord<String, String> record)
+               {
+                       log.debug("Received message: {}", record);
+                       received.add(record.key(), record.value());
+               }
+       }
+
+       @TestConfiguration
+       static class Configuration
+       {
+               @Bean
+               Consumer consumer()
+               {
+                       return new Consumer();
+               }
+       }
 }