WIP
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / recorder / ApplicationTests.java
index 885a408..42729d1 100644 (file)
@@ -1,13 +1,75 @@
 package de.juplo.kafka.wordcount.recorder;
 
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.boot.test.web.client.TestRestTemplate;
+import org.springframework.context.annotation.Bean;
+import org.springframework.http.MediaType;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
 
-@SpringBootTest
+import static de.juplo.kafka.wordcount.recorder.ApplicationTests.PARTITIONS;
+import static de.juplo.kafka.wordcount.recorder.ApplicationTests.TOPIC_OUT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+@SpringBootTest(
+               properties = {
+                               "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+                               "juplo.wordcount.recorder.bootstrap-server=${spring.embedded.kafka.brokers}",
+                               "juplo.wordcount.recorder.topic=" + TOPIC_OUT })
+@EmbeddedKafka(topics = { TOPIC_OUT }, partitions = PARTITIONS)
+@Slf4j
 class ApplicationTests
 {
+       public final static String TOPIC_OUT = "out";
+       static final int PARTITIONS = 2;
+
+       @Autowired
+       private TestRestTemplate restTemplate;
+
        @Test
        void contextLoads()
        {
        }
+
+       @Test
+       void userEventsAreSent()
+       {
+               assertThat(restTemplate.postForObject("peter", "Hällö Wählt?*$@¢!", RecordingResult.class)).contains("Hello, World");
+               client
+                               .post()
+                               .uri("peter")
+                               .contentType(MediaType.TEXT_PLAIN)
+                               .bodyValue("Hallö Wält*&%€!")
+                               .
+       }
+
+       static class Consumer
+       {
+               final MultiValueMap<String, String> received = new LinkedMultiValueMap<>();
+
+               @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
+               public void receive(ConsumerRecord<String, String> record)
+               {
+                       log.debug("Received message: {}", record);
+                       received.add(record.key(), record.value());
+               }
+       }
+
+       @TestConfiguration
+       static class Configuration
+       {
+               @Bean
+               Consumer consumer()
+               {
+                       return new Consumer();
+               }
+       }
 }