import de.juplo.kafka.wordcount.counter.TestWord;
import de.juplo.kafka.wordcount.recorder.TestRecording;
import lombok.extern.slf4j.Slf4j;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.util.MultiValueMap;
import java.time.Duration;
+import java.util.stream.Stream;
-import static de.juplo.kafka.wordcount.splitter.SplitterApplicationIT.*;
+import static de.juplo.kafka.wordcount.splitter.SplitterApplicationIT.TOPIC_IN;
+import static de.juplo.kafka.wordcount.splitter.SplitterApplicationIT.TOPIC_OUT;
import static org.awaitility.Awaitility.await;
public final static String TOPIC_IN = "in";
public final static String TOPIC_OUT = "out";
- @Autowired
- KafkaTemplate<String, TestRecording> kafkaTemplate;
@Autowired
Consumer consumer;
-
- @BeforeEach
- public void clear()
+ @BeforeAll
+ public static void testSendMessage(
+ @Autowired KafkaTemplate<String, TestRecording> kafkaTemplate)
{
- consumer.received.clear();
+ Stream
+ .of(TestData.INPUT_MESSAGES)
+ .forEach(kv ->
+ {
+ try
+ {
+ SendResult<String, TestRecording> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
+ log.info(
+ "Sent: {}={}, partition={}, offset={}",
+ result.getProducerRecord().key(),
+ result.getProducerRecord().value(),
+ result.getRecordMetadata().partition(),
+ result.getRecordMetadata().offset());
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ });
}
+
@Test
void testSendMessage() throws Exception
{
- TestData.writeInputData((user, recording) -> kafkaTemplate.send(TOPIC_IN, user, recording));
-
await("Expexted converted data")
.atMost(Duration.ofSeconds(5))
.untilAsserted(() ->
import org.springframework.util.MultiValueMap;
import java.time.Duration;
-import java.util.function.BiConsumer;
import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
static final String PETER = "peter";
static final String KLAUS = "klaus";
- static void writeInputData(BiConsumer<String, TestRecording> consumer)
- {
- Stream
- .of(INPUT_MESSAGES)
- .forEach(kv -> consumer.accept(kv.key, kv.value));
- }
static final KeyValue<String, TestRecording>[] INPUT_MESSAGES = new KeyValue[]
{