package de.juplo.kafka.wordcount.recorder;
+import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.jupiter.api.DisplayName;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
+import java.time.Duration;
+
import static de.juplo.kafka.wordcount.recorder.ApplicationTests.PARTITIONS;
import static de.juplo.kafka.wordcount.recorder.ApplicationTests.TOPIC_OUT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.asyncDispatch;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.print;
@Autowired
- private MockMvc mockMvc;
+ MockMvc mockMvc;
+ @Autowired
+ Consumer consumer;
+ @Autowired
+ ObjectMapper objectMapper;
+
@Test
@DisplayName("The application context loads")
}
@Test
+ @DisplayName("Posted messages are excepted and sent to Kafka")
void userMessagesAreExceptedAndSentToKafka() throws Exception
{
MvcResult result = mockMvc
.andExpect(status().isOk())
.andExpect(jsonPath("$.username").value(USER))
.andExpect(jsonPath("$.sentence").value(SENTENCE));
+
+ await("Expexted converted data")
+ .atMost(Duration.ofSeconds(10))
+ .untilAsserted(() ->
+ {
+ assertThat(consumer.received.get(USER)).hasSize(1);
+ RecordingResult recordingResult = objectMapper.readValue(
+ consumer.received.get(USER).get(0),
+ RecordingResult.class);
+ assertThat(recordingResult.getUsername()).isEqualTo(USER);
+ assertThat(recordingResult.getSentence()).isEqualTo(SENTENCE);
+ assertThat(recordingResult.getTopic()).isEqualTo(TOPIC_OUT);
+ assertThat(recordingResult.getPartition()).isBetween(0, PARTITIONS - 1);
+ assertThat(recordingResult.getStatus()).isNull();
+ assertThat(recordingResult.getError()).isNull();
+ });
}
static class Consumer