import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
-import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.kafka.support.SendResult;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
public static final String TOPIC_IN = "in";
public static final String TOPIC_OUT = "out";
- @Autowired
- KafkaTemplate<String, TestInputWord> kafkaTemplate;
@Autowired
Consumer consumer;
- @BeforeEach
- public void clear()
+ @BeforeAll
+ public static void testSendMessage(
+ @Autowired KafkaTemplate<String, TestInputWord> kafkaTemplate)
{
- consumer.received.clear();
+ TestData
+ .getInputMessages()
+ .forEach(kv ->
+ {
+ try
+ {
+ SendResult<String, TestInputWord> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
+ log.info(
+ "Sent: {}={}, partition={}, offset={}",
+ result.getProducerRecord().key(),
+ result.getProducerRecord().value(),
+ result.getRecordMetadata().partition(),
+ result.getRecordMetadata().offset());
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ });
}
-
+ @DisplayName("Await the expected output messages")
@Test
void testSendMessage()
{
- TestData
- .getInputMessages()
- .forEach(kv -> kafkaTemplate.send(TOPIC_IN, kv.key, kv.value));
-
await("Expected messages")
.atMost(Duration.ofSeconds(10))
.untilAsserted(() -> consumer.enforceAssertion(TestData.expectedMessagesAssertion()));