+++ /dev/null
-package de.juplo.kafka;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.PathVariable;
-import org.springframework.web.bind.annotation.RestController;
-import reactor.core.publisher.Mono;
-
-
-@RestController
-public class ApplicationController
-{
- @Autowired
- DeadLetterConsumer deadLetterConsumer;
-
-
- @GetMapping(path = "/{partition}/{offset}")
- public Mono<String> recordAtOffset(
- @PathVariable int partition,
- @PathVariable long offset)
- {
- return deadLetterConsumer.requestRecord(partition, offset);
- }
-}
--- /dev/null
+package de.juplo.kafka;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Mono;
+
+
+@RestController
+public class DeadLetterController
+{
+ @Autowired
+ DeadLetterConsumer deadLetterConsumer;
+
+
+ @GetMapping(path = "/{partition}/{offset}")
+ public Mono<String> recordAtOffset(
+ @PathVariable int partition,
+ @PathVariable long offset)
+ {
+ return deadLetterConsumer.requestRecord(partition, offset);
+ }
+}
package de.juplo.kafka;
+import com.jayway.jsonpath.JsonPath;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.web.client.TestRestTemplate;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.HttpStatusCode;
+import org.springframework.http.ResponseEntity;
import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.test.web.servlet.MockMvc;
-import java.time.Duration;
-
-import static de.juplo.kafka.ApplicationTests.PARTITIONS;
+import static de.juplo.kafka.ApplicationTests.NUM_PARTITIONS;
import static de.juplo.kafka.ApplicationTests.TOPIC;
-import static org.awaitility.Awaitility.await;
-import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
-import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath;
-import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
+import static org.assertj.core.api.Assertions.assertThat;
@SpringBootTest(
+ webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
properties = {
"juplo.bootstrap-server=${spring.embedded.kafka.brokers}",
- "juplo.consumer.topic=" + TOPIC })
-@AutoConfigureMockMvc
-@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
+ "juplo.consumer.topic=" + TOPIC,
+ "logging.level.de.juplo.kafka=TRACE",
+ })
+@EmbeddedKafka(topics = TOPIC, partitions = NUM_PARTITIONS)
public class ApplicationTests
{
- static final String TOPIC = "FOO";
- static final int PARTITIONS = 10;
-
- @Autowired
- MockMvc mockMvc;
-
-
-
@Test
public void testApplicationStartup()
{
- await("Application is healthy")
- .atMost(Duration.ofSeconds(5))
- .untilAsserted(() -> mockMvc
- .perform(get("/actuator/health"))
- .andExpect(status().isOk())
- .andExpect(jsonPath("status").value("UP")));
+ ResponseEntity<String> response = restTemplate.getForEntity("/actuator/health", String.class);
+ assertThat(response.getStatusCode()).isEqualTo(HttpStatusCode.valueOf(HttpStatus.OK.value()));
+ assertThat(JsonPath.parse(response.getBody()).read("$.status", String.class)).isEqualTo("UP");
}
+
+
+ static final String TOPIC = "ExampleConsumerTest_TEST";
+ static final int NUM_PARTITIONS = 7;
+
+ @Autowired
+ TestRestTemplate restTemplate;
}