package de.juplo.kafka.wordcount.counter;
+import de.juplo.kafka.wordcount.splitter.TestInputUser;
import de.juplo.kafka.wordcount.splitter.TestInputWord;
import de.juplo.kafka.wordcount.top10.TestOutputWord;
import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN;
import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT;
+import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
import static org.awaitility.Awaitility.await;
@SpringBootTest(
properties = {
+ "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
"spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
"spring.kafka.producer.properties.spring.json.add.type.headers=false",
"spring.kafka.consumer.auto-offset-reset=earliest",
"logging.level.de.juplo=DEBUG",
"juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}",
"juplo.wordcount.counter.commit-interval=0",
- "juplo.wordcount.counter.cacheMaxBytes=0",
"juplo.wordcount.counter.input-topic=" + TOPIC_IN,
"juplo.wordcount.counter.output-topic=" + TOPIC_OUT })
@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT })
@Autowired
Consumer consumer;
+ @Autowired
+ CounterStreamProcessor streamProcessor;
@BeforeAll
public static void testSendMessage(
- @Autowired KafkaTemplate<String, TestInputWord> kafkaTemplate)
+ @Autowired KafkaTemplate<TestInputUser, TestInputWord> kafkaTemplate)
{
TestData
.getInputMessages()
{
try
{
- SendResult<String, TestInputWord> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
+ SendResult<TestInputUser, TestInputWord> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
log.info(
"Sent: {}={}, partition={}, offset={}",
result.getProducerRecord().key(),
{
await("Expected number of messages")
.atMost(Duration.ofSeconds(5))
- .untilAsserted(() -> consumer.enforceAssertion(TestData.expectedNumberOfMessagesForWordAssertion()));
+ .untilAsserted(() -> consumer.enforceAssertion(
+ receivedMessages -> TestData.assertExpectedNumberOfMessagesForWord(receivedMessages)));
}
@DisplayName("Await the expected output messages")
{
await("Expected messages")
.atMost(Duration.ofSeconds(10))
- .untilAsserted(() -> consumer.enforceAssertion(TestData.expectedMessagesAssertion()));
+ .untilAsserted(() -> consumer.enforceAssertion(
+ receivedMessages -> TestData.assertExpectedMessages(receivedMessages)));
}
@DisplayName("Await the expected final output messages")
{
await("Expected final output messages")
.atMost(Duration.ofSeconds(5))
- .untilAsserted(() -> consumer.enforceAssertion(TestData.expectedLastMessagesForWordAssertion()));
+ .untilAsserted(() -> consumer.enforceAssertion(
+ receivedMessages -> TestData.assertExpectedLastMessagesForWord(receivedMessages)));
+ }
+
+ @DisplayName("Await the expected state in the state-store")
+ @Test
+ public void testAwaitExpectedState()
+ {
+ await("Expected state")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
}
@Bean
KeyValueBytesStoreSupplier inMemoryStoreSupplier()
{
- return Stores.inMemoryKeyValueStore("TEST-STORE");
+ return Stores.inMemoryKeyValueStore(STORE_NAME);
}
}
}