From: Kai Moritz Date: Sun, 2 Jun 2024 16:41:02 +0000 (+0200) Subject: splitter: 1.2.0 - A domain-class (``User``) is used as key X-Git-Tag: splitter-1.2.0~3 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=b95a6dae1b668f87ec14d0ace9b768ca89e338b3;p=demos%2Fkafka%2Fwordcount splitter: 1.2.0 - A domain-class (``User``) is used as key --- diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationConfiguration.java index ead41f8..7143e1a 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationConfiguration.java @@ -33,9 +33,10 @@ public class SplitterApplicationConfiguration propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); + propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, SplitterApplication.class.getName()); + propertyMap.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName()); propertyMap.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Recording.class.getName()); propertyMap.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false); propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java index 60c569b..d0070c0 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java @@ -25,7 +25,7 @@ public class SplitterStreamProcessor { StreamsBuilder builder = new StreamsBuilder(); - KStream source = builder.stream(inputTopic); + KStream source = builder.stream(inputTopic); source .flatMapValues(recording -> Arrays diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/User.java b/src/main/java/de/juplo/kafka/wordcount/splitter/User.java new file mode 100644 index 0000000..8a65695 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/User.java @@ -0,0 +1,12 @@ +package de.juplo.kafka.wordcount.splitter; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class User +{ + String user; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestOutputUser.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestOutputUser.java new file mode 100644 index 0000000..4406b3b --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestOutputUser.java @@ -0,0 +1,14 @@ +package de.juplo.kafka.wordcount.counter; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@Data +@NoArgsConstructor +@AllArgsConstructor(staticName = "of") +public class TestOutputUser +{ + String user; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/recorder/TestInputUser.java b/src/test/java/de/juplo/kafka/wordcount/recorder/TestInputUser.java new file mode 100644 index 0000000..ce413ba --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/recorder/TestInputUser.java @@ -0,0 +1,14 @@ +package de.juplo.kafka.wordcount.recorder; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@Data +@NoArgsConstructor +@AllArgsConstructor(staticName = "of") +public class TestInputUser +{ + String user; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java index 891a435..e945b31 100644 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationIT.java @@ -1,7 +1,9 @@ package de.juplo.kafka.wordcount.splitter; +import de.juplo.kafka.wordcount.counter.TestOutputUser; import de.juplo.kafka.wordcount.counter.TestOutputWord; import de.juplo.kafka.wordcount.recorder.TestInputRecording; +import de.juplo.kafka.wordcount.recorder.TestInputUser; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -28,12 +30,14 @@ import static org.awaitility.Awaitility.await; @SpringBootTest( properties = { + "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer", "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", "spring.kafka.producer.properties.spring.json.add.type.headers=false", "spring.kafka.consumer.auto-offset-reset=earliest", + "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", - "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.counter.TestWord", - "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.splitter", + "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.counter.TestOutputUser", + "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.counter.TestOutputWord", "logging.level.root=WARN", "logging.level.de.juplo=DEBUG", "juplo.wordcount.splitter.bootstrap-server=${spring.embedded.kafka.brokers}", @@ -51,7 +55,7 @@ public class SplitterApplicationIT @BeforeAll public static void testSendMessage( - @Autowired KafkaTemplate kafkaTemplate) + @Autowired KafkaTemplate kafkaTemplate) { TestData .getInputMessages() @@ -59,7 +63,7 @@ public class SplitterApplicationIT { try { - SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); + SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); log.info( "Sent: {}={}, partition={}, offset={}", result.getProducerRecord().key(), @@ -78,7 +82,7 @@ public class SplitterApplicationIT @Test void testSendMessage() throws Exception { - await("Expexted converted data") + await("Expected converted data") .atMost(Duration.ofSeconds(5)) .untilAsserted(() -> TestData.assertExpectedMessages(consumer.getReceivedMessages())); @@ -87,18 +91,18 @@ public class SplitterApplicationIT static class Consumer { - private final MultiValueMap received = new LinkedMultiValueMap<>(); + private final MultiValueMap received = new LinkedMultiValueMap<>(); @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) public synchronized void receive( - @Header(KafkaHeaders.RECEIVED_KEY) String key, + @Header(KafkaHeaders.RECEIVED_KEY) TestOutputUser key, @Payload TestOutputWord value) { log.debug("Received message: {}={}", key, value); received.add(key, value); } - synchronized MultiValueMap getReceivedMessages() + synchronized MultiValueMap getReceivedMessages() { return received; } diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java index feedb1e..f89b099 100644 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/TestData.java @@ -1,7 +1,9 @@ package de.juplo.kafka.wordcount.splitter; +import de.juplo.kafka.wordcount.counter.TestOutputUser; import de.juplo.kafka.wordcount.counter.TestOutputWord; import de.juplo.kafka.wordcount.recorder.TestInputRecording; +import de.juplo.kafka.wordcount.recorder.TestInputUser; import org.apache.kafka.streams.KeyValue; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; @@ -15,29 +17,29 @@ import static org.awaitility.Awaitility.await; public class TestData { - static final String PETER = "peter"; - static final String KLAUS = "klaus"; + static final TestInputUser PETER = TestInputUser.of("peter"); + static final TestInputUser KLAUS = TestInputUser.of("klaus"); - static final Stream> getInputMessages() + static final Stream> getInputMessages() { return Stream.of(INPUT_MESSAGES); } - private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] + private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] { new KeyValue<>( PETER, - TestInputRecording.of(PETER, "Hallo Welt!")), + TestInputRecording.of(PETER.getUser(), "Hallo Welt!")), new KeyValue<>( KLAUS, - TestInputRecording.of(KLAUS, "Müsch gäb's auch!")), + TestInputRecording.of(KLAUS.getUser(), "Müsch gäb's auch!")), new KeyValue<>( PETER, - TestInputRecording.of(PETER, "Boäh, echt! ß mal nä Nümmäh!")), + TestInputRecording.of(PETER.getUser(), "Boäh, echt! ß mal nä Nümmäh!")), }; - static void assertExpectedMessages(MultiValueMap receivedMessages) + static void assertExpectedMessages(MultiValueMap receivedMessages) { await("Received expected messages") .atMost(Duration.ofSeconds(5)) @@ -45,49 +47,49 @@ public class TestData assertThat(receivedMessages.get(user)).containsExactlyElementsOf(word))); } - private static final KeyValue[] EXPECTED_MESSAGES = new KeyValue[] + private static final KeyValue[] EXPECTED_MESSAGES = new KeyValue[] { KeyValue.pair( - "peter", - TestOutputWord.of("peter", "Hallo")), + TestOutputUser.of(PETER.getUser()), + TestOutputWord.of(PETER.getUser(), "Hallo")), KeyValue.pair( - "peter", - TestOutputWord.of("peter", "Welt")), + TestOutputUser.of(PETER.getUser()), + TestOutputWord.of(PETER.getUser(), "Welt")), KeyValue.pair( - "klaus", - TestOutputWord.of("klaus", "Müsch")), + TestOutputUser.of(KLAUS.getUser()), + TestOutputWord.of(KLAUS.getUser(), "Müsch")), KeyValue.pair( - "klaus", - TestOutputWord.of("klaus", "gäb")), + TestOutputUser.of(KLAUS.getUser()), + TestOutputWord.of(KLAUS.getUser(), "gäb")), KeyValue.pair( - "klaus", - TestOutputWord.of("klaus", "s")), + TestOutputUser.of(KLAUS.getUser()), + TestOutputWord.of(KLAUS.getUser(), "s")), KeyValue.pair( - "klaus", - TestOutputWord.of("klaus", "auch")), + TestOutputUser.of(KLAUS.getUser()), + TestOutputWord.of(KLAUS.getUser(), "auch")), KeyValue.pair( - "peter", - TestOutputWord.of("peter", "Boäh")), + TestOutputUser.of(PETER.getUser()), + TestOutputWord.of(PETER.getUser(), "Boäh")), KeyValue.pair( - "peter", - TestOutputWord.of("peter", "echt")), + TestOutputUser.of(PETER.getUser()), + TestOutputWord.of(PETER.getUser(), "echt")), KeyValue.pair( - "peter", - TestOutputWord.of("peter", "ß")), + TestOutputUser.of(PETER.getUser()), + TestOutputWord.of(PETER.getUser(), "ß")), KeyValue.pair( - "peter", - TestOutputWord.of("peter", "mal")), + TestOutputUser.of(PETER.getUser()), + TestOutputWord.of(PETER.getUser(), "mal")), KeyValue.pair( - "peter", - TestOutputWord.of("peter", "nä")), + TestOutputUser.of(PETER.getUser()), + TestOutputWord.of(PETER.getUser(), "nä")), KeyValue.pair( - "peter", - TestOutputWord.of("peter", "Nümmäh")), + TestOutputUser.of(PETER.getUser()), + TestOutputWord.of(PETER.getUser(), "Nümmäh")), }; - static MultiValueMap expectedMessages() + static MultiValueMap expectedMessages() { - MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); + MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); Stream .of(EXPECTED_MESSAGES) .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));