From: Kai Moritz Date: Mon, 3 Jun 2024 07:53:25 +0000 (+0200) Subject: top10: 1.2.1 - Refined `Top10StreamProcessorTopologyTest` X-Git-Tag: top10-1.2.1~10 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=d1ac99074b22f357152644d4593b234540d1c952;p=demos%2Fkafka%2Fwordcount top10: 1.2.1 - Refined `Top10StreamProcessorTopologyTest` --- diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java index 6e1f93f..bb6fef7 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java @@ -12,8 +12,9 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerde; -import org.springframework.kafka.support.serializer.JsonSerializer; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; import java.util.concurrent.CompletableFuture; @@ -30,8 +31,24 @@ public class Top10ApplicationConfiguration { Properties props = new Properties(); + props.putAll(serializationConfig()); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); + if (properties.getCommitInterval() != null) + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval()); + if (properties.getCacheMaxBytes() != null) + props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, properties.getCacheMaxBytes()); + + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + return props; + } + + static Map serializationConfig() + { + Map props = new HashMap<>(); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); props.put(JsonDeserializer.TRUSTED_PACKAGES, Top10Application.class.getPackageName()); @@ -43,12 +60,6 @@ public class Top10ApplicationConfiguration "counter:" + Entry.class.getName() + "," + "user:" + User.class.getName() + "," + "ranking:" + Ranking.class.getName()); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - if (properties.getCommitInterval() != null) - props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval()); - if (properties.getCacheMaxBytes() != null) - props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, properties.getCacheMaxBytes()); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return props; } diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java index a4f79ac..f4e557c 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java @@ -9,9 +9,6 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; -import java.util.Map; -import java.util.Properties; -import java.util.stream.Collectors; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; @@ -178,18 +175,6 @@ class TestData return expectedMessages; } - static Map convertToMap(Properties properties) - { - return properties - .entrySet() - .stream() - .collect( - Collectors.toMap( - entry -> (String)entry.getKey(), - entry -> entry.getValue() - )); - } - static String parseHeader(Headers headers, String key) { Header header = headers.lastHeader(key); diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java index 0c9759c..f2a9eca 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java @@ -20,8 +20,8 @@ import java.util.Map; import java.util.Properties; import java.util.stream.Stream; -import static de.juplo.kafka.wordcount.top10.TestData.convertToMap; import static de.juplo.kafka.wordcount.top10.TestData.parseHeader; +import static de.juplo.kafka.wordcount.top10.Top10ApplicationConfiguration.serializationConfig; import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME; import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME; @@ -47,18 +47,17 @@ public class Top10StreamProcessorTopologyTest OUT, Stores.inMemoryKeyValueStore(STORE_NAME)); - Top10ApplicationConfiguration applicationConfiguriation = - new Top10ApplicationConfiguration(); - Properties streamProcessorProperties = - applicationConfiguriation.streamProcessorProperties(new Top10ApplicationProperties()); - Map propertyMap = convertToMap(streamProcessorProperties); + Map propertyMap = serializationConfig(); + + Properties properties = new Properties(); + properties.putAll(propertyMap); JsonSerde keySerde = new JsonSerde<>(); keySerde.configure(propertyMap, true); JsonSerde valueSerde = new JsonSerde<>(); valueSerde.configure(propertyMap, false); - testDriver = new TopologyTestDriver(topology, streamProcessorProperties); + testDriver = new TopologyTestDriver(topology, properties); in = testDriver.createInputTopic( IN,