X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fcounter%2FCounterApplicationIT.java;h=1bfceed7a9466d84f67035e79f7dfeab2459de49;hb=e6198710fe679bc7463d1b17c9d9dc311062ef31;hp=a53ffc801079476365dff0d16d5d3d7248539c70;hpb=3eb1ec997478982fffb31f09c21e756b529e474a;p=demos%2Fkafka%2Fwordcount diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java index a53ffc8..1bfceed 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java @@ -1,5 +1,6 @@ package de.juplo.kafka.wordcount.counter; +import de.juplo.kafka.wordcount.splitter.TestInputUser; import de.juplo.kafka.wordcount.splitter.TestInputWord; import de.juplo.kafka.wordcount.top10.TestOutputWord; import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; @@ -34,6 +35,7 @@ import static org.awaitility.Awaitility.await; @SpringBootTest( properties = { + "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer", "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", "spring.kafka.producer.properties.spring.json.add.type.headers=false", "spring.kafka.consumer.auto-offset-reset=earliest", @@ -62,7 +64,7 @@ public class CounterApplicationIT @BeforeAll public static void testSendMessage( - @Autowired KafkaTemplate kafkaTemplate) + @Autowired KafkaTemplate kafkaTemplate) { TestData .getInputMessages() @@ -70,7 +72,7 @@ public class CounterApplicationIT { try { - SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); + SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); log.info( "Sent: {}={}, partition={}, offset={}", result.getProducerRecord().key(), @@ -91,7 +93,8 @@ public class CounterApplicationIT { await("Expected number of messages") .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> consumer.enforceAssertion(TestData.expectedNumberOfMessagesForWordAssertion())); + .untilAsserted(() -> consumer.enforceAssertion( + receivedMessages -> TestData.assertExpectedNumberOfMessagesForWord(receivedMessages))); } @DisplayName("Await the expected output messages") @@ -100,7 +103,8 @@ public class CounterApplicationIT { await("Expected messages") .atMost(Duration.ofSeconds(10)) - .untilAsserted(() -> consumer.enforceAssertion(TestData.expectedMessagesAssertion())); + .untilAsserted(() -> consumer.enforceAssertion( + receivedMessages -> TestData.assertExpectedMessages(receivedMessages))); } @DisplayName("Await the expected final output messages") @@ -109,7 +113,8 @@ public class CounterApplicationIT { await("Expected final output messages") .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> consumer.enforceAssertion(TestData.expectedLastMessagesForWordAssertion())); + .untilAsserted(() -> consumer.enforceAssertion( + receivedMessages -> TestData.assertExpectedLastMessagesForWord(receivedMessages))); } @DisplayName("Await the expected state in the state-store")