From: Kai Moritz Date: Sun, 16 Jun 2024 16:26:47 +0000 (+0200) Subject: counter: 1.3.1 - Cleand code/setup for tests X-Git-Tag: counter-1.3.1~5 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=d8173052504e89f85b09ce060302e87979973714;p=demos%2Fkafka%2Fwordcount counter: 1.3.1 - Cleand code/setup for tests --- diff --git a/pom.xml b/pom.xml index 5859736..03a7b40 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount counter - 1.3.0 + 1.3.1 Wordcount-Counter Word-counting stream-processor of the multi-user wordcount-example diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java index 334cd05..0faa2de 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java @@ -14,7 +14,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Primary; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.KafkaHeaders; @@ -35,6 +34,7 @@ import static org.awaitility.Awaitility.await; @SpringBootTest( properties = { + "spring.main.allow-bean-definition-overriding=true", "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer", "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", "spring.kafka.producer.properties.spring.json.add.type.headers=false", @@ -45,7 +45,8 @@ import static org.awaitility.Awaitility.await; "logging.level.root=WARN", "logging.level.de.juplo=DEBUG", "juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}", - "juplo.wordcount.counter.commit-interval=0", + "juplo.wordcount.counter.commit-interval=100", + "juplo.wordcount.counter.cache-max-bytes=0", "juplo.wordcount.counter.input-topic=" + TOPIC_IN, "juplo.wordcount.counter.output-topic=" + TOPIC_OUT }) @EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }) @@ -155,9 +156,8 @@ public class CounterApplicationIT return new Consumer(); } - @Primary @Bean - KeyValueBytesStoreSupplier inMemoryStoreSupplier() + KeyValueBytesStoreSupplier storeSupplier() { return Stores.inMemoryKeyValueStore(STORE_NAME); } diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java index 0ffd516..9c86c6c 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java @@ -20,6 +20,7 @@ import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig; +import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME; @Slf4j @@ -27,7 +28,6 @@ public class CounterStreamProcessorTopologyTest { public static final String IN = "TEST-IN"; public static final String OUT = "TEST-OUT"; - public static final String STORE_NAME = "TOPOLOGY-TEST"; TopologyTestDriver testDriver; @@ -52,12 +52,8 @@ public class CounterStreamProcessorTopologyTest out = testDriver.createOutputTopic( OUT, - new JsonDeserializer() - .copyWithType(TestOutputWord.class) - .ignoreTypeHeaders(), - new JsonDeserializer() - .copyWithType(TestOutputWordCounter.class) - .ignoreTypeHeaders()); + new JsonDeserializer(TestOutputWord.class).ignoreTypeHeaders(), + new JsonDeserializer(TestOutputWordCounter.class).ignoreTypeHeaders()); } diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java index 1ecfdbd..862eb2b 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java @@ -33,37 +33,37 @@ class TestData private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] { - new KeyValue<>( + KeyValue.pair( TestInputUser.of(PETER), TestInputWord.of(PETER, WORD_HALLO)), - new KeyValue<>( + KeyValue.pair( TestInputUser.of(KLAUS), TestInputWord.of(KLAUS, WORD_MÜSCH)), - new KeyValue<>( + KeyValue.pair( TestInputUser.of(PETER), TestInputWord.of(PETER, WORD_WELT)), - new KeyValue<>( + KeyValue.pair( TestInputUser.of(KLAUS), TestInputWord.of(KLAUS, WORD_MÜSCH)), - new KeyValue<>( + KeyValue.pair( TestInputUser.of(KLAUS), TestInputWord.of(KLAUS, WORD_S)), - new KeyValue<>( + KeyValue.pair( TestInputUser.of(PETER), TestInputWord.of(PETER, WORD_BOÄH)), - new KeyValue<>( + KeyValue.pair( TestInputUser.of(PETER), TestInputWord.of(PETER, WORD_WELT)), - new KeyValue<>( + KeyValue.pair( TestInputUser.of(PETER), TestInputWord.of(PETER, WORD_BOÄH)), - new KeyValue<>( + KeyValue.pair( TestInputUser.of(KLAUS), TestInputWord.of(KLAUS, WORD_S)), - new KeyValue<>( + KeyValue.pair( TestInputUser.of(PETER), TestInputWord.of(PETER, WORD_BOÄH)), - new KeyValue<>( + KeyValue.pair( TestInputUser.of(KLAUS), TestInputWord.of(KLAUS, WORD_S)), };