From a5146f975d5383dd2ec046478f20937d821dfa51 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 26 May 2024 21:59:55 +0200 Subject: [PATCH] counter: 1.2.15 - Refined `CounterApplicationIT` (simplified setup) --- pom.xml | 2 +- .../counter/CounterApplicationIT.java | 74 +++++-------------- 2 files changed, 18 insertions(+), 58 deletions(-) diff --git a/pom.xml b/pom.xml index 3c5e9eb..3adeb56 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount counter - 1.2.14 + 1.2.15 Wordcount-Counter Word-counting stream-processor of the multi-user wordcount-example diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java index 78d103c..992164c 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java @@ -1,10 +1,7 @@ package de.juplo.kafka.wordcount.counter; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.Stores; @@ -16,41 +13,44 @@ import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Primary; import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.core.*; -import org.springframework.kafka.support.serializer.JsonDeserializer; -import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.test.context.EmbeddedKafka; import java.time.Duration; import java.util.LinkedList; import java.util.List; -import java.util.Map; -import java.util.Properties; -import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.*; -import static de.juplo.kafka.wordcount.counter.TestData.convertToMap; +import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN; +import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT; import static de.juplo.kafka.wordcount.counter.TestData.parseHeader; import static org.awaitility.Awaitility.await; -import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.*; import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME; +import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME; @SpringBootTest( properties = { - "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", + "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", + "spring.kafka.producer.properties.spring.json.add.type.headers=false", + "spring.kafka.consumer.auto-offset-reset=earliest", + "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", + "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", + "spring.kafka.consumer.properties.spring.json.use.type.headers=false", + "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.counter.Word", + "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.counter.WordCounter", + "logging.level.root=WARN", + "logging.level.de.juplo=DEBUG", "juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}", "juplo.wordcount.counter.commit-interval=0", "juplo.wordcount.counter.cacheMaxBytes=0", "juplo.wordcount.counter.input-topic=" + TOPIC_IN, "juplo.wordcount.counter.output-topic=" + TOPIC_OUT }) -@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }, partitions = PARTITIONS) +@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }) @Slf4j public class CounterApplicationIT { - public final static String TOPIC_IN = "in"; - public final static String TOPIC_OUT = "out"; - static final int PARTITIONS = 2; + public static final String TOPIC_IN = "in"; + public static final String TOPIC_OUT = "out"; @Autowired KafkaTemplate kafkaTemplate; @@ -76,7 +76,6 @@ public class CounterApplicationIT } - @RequiredArgsConstructor static class Consumer { private final List> received = new LinkedList<>(); @@ -102,45 +101,6 @@ public class CounterApplicationIT @TestConfiguration static class Configuration { - @Bean - ProducerFactory producerFactory(Properties streamProcessorProperties) - { - Map propertyMap = convertToMap(streamProcessorProperties); - - propertyMap.put( - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, - JsonSerializer.class.getName()); - propertyMap.put( - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - JsonSerializer.class.getName()); - - return new DefaultKafkaProducerFactory<>(propertyMap); - } - - @Bean - ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( - Properties streamProcessorProperties) - { - Map propertyMap = convertToMap(streamProcessorProperties); - - propertyMap.put( - ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - JsonDeserializer.class.getName()); - propertyMap.put( - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - JsonDeserializer.class.getName()); - - ConsumerFactory consumerFactory = - new DefaultKafkaConsumerFactory<>(propertyMap); - - ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory = - new ConcurrentKafkaListenerContainerFactory<>(); - - kafkaListenerContainerFactory.setConsumerFactory(consumerFactory); - - return kafkaListenerContainerFactory; - } - @Bean Consumer consumer() { -- 2.20.1