From: Kai Moritz Date: Sun, 16 Jun 2024 19:17:27 +0000 (+0200) Subject: counter: 1.3.1 - Refined `CounterStreamProcessor` (DRY for type-mapping) X-Git-Tag: counter-1.3.1 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=refs%2Fheads%2Fcounter;hp=cb4a5de56f142603eb63e82f6769616bf5ff058a;p=demos%2Fkafka%2Fwordcount counter: 1.3.1 - Refined `CounterStreamProcessor` (DRY for type-mapping) --- diff --git a/Dockerfile b/Dockerfile index d2218b8..2e032f3 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM openjdk:11-jre-slim +FROM eclipse-temurin:17-jre COPY target/*.jar /opt/app.jar EXPOSE 8083 ENTRYPOINT ["java", "-jar", "/opt/app.jar"] diff --git a/pom.xml b/pom.xml index cbccc73..03a7b40 100644 --- a/pom.xml +++ b/pom.xml @@ -5,12 +5,12 @@ org.springframework.boot spring-boot-starter-parent - 3.0.2 + 3.2.5 de.juplo.kafka.wordcount counter - 1.2.3 + 1.3.1 Wordcount-Counter Word-counting stream-processor of the multi-user wordcount-example diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java index 1bcc834..174521f 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java @@ -1,6 +1,5 @@ package de.juplo.kafka.wordcount.counter; -import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.streams.StreamsConfig; @@ -13,11 +12,11 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerde; -import org.springframework.kafka.support.serializer.JsonSerializer; import java.util.Properties; import java.util.concurrent.CompletableFuture; +import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME; import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; @@ -27,45 +26,47 @@ import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.St public class CounterApplicationConfiguriation { @Bean - public Properties propertyMap(CounterApplicationProperties properties) + public Properties streamProcessorProperties( + CounterApplicationProperties counterProperties) + { + Properties propertyMap = serializationConfig(); + + propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, counterProperties.getApplicationId()); + + propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, counterProperties.getBootstrapServer()); + if (counterProperties.getCommitInterval() != null) + propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, counterProperties.getCommitInterval()); + if (counterProperties.getCacheMaxBytes() != null) + propertyMap.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, counterProperties.getCacheMaxBytes()); + + propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + return propertyMap; + } + + static Properties serializationConfig() { Properties propertyMap = new Properties(); - propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); - propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); - propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, Word.class.getPackageName()); - propertyMap.put(JsonDeserializer.KEY_DEFAULT_TYPE, Word.class.getName()); - propertyMap.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Word.class.getName()); - propertyMap.put( - JsonDeserializer.TYPE_MAPPINGS, - "W:" + Word.class.getName() + "," + - "C:" + WordCount.class.getName()); - propertyMap.put(StreamsConfig.STATE_DIR_CONFIG, "target"); - if (properties.getCommitInterval() != null) - propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval()); - if (properties.getCacheMaxBytes() != null) - propertyMap.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, properties.getCacheMaxBytes()); - propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, CounterApplication.class.getPackageName()); return propertyMap; } @Bean(initMethod = "start", destroyMethod = "stop") public CounterStreamProcessor streamProcessor( - CounterApplicationProperties properties, - Properties propertyMap, + CounterApplicationProperties applicationProperties, + Properties streamProcessorProperties, KeyValueBytesStoreSupplier storeSupplier, - ObjectMapper objectMapper, ConfigurableApplicationContext context) { CounterStreamProcessor streamProcessor = new CounterStreamProcessor( - properties.getInputTopic(), - properties.getOutputTopic(), - propertyMap, - storeSupplier, - objectMapper); + applicationProperties.getInputTopic(), + applicationProperties.getOutputTopic(), + streamProcessorProperties, + storeSupplier); streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> { @@ -85,6 +86,6 @@ public class CounterApplicationConfiguriation @Bean public KeyValueBytesStoreSupplier storeSupplier() { - return Stores.persistentKeyValueStore("counter"); + return Stores.persistentKeyValueStore(STORE_NAME); } } diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java index d64eb68..2304e55 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java @@ -1,22 +1,27 @@ package de.juplo.kafka.wordcount.counter; -import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.*; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; -import org.apache.kafka.streams.kstream.*; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.springframework.kafka.support.serializer.JsonSerde; +import org.springframework.kafka.support.serializer.JsonSerializer; +import java.util.Map; import java.util.Properties; +import java.util.stream.Collectors; @Slf4j public class CounterStreamProcessor { + public static final String STORE_NAME = "counter"; + + public final KafkaStreams streams; @@ -24,14 +29,12 @@ public class CounterStreamProcessor String inputTopic, String outputTopic, Properties properties, - KeyValueBytesStoreSupplier storeSupplier, - ObjectMapper mapper) + KeyValueBytesStoreSupplier storeSupplier) { Topology topology = CounterStreamProcessor.buildTopology( inputTopic, outputTopic, - storeSupplier, - mapper); + storeSupplier); streams = new KafkaStreams(topology, properties); } @@ -39,22 +42,21 @@ public class CounterStreamProcessor static Topology buildTopology( String inputTopic, String outputTopic, - KeyValueBytesStoreSupplier storeSupplier, - ObjectMapper mapper) + KeyValueBytesStoreSupplier storeSupplier) { StreamsBuilder builder = new StreamsBuilder(); - KStream source = builder.stream( - inputTopic, - Consumed.with(Serdes.String(), null)); - - source + builder + .stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde())) .map((key, word) -> new KeyValue<>(word, word)) .groupByKey() - .count(Materialized.as(storeSupplier)) + .count( + Materialized + .as(storeSupplier) + .withKeySerde(new JsonSerde<>(Word.class))) // No headers are present: fixed typing is needed! .toStream() - .map((word, count) -> new KeyValue<>(word, WordCount.of(word, count))) - .to(outputTopic); + .map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter))) + .to(outputTopic, Produced.with(outKeySerde(), outValueSerde())); Topology topology = builder.build(); log.info("\n\n{}", topology.describe()); @@ -62,6 +64,11 @@ public class CounterStreamProcessor return topology; } + ReadOnlyKeyValueStore getStore() + { + return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore())); + } + public void start() { log.info("Starting Stream-Processor"); @@ -73,4 +80,51 @@ public class CounterStreamProcessor log.info("Stopping Stream-Processor"); streams.close(); } + + + + public static JsonSerde inKeySerde() + { + return new JsonSerde<>(User.class); + } + + public static JsonSerde inValueSerde() + { + return new JsonSerde<>(Word.class); + } + + public static JsonSerde outKeySerde() + { + return serde(true); + } + + public static JsonSerde outValueSerde() + { + return serde(false); + } + + public static JsonSerde serde(boolean isKey) + { + JsonSerde serde = new JsonSerde<>(); + serde.configure( + Map.of(JsonSerializer.TYPE_MAPPINGS, typeMappingsConfig()), + isKey); + return serde; + } + + private static String typeMappingsConfig() + { + return typeMappingsConfig(Word.class, WordCounter.class); + } + + public static String typeMappingsConfig(Class wordClass, Class wordCounterClass) + { + return Map.of( + "word", wordClass, + "counter", wordCounterClass) + .entrySet() + .stream() + .map(entry -> entry.getKey() + ":" + entry.getValue().getName()) + .collect(Collectors.joining(",")); + } } diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/User.java b/src/main/java/de/juplo/kafka/wordcount/counter/User.java new file mode 100644 index 0000000..e38bcba --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/counter/User.java @@ -0,0 +1,12 @@ +package de.juplo.kafka.wordcount.counter; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class User +{ + String user; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/Word.java b/src/main/java/de/juplo/kafka/wordcount/counter/Word.java index 4aa5ee2..77287d5 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/Word.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/Word.java @@ -1,13 +1,9 @@ package de.juplo.kafka.wordcount.counter; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import lombok.AllArgsConstructor; import lombok.Data; -import lombok.NoArgsConstructor; -@AllArgsConstructor(staticName = "of") -@NoArgsConstructor @Data @JsonIgnoreProperties(ignoreUnknown = true) public class Word diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/WordCount.java b/src/main/java/de/juplo/kafka/wordcount/counter/WordCount.java deleted file mode 100644 index 0767dd1..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/WordCount.java +++ /dev/null @@ -1,17 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import lombok.Value; - - -@Value(staticConstructor = "of") -public class WordCount -{ - String user; - String word; - long count; - - public static WordCount of(Word word, long count) - { - return new WordCount(word.getUser(), word.getWord(), count); - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java b/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java new file mode 100644 index 0000000..f1fce71 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java @@ -0,0 +1,22 @@ +package de.juplo.kafka.wordcount.counter; + +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@Data +@NoArgsConstructor +@AllArgsConstructor(access = AccessLevel.PRIVATE) +public class WordCounter +{ + String user; + String word; + long counter; + + public static WordCounter of(Word word, long counter) + { + return new WordCounter(word.getUser(), word.getWord(), counter); + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java index b412fe4..0faa2de 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java @@ -1,84 +1,149 @@ package de.juplo.kafka.wordcount.counter; -import lombok.RequiredArgsConstructor; +import de.juplo.kafka.wordcount.splitter.TestInputUser; +import de.juplo.kafka.wordcount.splitter.TestInputWord; +import de.juplo.kafka.wordcount.top10.TestOutputWord; +import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.Stores; -import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Primary; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.SendResult; import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; import java.time.Duration; -import java.util.LinkedList; -import java.util.List; -import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.*; -import static org.awaitility.Awaitility.*; +import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN; +import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT; +import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME; +import static org.awaitility.Awaitility.await; @SpringBootTest( properties = { - "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", + "spring.main.allow-bean-definition-overriding=true", + "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer", + "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", + "spring.kafka.producer.properties.spring.json.add.type.headers=false", + "spring.kafka.consumer.auto-offset-reset=earliest", + "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", + "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", + "spring.kafka.consumer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.top10.TestOutputWord,counter:de.juplo.kafka.wordcount.top10.TestOutputWordCounter", + "logging.level.root=WARN", + "logging.level.de.juplo=DEBUG", "juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}", - "juplo.wordcount.counter.commit-interval=0", - "juplo.wordcount.counter.cacheMaxBytes=0", + "juplo.wordcount.counter.commit-interval=100", + "juplo.wordcount.counter.cache-max-bytes=0", "juplo.wordcount.counter.input-topic=" + TOPIC_IN, "juplo.wordcount.counter.output-topic=" + TOPIC_OUT }) -@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }, partitions = PARTITIONS) +@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }) @Slf4j public class CounterApplicationIT { - public final static String TOPIC_IN = "in"; - public final static String TOPIC_OUT = "out"; - static final int PARTITIONS = 2; + public static final String TOPIC_IN = "in"; + public static final String TOPIC_OUT = "out"; - @Autowired - KafkaTemplate kafkaTemplate; @Autowired Consumer consumer; + @Autowired + CounterStreamProcessor streamProcessor; - @BeforeEach - public void clear() + @BeforeAll + public static void testSendMessage( + @Autowired KafkaTemplate kafkaTemplate) { - consumer.received.clear(); + TestData + .getInputMessages() + .forEach(kv -> + { + try + { + SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); + log.info( + "Sent: {}={}, partition={}, offset={}", + result.getProducerRecord().key(), + result.getProducerRecord().value(), + result.getRecordMetadata().partition(), + result.getRecordMetadata().offset()); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + }); } + @DisplayName("Await the expected number of messages") + @Test + public void testAwaitExpectedNumberOfMessagesForUsers() + { + await("Expected number of messages") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> consumer.enforceAssertion( + receivedMessages -> TestData.assertExpectedNumberOfMessagesForWord(receivedMessages))); + } + @DisplayName("Await the expected output messages") @Test void testSendMessage() { - TestData.writeInputData((key, value) -> kafkaTemplate.send(TOPIC_IN, key, value)); - - await("Expexted converted data") + await("Expected messages") .atMost(Duration.ofSeconds(10)) - .untilAsserted(() -> TestData.assertExpectedResult(consumer.getReceivedMessages())); + .untilAsserted(() -> consumer.enforceAssertion( + receivedMessages -> TestData.assertExpectedMessages(receivedMessages))); + } + + @DisplayName("Await the expected final output messages") + @Test + public void testAwaitExpectedLastMessagesForUsers() + { + await("Expected final output messages") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> consumer.enforceAssertion( + receivedMessages -> TestData.assertExpectedLastMessagesForWord(receivedMessages))); + } + + @DisplayName("Await the expected state in the state-store") + @Test + public void testAwaitExpectedState() + { + await("Expected state") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore())); } - @RequiredArgsConstructor static class Consumer { - private final List received = new LinkedList<>(); + private final MultiValueMap received = new LinkedMultiValueMap<>(); @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) - public synchronized void receive(ConsumerRecord record) + public synchronized void receive( + @Header(KafkaHeaders.RECEIVED_KEY) TestOutputWord word, + @Payload TestOutputWordCounter counter) { - log.debug("Received message: {}", record); - received.add(Message.of(record.key(),record.value())); + log.debug("Received message: {} -> {}", word, counter); + received.add(word, counter); } - synchronized List getReceivedMessages() + synchronized void enforceAssertion( + java.util.function.Consumer> assertion) { - return received; + assertion.accept(received); } } @@ -91,11 +156,10 @@ public class CounterApplicationIT return new Consumer(); } - @Primary @Bean - KeyValueBytesStoreSupplier inMemoryStoreSupplier() + KeyValueBytesStoreSupplier storeSupplier() { - return Stores.inMemoryKeyValueStore("TEST-STORE"); + return Stores.inMemoryKeyValueStore(STORE_NAME); } } } diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java index c2ada6f..cfb6bd8 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java @@ -1,54 +1,111 @@ package de.juplo.kafka.wordcount.counter; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.kafka.common.serialization.*; -import org.apache.kafka.streams.*; +import de.juplo.kafka.wordcount.splitter.TestInputUser; +import de.juplo.kafka.wordcount.splitter.TestInputWord; +import de.juplo.kafka.wordcount.top10.TestOutputWord; +import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; -import java.util.List; -import java.util.Properties; +import java.util.Map; +import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig; +import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME; + +@Slf4j public class CounterStreamProcessorTopologyTest { - public final static String IN = "TEST-IN"; - public final static String OUT = "TEST-OUT"; + public static final String IN = "TEST-IN"; + public static final String OUT = "TEST-OUT"; - @Test - public void test() + + TopologyTestDriver testDriver; + TestInputTopic in; + TestOutputTopic out; + + + @BeforeEach + public void setUpTestDriver() { Topology topology = CounterStreamProcessor.buildTopology( IN, OUT, - Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"), - new ObjectMapper()); + Stores.inMemoryKeyValueStore(STORE_NAME)); - CounterApplicationConfiguriation config = - new CounterApplicationConfiguriation(); - Properties properties = - config.propertyMap(new CounterApplicationProperties()); + testDriver = new TopologyTestDriver(topology, serializationConfig()); - TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties); + in = testDriver.createInputTopic(IN, serializer(), serializer()); + out = testDriver.createOutputTopic(OUT, keyDeserializer(), valueDeserializer()); + } - TestInputTopic in = testDriver.createInputTopic( - IN, - new StringSerializer(), - new StringSerializer()); - TestOutputTopic out = testDriver.createOutputTopic( - OUT, - new StringDeserializer(), - new StringDeserializer()); - - TestData.writeInputData((key, value) -> in.pipeInput(key, value)); + @Test + public void test() + { + TestData + .getInputMessages() + .forEach(kv -> in.pipeInput(kv.key, kv.value)); - List receivedMessages = out + MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); + out .readRecordsToList() - .stream() - .map(record -> Message.of(record.key(), record.value())) - .toList(); + .forEach(record -> receivedMessages.add(record.key(), record.value())); + + TestData.assertExpectedMessages(receivedMessages); + + TestData.assertExpectedNumberOfMessagesForWord(receivedMessages); + TestData.assertExpectedLastMessagesForWord(receivedMessages); + + KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); + TestData.assertExpectedState(store); + } - TestData.assertExpectedResult(receivedMessages); + @AfterEach + public void tearDown() + { + testDriver.close(); + } + + + private static JsonSerializer serializer() + { + return new JsonSerializer().noTypeInfo(); + } + + private JsonDeserializer keyDeserializer() + { + return deserializer(true); + } + + private static JsonDeserializer valueDeserializer() + { + return deserializer(false); + } + + private static JsonDeserializer deserializer(boolean isKey) + { + JsonDeserializer deserializer = new JsonDeserializer<>(); + deserializer.configure( + Map.of(JsonDeserializer.TYPE_MAPPINGS, typeMappingsConfig()), + isKey); + return deserializer; + } + + private static String typeMappingsConfig() + { + return CounterStreamProcessor.typeMappingsConfig(TestOutputWord.class, TestOutputWordCounter.class); } } diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/Message.java b/src/test/java/de/juplo/kafka/wordcount/counter/Message.java deleted file mode 100644 index 8ed7cc5..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/counter/Message.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import lombok.Value; - - -@Value(staticConstructor = "of") -public class Message -{ - String key; - String value; -} diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java index 8ff7022..862eb2b 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java @@ -1,105 +1,206 @@ package de.juplo.kafka.wordcount.counter; -import java.util.List; -import java.util.function.BiConsumer; +import de.juplo.kafka.wordcount.splitter.TestInputUser; +import de.juplo.kafka.wordcount.splitter.TestInputWord; +import de.juplo.kafka.wordcount.top10.TestOutputWord; +import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; class TestData { - static void writeInputData(BiConsumer consumer) + static final String PETER = "peter"; + static final String KLAUS = "klaus"; + + static final String WORD_HALLO = "Hallo"; + static final String WORD_MÜSCH = "Müsch"; + static final String WORD_WELT = "Welt"; + static final String WORD_S = "s"; + static final String WORD_BOÄH = "Boäh"; + + static final TestOutputWord PETER_HALLO = TestOutputWord.of(PETER, WORD_HALLO); + static final TestOutputWord PETER_WELT = TestOutputWord.of(PETER, WORD_WELT); + static final TestOutputWord PETER_BOÄH = TestOutputWord.of(PETER, WORD_BOÄH); + static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(KLAUS, WORD_MÜSCH); + static final TestOutputWord KLAUS_S = TestOutputWord.of(KLAUS, WORD_S); + + private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] + { + KeyValue.pair( + TestInputUser.of(PETER), + TestInputWord.of(PETER, WORD_HALLO)), + KeyValue.pair( + TestInputUser.of(KLAUS), + TestInputWord.of(KLAUS, WORD_MÜSCH)), + KeyValue.pair( + TestInputUser.of(PETER), + TestInputWord.of(PETER, WORD_WELT)), + KeyValue.pair( + TestInputUser.of(KLAUS), + TestInputWord.of(KLAUS, WORD_MÜSCH)), + KeyValue.pair( + TestInputUser.of(KLAUS), + TestInputWord.of(KLAUS, WORD_S)), + KeyValue.pair( + TestInputUser.of(PETER), + TestInputWord.of(PETER, WORD_BOÄH)), + KeyValue.pair( + TestInputUser.of(PETER), + TestInputWord.of(PETER, WORD_WELT)), + KeyValue.pair( + TestInputUser.of(PETER), + TestInputWord.of(PETER, WORD_BOÄH)), + KeyValue.pair( + TestInputUser.of(KLAUS), + TestInputWord.of(KLAUS, WORD_S)), + KeyValue.pair( + TestInputUser.of(PETER), + TestInputWord.of(PETER, WORD_BOÄH)), + KeyValue.pair( + TestInputUser.of(KLAUS), + TestInputWord.of(KLAUS, WORD_S)), + }; + + static Stream> getInputMessages() + { + return Stream.of(TestData.INPUT_MESSAGES); + } + + static void assertExpectedMessages(MultiValueMap receivedMessages) { - consumer.accept( - "peter", - "{\"user\":\"peter\",\"word\":\"Hallo\"}"); - consumer.accept( - "klaus", - "{\"user\":\"klaus\",\"word\":\"Müsch\"}"); - consumer.accept( - "peter", - "{\"user\":\"peter\",\"word\":\"Welt\"}"); - consumer.accept( - "klaus", - "{\"user\":\"klaus\",\"word\":\"Müsch\"}"); - consumer.accept( - "klaus", - "{\"user\":\"klaus\",\"word\":\"s\"}"); - consumer.accept( - "peter", - "{\"user\":\"peter\",\"word\":\"Boäh\"}"); - consumer.accept( - "peter", - "{\"user\":\"peter\",\"word\":\"Welt\"}"); - consumer.accept( - "peter", - "{\"user\":\"peter\",\"word\":\"Boäh\"}"); - consumer.accept( - "klaus", - "{\"user\":\"klaus\",\"word\":\"s\"}"); - consumer.accept( - "peter", - "{\"user\":\"peter\",\"word\":\"Boäh\"}"); - consumer.accept( - "klaus", - "{\"user\":\"klaus\",\"word\":\"s\"}"); + expectedMessages().forEach( + (word, counter) -> + assertThat(receivedMessages.get(word)) + .containsExactlyElementsOf(counter)); } - static void assertExpectedResult(List receivedMessages) + static void assertExpectedNumberOfMessagesForWord(MultiValueMap receivedMessages) { - assertThat(receivedMessages).hasSize(11); - assertThat(receivedMessages).containsSubsequence( - expectedMessages[0]); // Hallo - assertThat(receivedMessages).containsSubsequence( - expectedMessages[1], - expectedMessages[3]); // Müsch - assertThat(receivedMessages).containsSubsequence( - expectedMessages[2], - expectedMessages[6]); - assertThat(receivedMessages).containsSubsequence( - expectedMessages[4], - expectedMessages[8], - expectedMessages[10]); // s - assertThat(receivedMessages).containsSubsequence( - expectedMessages[5], - expectedMessages[7], - expectedMessages[9]); // Boäh + assertThat(countMessagesForWord(PETER_HALLO, receivedMessages)); + assertThat(countMessagesForWord(PETER_WELT, receivedMessages)); + assertThat(countMessagesForWord(PETER_BOÄH, receivedMessages)); + assertThat(countMessagesForWord(KLAUS_MÜSCH, receivedMessages)); + assertThat(countMessagesForWord(KLAUS_S, receivedMessages)); } - static Message[] expectedMessages = + private static int countMessagesForWord(TestOutputWord word, MultiValueMap messagesForUsers) { - Message.of( - "{\"user\":\"peter\",\"word\":\"Hallo\"}", - "{\"user\":\"peter\",\"word\":\"Hallo\",\"count\":1}"), - Message.of( - "{\"user\":\"klaus\",\"word\":\"Müsch\"}", - "{\"user\":\"klaus\",\"word\":\"Müsch\",\"count\":1}"), - Message.of( - "{\"user\":\"peter\",\"word\":\"Welt\"}", - "{\"user\":\"peter\",\"word\":\"Welt\",\"count\":1}"), - Message.of( - "{\"user\":\"klaus\",\"word\":\"Müsch\"}", - "{\"user\":\"klaus\",\"word\":\"Müsch\",\"count\":2}"), - Message.of( - "{\"user\":\"klaus\",\"word\":\"s\"}", - "{\"user\":\"klaus\",\"word\":\"s\",\"count\":1}"), - Message.of( - "{\"user\":\"peter\",\"word\":\"Boäh\"}", - "{\"user\":\"peter\",\"word\":\"Boäh\",\"count\":1}"), - Message.of( - "{\"user\":\"peter\",\"word\":\"Welt\"}", - "{\"user\":\"peter\",\"word\":\"Welt\",\"count\":2}"), - Message.of( - "{\"user\":\"peter\",\"word\":\"Boäh\"}", - "{\"user\":\"peter\",\"word\":\"Boäh\",\"count\":2}"), - Message.of( - "{\"user\":\"klaus\",\"word\":\"s\"}", - "{\"user\":\"klaus\",\"word\":\"s\",\"count\":2}"), - Message.of( - "{\"user\":\"peter\",\"word\":\"Boäh\"}", - "{\"user\":\"peter\",\"word\":\"Boäh\",\"count\":3}"), - Message.of( - "{\"user\":\"klaus\",\"word\":\"s\"}", - "{\"user\":\"klaus\",\"word\":\"s\",\"count\":3}"), + return messagesForUsers.get(word) == null + ? 0 + : messagesForUsers.get(word).size(); + } + + static void assertExpectedState(ReadOnlyKeyValueStore store) + { + assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, store.get(wordOf(PETER_HALLO))); + assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, store.get(wordOf(PETER_WELT))); + assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, store.get(wordOf(PETER_BOÄH))); + assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, store.get(wordOf(KLAUS_MÜSCH))); + assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S))); + } + + private static Word wordOf(TestOutputWord testOutputWord) + { + Word word = new Word(); + + word.setUser(testOutputWord.getUser()); + word.setWord(testOutputWord.getWord()); + + return word; + } + + static void assertExpectedLastMessagesForWord(MultiValueMap receivedMessages) + { + assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages)); + assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, getLastMessageFor(PETER_WELT, receivedMessages)); + assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, getLastMessageFor(PETER_BOÄH, receivedMessages)); + assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, getLastMessageFor(KLAUS_MÜSCH, receivedMessages)); + assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages)); + } + + private static void assertWordCountEqualsWordCountFromLastMessage( + TestOutputWord word, + Long counter) + { + TestOutputWordCounter testOutputWordCounter = TestOutputWordCounter.of( + word.getUser(), + word.getWord(), + counter); + assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter); + } + + private static void assertWordCountEqualsWordCountFromLastMessage( + TestOutputWord word, + TestOutputWordCounter counter) + { + assertThat(counter).isEqualTo(getLastMessageFor(word)); + } + + private static TestOutputWordCounter getLastMessageFor(TestOutputWord word) + { + return getLastMessageFor(word, expectedMessages()); + } + + private static TestOutputWordCounter getLastMessageFor( + TestOutputWord user, + MultiValueMap messagesForWord) + { + return messagesForWord + .get(user) + .stream() + .reduce(null, (left, right) -> right); + } + + private static final KeyValue[] EXPECTED_MESSAGES = new KeyValue[] + { + KeyValue.pair( + PETER_HALLO, + TestOutputWordCounter.of(PETER, WORD_HALLO,1)), + KeyValue.pair( + KLAUS_MÜSCH, + TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,1)), + KeyValue.pair( + PETER_WELT, + TestOutputWordCounter.of(PETER, WORD_WELT,1)), + KeyValue.pair( + KLAUS_MÜSCH, + TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,2)), + KeyValue.pair( + KLAUS_S, + TestOutputWordCounter.of(KLAUS, WORD_S,1)), + KeyValue.pair( + PETER_BOÄH, + TestOutputWordCounter.of(PETER, WORD_BOÄH,1)), + KeyValue.pair( + PETER_WELT, + TestOutputWordCounter.of(PETER, WORD_WELT,2)), + KeyValue.pair( + PETER_BOÄH, + TestOutputWordCounter.of(PETER, WORD_BOÄH,2)), + KeyValue.pair( + KLAUS_S, + TestOutputWordCounter.of(KLAUS, WORD_S,2)), + KeyValue.pair( + PETER_BOÄH, + TestOutputWordCounter.of(PETER, WORD_BOÄH,3)), + KeyValue.pair( + KLAUS_S, + TestOutputWordCounter.of(KLAUS, WORD_S,3)), }; + + static MultiValueMap expectedMessages() + { + MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); + Stream + .of(EXPECTED_MESSAGES) + .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value)); + return expectedMessages; + } } diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputUser.java b/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputUser.java new file mode 100644 index 0000000..2255b61 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputUser.java @@ -0,0 +1,14 @@ +package de.juplo.kafka.wordcount.splitter; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@Data +@NoArgsConstructor +@AllArgsConstructor(staticName = "of") +public class TestInputUser +{ + String user; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputWord.java b/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputWord.java new file mode 100644 index 0000000..71ed1d9 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputWord.java @@ -0,0 +1,15 @@ +package de.juplo.kafka.wordcount.splitter; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@Data +@NoArgsConstructor +@AllArgsConstructor(staticName = "of") +public class TestInputWord +{ + String user; + String word; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java new file mode 100644 index 0000000..cfc2cae --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java @@ -0,0 +1,15 @@ +package de.juplo.kafka.wordcount.top10; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@Data +@NoArgsConstructor +@AllArgsConstructor(staticName = "of") +public class TestOutputWord +{ + String user; + String word; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java new file mode 100644 index 0000000..1b59387 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java @@ -0,0 +1,16 @@ +package de.juplo.kafka.wordcount.top10; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@Data +@NoArgsConstructor +@AllArgsConstructor(staticName = "of") +public class TestOutputWordCounter +{ + String user; + String word; + long counter; +}