X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fcounter%2FCounterApplicationIT.java;h=334cd05d4e788cbbd90569e016873f6a8f3eb8dd;hb=19226e74f738681af8f4d6a574cf9e69eae1da90;hp=9995ce7f938fc4abedefc78b153e451afdc0cd35;hpb=8d0426539d69616900f4d6ef19e52d50b497f57f;p=demos%2Fkafka%2Fwordcount diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java index 9995ce7..334cd05 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java @@ -1,5 +1,6 @@ package de.juplo.kafka.wordcount.counter; +import de.juplo.kafka.wordcount.splitter.TestInputUser; import de.juplo.kafka.wordcount.splitter.TestInputWord; import de.juplo.kafka.wordcount.top10.TestOutputWord; import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; @@ -34,6 +35,7 @@ import static org.awaitility.Awaitility.await; @SpringBootTest( properties = { + "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer", "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", "spring.kafka.producer.properties.spring.json.add.type.headers=false", "spring.kafka.consumer.auto-offset-reset=earliest", @@ -44,7 +46,6 @@ import static org.awaitility.Awaitility.await; "logging.level.de.juplo=DEBUG", "juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}", "juplo.wordcount.counter.commit-interval=0", - "juplo.wordcount.counter.cacheMaxBytes=0", "juplo.wordcount.counter.input-topic=" + TOPIC_IN, "juplo.wordcount.counter.output-topic=" + TOPIC_OUT }) @EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }) @@ -62,7 +63,7 @@ public class CounterApplicationIT @BeforeAll public static void testSendMessage( - @Autowired KafkaTemplate kafkaTemplate) + @Autowired KafkaTemplate kafkaTemplate) { TestData .getInputMessages() @@ -70,7 +71,7 @@ public class CounterApplicationIT { try { - SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); + SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); log.info( "Sent: {}={}, partition={}, offset={}", result.getProducerRecord().key(),