From 3c23bfd42005211ac9812fba698ab74c8a6b7aa0 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Wed, 5 Jun 2024 17:06:30 +0200
Subject: [PATCH] counter: 1.2.15 - Separated serialization-config into a static method

---
 .../CounterApplicationConfiguriation.java     | 25 +++--
 .../juplo/kafka/wordcount/counter/Word.java   |  4 -
 .../kafka/wordcount/counter/WordCounter.java  |  3 +-
 .../counter/CounterApplicationIT.java         | 22 +++--
 .../CounterStreamProcessorTopologyTest.java   | 46 ++++-----
 .../kafka/wordcount/counter/TestData.java     | 99 +++++++++----------
 .../wordcount/splitter/TestInputWord.java     | 15 +++
 .../kafka/wordcount/top10/TestOutputWord.java | 15 +++
 .../top10/TestOutputWordCounter.java          | 16 +++
 9 files changed, 142 insertions(+), 103 deletions(-)
 create mode 100644 src/test/java/de/juplo/kafka/wordcount/splitter/TestInputWord.java
 create mode 100644 src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java
 create mode 100644 src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java
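
Not part of the commit, just for orientation: a minimal sketch of how the
serialization settings, now reachable through the static helper, relate to the
full streams configuration. The class name SerializationConfigSketch is
invented for this illustration, and it assumes that CounterApplicationProperties
provides usable defaults (as the previous test code already relied on); all
other names are taken from the diff below.

  package de.juplo.kafka.wordcount.counter;

  import java.util.Properties;

  public class SerializationConfigSketch
  {
      public static void main(String[] args)
      {
          // Serde-only settings: JsonSerde as default key/value serde, trusted
          // packages and the word/counter type mappings - enough for a
          // TopologyTestDriver, no broker or application id needed.
          Properties serializationOnly =
                  CounterApplicationConfiguriation.serializationConfig();

          // The production configuration starts from the same settings and adds
          // application id, bootstrap server, state dir, commit interval and
          // cache size (if configured) and the auto.offset.reset policy.
          Properties full = new CounterApplicationConfiguriation()
                  .streamProcessorProperties(new CounterApplicationProperties());

          System.out.println("serialization only: " + serializationOnly.keySet());
          System.out.println("full streams config: " + full.keySet());
      }
  }
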
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java
index 926045c..34217da 100644
--- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java
+++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java
@@ -25,12 +25,27 @@ import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.St
 public class CounterApplicationConfiguriation
 {
     @Bean
-    public Properties streamProcessorProperties(CounterApplicationProperties counterProperties)
+    public Properties streamProcessorProperties(
+            CounterApplicationProperties counterProperties)
     {
-        Properties propertyMap = new Properties();
+        Properties propertyMap = serializationConfig();

         propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, counterProperties.getApplicationId());
         propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, counterProperties.getBootstrapServer());
+        propertyMap.put(StreamsConfig.STATE_DIR_CONFIG, "target");
+        if (counterProperties.getCommitInterval() != null)
+            propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, counterProperties.getCommitInterval());
+        if (counterProperties.getCacheMaxBytes() != null)
+            propertyMap.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, counterProperties.getCacheMaxBytes());
+        propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        return propertyMap;
+    }
+
+    static Properties serializationConfig()
+    {
+        Properties propertyMap = new Properties();
+
         propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
         propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
         propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, CounterApplication.class.getPackageName());
@@ -41,12 +56,6 @@ public class CounterApplicationConfiguriation
                 "word:" + Word.class.getName() + "," +
                 "counter:" + WordCounter.class.getName());
         propertyMap.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, Boolean.FALSE);
-        propertyMap.put(StreamsConfig.STATE_DIR_CONFIG, "target");
-        if (counterProperties.getCommitInterval() != null)
-            propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, counterProperties.getCommitInterval());
-        if (counterProperties.getCacheMaxBytes() != null)
-            propertyMap.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, counterProperties.getCacheMaxBytes());
-        propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

         return propertyMap;
     }
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/Word.java b/src/main/java/de/juplo/kafka/wordcount/counter/Word.java
index 4aa5ee2..77287d5 100644
--- a/src/main/java/de/juplo/kafka/wordcount/counter/Word.java
+++ b/src/main/java/de/juplo/kafka/wordcount/counter/Word.java
@@ -1,13 +1,9 @@
 package de.juplo.kafka.wordcount.counter;

 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import lombok.AllArgsConstructor;
 import lombok.Data;
-import lombok.NoArgsConstructor;


-@AllArgsConstructor(staticName = "of")
-@NoArgsConstructor
 @Data
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class Word
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java b/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java
index 1334e5b..f1fce71 100644
--- a/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java
+++ b/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java
@@ -1,5 +1,6 @@
 package de.juplo.kafka.wordcount.counter;

+import lombok.AccessLevel;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
@@ -7,7 +8,7 @@ import lombok.NoArgsConstructor;

 @Data
 @NoArgsConstructor
-@AllArgsConstructor(staticName = "of")
+@AllArgsConstructor(access = AccessLevel.PRIVATE)
 public class WordCounter
 {
     String user;
diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
index ad4faf2..025a160 100644
--- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
+++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
@@ -1,5 +1,8 @@
 package de.juplo.kafka.wordcount.counter;

+import de.juplo.kafka.wordcount.splitter.TestInputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
@@ -20,7 +23,6 @@ import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;

 import java.time.Duration;
-import java.util.stream.Stream;

 import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN;
 import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT;
@@ -35,8 +37,8 @@ import static org.awaitility.Awaitility.await;
         "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
         "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
         "spring.kafka.consumer.properties.spring.json.use.type.headers=false",
-        "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.counter.Word",
-        "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.counter.WordCounter",
+        "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.top10.TestOutputWord",
+        "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.top10.TestOutputWordCounter",
         "logging.level.root=WARN",
         "logging.level.de.juplo=DEBUG",
         "juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}",
@@ -52,7 +54,7 @@ public class CounterApplicationIT
     public static final String TOPIC_OUT = "out";

     @Autowired
-    KafkaTemplate kafkaTemplate;
+    KafkaTemplate kafkaTemplate;
     @Autowired
     Consumer consumer;

@@ -67,8 +69,8 @@ public class CounterApplicationIT
     @Test
     void testSendMessage()
     {
-        Stream
-                .of(TestData.INPUT_MESSAGES)
+        TestData
+                .getInputMessages()
                 .forEach(word -> kafkaTemplate.send(TOPIC_IN, word.getUser(), word));

         await("Expected messages")
@@ -79,18 +81,18 @@ public class CounterApplicationIT

     static class Consumer
     {
-        private final MultiValueMap received = new LinkedMultiValueMap<>();
+        private final MultiValueMap received = new LinkedMultiValueMap<>();

         @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
         public synchronized void receive(
-                @Header(KafkaHeaders.RECEIVED_KEY) Word word,
-                @Payload WordCounter counter)
+                @Header(KafkaHeaders.RECEIVED_KEY) TestOutputWord word,
+                @Payload TestOutputWordCounter counter)
         {
             log.debug("Received message: {} -> {}", word, counter);
             received.add(word, counter);
         }

-        synchronized MultiValueMap getReceivedMessages()
+        synchronized MultiValueMap getReceivedMessages()
         {
             return received;
         }
diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
index 8e09d0c..e5964dc 100644
--- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
+++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
@@ -1,6 +1,10 @@
 package de.juplo.kafka.wordcount.counter;

+import de.juplo.kafka.wordcount.splitter.TestInputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TestOutputTopic;
 import org.apache.kafka.streams.Topology;
@@ -10,16 +14,11 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
-import org.springframework.kafka.support.serializer.JsonSerde;
 import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;

-import java.util.Map;
-import java.util.Properties;
-import java.util.stream.Stream;
-
-import static de.juplo.kafka.wordcount.counter.TestData.convertToMap;
+import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig;
 import static de.juplo.kafka.wordcount.counter.TestData.parseHeader;
 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
@@ -33,8 +32,8 @@ public class CounterStreamProcessorTopologyTest


     TopologyTestDriver testDriver;
-    TestInputTopic in;
-    TestOutputTopic out;
+    TestInputTopic in;
+    TestOutputTopic out;


     @BeforeEach
@@ -45,39 +44,32 @@ public class CounterStreamProcessorTopologyTest
                 OUT,
                 Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"));

-        CounterApplicationConfiguriation applicationConfiguriation =
-                new CounterApplicationConfiguriation();
-        Properties streamProcessorProperties =
-                applicationConfiguriation.streamProcessorProperties(new CounterApplicationProperties());
-        Map propertyMap = convertToMap(streamProcessorProperties);
-
-        JsonSerde keySerde = new JsonSerde<>();
-        keySerde.configure(propertyMap, true);
-        JsonSerde valueSerde = new JsonSerde<>();
-        valueSerde.configure(propertyMap, false);
-
-        testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
+        testDriver = new TopologyTestDriver(topology, serializationConfig());

         in = testDriver.createInputTopic(
                 IN,
-                (JsonSerializer)keySerde.serializer(),
-                (JsonSerializer)valueSerde.serializer());
+                new StringSerializer(),
+                new JsonSerializer().noTypeInfo());

         out = testDriver.createOutputTopic(
                 OUT,
-                (JsonDeserializer)keySerde.deserializer(),
-                (JsonDeserializer)valueSerde.deserializer());
+                new JsonDeserializer()
+                        .copyWithType(TestOutputWord.class)
+                        .ignoreTypeHeaders(),
+                new JsonDeserializer()
+                        .copyWithType(TestOutputWordCounter.class)
+                        .ignoreTypeHeaders());
     }


     @Test
     public void test()
     {
-        Stream
-                .of(TestData.INPUT_MESSAGES)
+        TestData
+                .getInputMessages()
                 .forEach(word -> in.pipeInput(word.getUser(), word));

-        MultiValueMap receivedMessages = new LinkedMultiValueMap<>();
+        MultiValueMap receivedMessages = new LinkedMultiValueMap<>();
         out
                 .readRecordsToList()
                 .forEach(record ->
diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java
index 5dc8bc2..6419059 100644
--- a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java
+++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java
@@ -1,14 +1,14 @@
 package de.juplo.kafka.wordcount.counter;

+import de.juplo.kafka.wordcount.splitter.TestInputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.streams.KeyValue;
 import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;

-import java.util.Map;
-import java.util.Properties;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;

 import static org.assertj.core.api.Assertions.assertThat;
@@ -16,22 +16,27 @@ import static org.assertj.core.api.Assertions.assertThat;

 class TestData
 {
-    static final Word[] INPUT_MESSAGES = new Word[]
+    private static final TestInputWord[] INPUT_MESSAGES = new TestInputWord[]
     {
-            Word.of("peter","Hallo"),
-            Word.of("klaus","Müsch"),
-            Word.of("peter","Welt"),
-            Word.of("klaus","Müsch"),
-            Word.of("klaus","s"),
-            Word.of("peter","Boäh"),
-            Word.of("peter","Welt"),
-            Word.of("peter","Boäh"),
-            Word.of("klaus","s"),
-            Word.of("peter","Boäh"),
-            Word.of("klaus","s"),
+            TestInputWord.of("peter","Hallo"),
+            TestInputWord.of("klaus","Müsch"),
+            TestInputWord.of("peter","Welt"),
+            TestInputWord.of("klaus","Müsch"),
+            TestInputWord.of("klaus","s"),
+            TestInputWord.of("peter","Boäh"),
+            TestInputWord.of("peter","Welt"),
+            TestInputWord.of("peter","Boäh"),
+            TestInputWord.of("klaus","s"),
+            TestInputWord.of("peter","Boäh"),
+            TestInputWord.of("klaus","s"),
     };

-    static void assertExpectedMessages(MultiValueMap receivedMessages)
+    static Stream getInputMessages()
+    {
+        return Stream.of(TestData.INPUT_MESSAGES);
+    }
+
+    static void assertExpectedMessages(MultiValueMap receivedMessages)
     {
         expectedMessages().forEach(
                 (word, counter) ->
@@ -39,64 +44,52 @@ class TestData
                 .containsExactlyElementsOf(counter));
     }

-    static final KeyValue[] EXPECTED_MESSAGES = new KeyValue[]
+    private static final KeyValue[] EXPECTED_MESSAGES = new KeyValue[]
     {
             KeyValue.pair(
-                    Word.of("peter","Hallo"),
-                    WordCounter.of("peter","Hallo",1)),
+                    TestOutputWord.of("peter","Hallo"),
+                    TestOutputWordCounter.of("peter","Hallo",1)),
             KeyValue.pair(
-                    Word.of("klaus","Müsch"),
-                    WordCounter.of("klaus","Müsch",1)),
+                    TestOutputWord.of("klaus","Müsch"),
+                    TestOutputWordCounter.of("klaus","Müsch",1)),
             KeyValue.pair(
-                    Word.of("peter","Welt"),
-                    WordCounter.of("peter","Welt",1)),
+                    TestOutputWord.of("peter","Welt"),
+                    TestOutputWordCounter.of("peter","Welt",1)),
             KeyValue.pair(
-                    Word.of("klaus","Müsch"),
-                    WordCounter.of("klaus","Müsch",2)),
+                    TestOutputWord.of("klaus","Müsch"),
+                    TestOutputWordCounter.of("klaus","Müsch",2)),
             KeyValue.pair(
-                    Word.of("klaus","s"),
-                    WordCounter.of("klaus","s",1)),
+                    TestOutputWord.of("klaus","s"),
+                    TestOutputWordCounter.of("klaus","s",1)),
             KeyValue.pair(
-                    Word.of("peter","Boäh"),
-                    WordCounter.of("peter","Boäh",1)),
+                    TestOutputWord.of("peter","Boäh"),
+                    TestOutputWordCounter.of("peter","Boäh",1)),
             KeyValue.pair(
-                    Word.of("peter","Welt"),
-                    WordCounter.of("peter","Welt",2)),
+                    TestOutputWord.of("peter","Welt"),
+                    TestOutputWordCounter.of("peter","Welt",2)),
             KeyValue.pair(
-                    Word.of("peter","Boäh"),
-                    WordCounter.of("peter","Boäh",2)),
+                    TestOutputWord.of("peter","Boäh"),
+                    TestOutputWordCounter.of("peter","Boäh",2)),
             KeyValue.pair(
-                    Word.of("klaus","s"),
-                    WordCounter.of("klaus","s",2)),
+                    TestOutputWord.of("klaus","s"),
+                    TestOutputWordCounter.of("klaus","s",2)),
             KeyValue.pair(
-                    Word.of("peter","Boäh"),
-                    WordCounter.of("peter","Boäh",3)),
+                    TestOutputWord.of("peter","Boäh"),
+                    TestOutputWordCounter.of("peter","Boäh",3)),
             KeyValue.pair(
-                    Word.of("klaus","s"),
-                    WordCounter.of("klaus","s",3)),
+                    TestOutputWord.of("klaus","s"),
+                    TestOutputWordCounter.of("klaus","s",3)),
     };

-    static MultiValueMap expectedMessages()
+    static MultiValueMap expectedMessages()
     {
-        MultiValueMap expectedMessages = new LinkedMultiValueMap<>();
+        MultiValueMap expectedMessages = new LinkedMultiValueMap<>();
         Stream
                 .of(EXPECTED_MESSAGES)
                 .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
         return expectedMessages;
     }

-    static Map convertToMap(Properties properties)
-    {
-        return properties
-                .entrySet()
-                .stream()
-                .collect(
-                        Collectors.toMap(
-                                entry -> (String)entry.getKey(),
-                                entry -> entry.getValue()
-                        ));
-    }
-
     static String parseHeader(Headers headers, String key)
     {
         Header header = headers.lastHeader(key);
diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputWord.java b/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputWord.java
new file mode 100644
index 0000000..71ed1d9
--- /dev/null
+++ b/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputWord.java
@@ -0,0 +1,15 @@
+package de.juplo.kafka.wordcount.splitter;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestInputWord
+{
+    String user;
+    String word;
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java
new file mode 100644
index 0000000..cfc2cae
--- /dev/null
+++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java
@@ -0,0 +1,15 @@
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestOutputWord
+{
+    String user;
+    String word;
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java
new file mode 100644
index 0000000..1b59387
--- /dev/null
+++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java
@@ -0,0 +1,16 @@
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestOutputWordCounter
+{
+    String user;
+    String word;
+    long counter;
+}
-- 
2.20.1