From 87b660dca9fa91ba5d0611dc8b870377ff757179 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 5 May 2024 13:26:28 +0200 Subject: [PATCH] WIP --- pom.xml | 5 ++ .../recorder/RecorderApplication.java | 3 +- .../wordcount/recorder/ApplicationTests.java | 65 ++++++++++++++++++- 3 files changed, 71 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 4e87c6e..126ed07 100644 --- a/pom.xml +++ b/pom.xml @@ -55,6 +55,11 @@ spring-boot-starter-test test + + org.springframework.kafka + spring-kafka-test + test + diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java b/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java index 11248d8..36a11fe 100644 --- a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java @@ -7,6 +7,7 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; +import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.util.Assert; import java.util.Properties; @@ -24,7 +25,7 @@ public class RecorderApplication Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new KafkaProducer<>(props); } diff --git a/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java b/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java index 885a408..fe51d21 100644 --- a/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java @@ -1,13 +1,76 @@ package de.juplo.kafka.wordcount.recorder; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.web.reactive.server.WebTestClient; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MimeType; +import org.springframework.util.MimeTypeUtils; +import org.springframework.util.MultiValueMap; -@SpringBootTest +import static de.juplo.kafka.wordcount.recorder.ApplicationTests.PARTITIONS; +import static de.juplo.kafka.wordcount.recorder.ApplicationTests.TOPIC_OUT; + + +@SpringBootTest( + properties = { + "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", + "juplo.wordcount.recorder.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.wordcount.recorder.topic=" + TOPIC_OUT }) +@EmbeddedKafka(topics = { TOPIC_OUT }, partitions = PARTITIONS) +@Slf4j class ApplicationTests { + public final static String TOPIC_OUT = "out"; + static final int PARTITIONS = 2; + + @Autowired + WebTestClient client; + @Test void contextLoads() { } + + @Test + void userEventsAreSent() + { + client + .post() + .uri("peter") + .contentType(MediaType.TEXT_PLAIN) + .bodyValue("Hallö Wält*&%€!") + . + } + + static class Consumer + { + final MultiValueMap received = new LinkedMultiValueMap<>(); + + @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) + public void receive(ConsumerRecord record) + { + log.debug("Received message: {}", record); + received.add(record.key(), record.value()); + } + } + + @TestConfiguration + static class Configuration + { + @Bean + Consumer consumer() + { + return new Consumer(); + } + } } -- 2.20.1