From: Kai Moritz Date: Sat, 15 Jun 2024 21:19:02 +0000 (+0200) Subject: popular: 1.0.0 - Renamed packages and classes -- MOVE X-Git-Tag: popular-1.0.0~2 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=f3d23c150a9130346e850b29bdca4c04ef6e81c6;p=demos%2Fkafka%2Fwordcount popular: 1.0.0 - Renamed packages and classes -- MOVE --- diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java deleted file mode 100644 index e6d3b1f..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java +++ /dev/null @@ -1,14 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; - - -@SpringBootApplication -public class CounterApplication -{ - public static void main(String[] args) - { - SpringApplication.run(CounterApplication.class, args); - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java deleted file mode 100644 index 174521f..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java +++ /dev/null @@ -1,91 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; -import org.apache.kafka.streams.state.Stores; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.support.serializer.JsonDeserializer; -import org.springframework.kafka.support.serializer.JsonSerde; - -import java.util.Properties; -import java.util.concurrent.CompletableFuture; - -import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME; -import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; - - -@Configuration -@EnableConfigurationProperties(CounterApplicationProperties.class) -@Slf4j -public class CounterApplicationConfiguriation -{ - @Bean - public Properties streamProcessorProperties( - CounterApplicationProperties counterProperties) - { - Properties propertyMap = serializationConfig(); - - propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, counterProperties.getApplicationId()); - - propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, counterProperties.getBootstrapServer()); - if (counterProperties.getCommitInterval() != null) - propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, counterProperties.getCommitInterval()); - if (counterProperties.getCacheMaxBytes() != null) - propertyMap.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, counterProperties.getCacheMaxBytes()); - - propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - - return propertyMap; - } - - static Properties serializationConfig() - { - Properties propertyMap = new Properties(); - - propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); - propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); - propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, CounterApplication.class.getPackageName()); - - return propertyMap; - } - - @Bean(initMethod = "start", destroyMethod = "stop") - public CounterStreamProcessor streamProcessor( - CounterApplicationProperties applicationProperties, - Properties streamProcessorProperties, - KeyValueBytesStoreSupplier storeSupplier, - ConfigurableApplicationContext context) - { - CounterStreamProcessor streamProcessor = new CounterStreamProcessor( - applicationProperties.getInputTopic(), - applicationProperties.getOutputTopic(), - streamProcessorProperties, - storeSupplier); - - streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> - { - log.error("Unexpected error!", e); - CompletableFuture.runAsync(() -> - { - log.info("Stopping application..."); - SpringApplication.exit(context, () -> 1); - }); - return SHUTDOWN_CLIENT; - }); - - - return streamProcessor; - } - - @Bean - public KeyValueBytesStoreSupplier storeSupplier() - { - return Stores.persistentKeyValueStore(STORE_NAME); - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java deleted file mode 100644 index c3ada17..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java +++ /dev/null @@ -1,22 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - - -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; -import org.springframework.boot.context.properties.ConfigurationProperties; - - -@ConfigurationProperties("juplo.wordcount.counter") -@Getter -@Setter -@ToString -public class CounterApplicationProperties -{ - private String bootstrapServer = "localhost:9092"; - private String applicationId = "counter"; - private String inputTopic = "words"; - private String outputTopic = "countings"; - private Integer commitInterval; - private Integer cacheMaxBytes; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java deleted file mode 100644 index 2304e55..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java +++ /dev/null @@ -1,130 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.Consumed; -import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.streams.kstream.Produced; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; -import org.apache.kafka.streams.state.QueryableStoreTypes; -import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; -import org.springframework.kafka.support.serializer.JsonSerde; -import org.springframework.kafka.support.serializer.JsonSerializer; - -import java.util.Map; -import java.util.Properties; -import java.util.stream.Collectors; - - -@Slf4j -public class CounterStreamProcessor -{ - public static final String STORE_NAME = "counter"; - - - public final KafkaStreams streams; - - - public CounterStreamProcessor( - String inputTopic, - String outputTopic, - Properties properties, - KeyValueBytesStoreSupplier storeSupplier) - { - Topology topology = CounterStreamProcessor.buildTopology( - inputTopic, - outputTopic, - storeSupplier); - - streams = new KafkaStreams(topology, properties); - } - - static Topology buildTopology( - String inputTopic, - String outputTopic, - KeyValueBytesStoreSupplier storeSupplier) - { - StreamsBuilder builder = new StreamsBuilder(); - - builder - .stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde())) - .map((key, word) -> new KeyValue<>(word, word)) - .groupByKey() - .count( - Materialized - .as(storeSupplier) - .withKeySerde(new JsonSerde<>(Word.class))) // No headers are present: fixed typing is needed! - .toStream() - .map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter))) - .to(outputTopic, Produced.with(outKeySerde(), outValueSerde())); - - Topology topology = builder.build(); - log.info("\n\n{}", topology.describe()); - - return topology; - } - - ReadOnlyKeyValueStore getStore() - { - return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore())); - } - - public void start() - { - log.info("Starting Stream-Processor"); - streams.start(); - } - - public void stop() - { - log.info("Stopping Stream-Processor"); - streams.close(); - } - - - - public static JsonSerde inKeySerde() - { - return new JsonSerde<>(User.class); - } - - public static JsonSerde inValueSerde() - { - return new JsonSerde<>(Word.class); - } - - public static JsonSerde outKeySerde() - { - return serde(true); - } - - public static JsonSerde outValueSerde() - { - return serde(false); - } - - public static JsonSerde serde(boolean isKey) - { - JsonSerde serde = new JsonSerde<>(); - serde.configure( - Map.of(JsonSerializer.TYPE_MAPPINGS, typeMappingsConfig()), - isKey); - return serde; - } - - private static String typeMappingsConfig() - { - return typeMappingsConfig(Word.class, WordCounter.class); - } - - public static String typeMappingsConfig(Class wordClass, Class wordCounterClass) - { - return Map.of( - "word", wordClass, - "counter", wordCounterClass) - .entrySet() - .stream() - .map(entry -> entry.getKey() + ":" + entry.getValue().getName()) - .collect(Collectors.joining(",")); - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/User.java b/src/main/java/de/juplo/kafka/wordcount/counter/User.java deleted file mode 100644 index e38bcba..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/User.java +++ /dev/null @@ -1,12 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import lombok.Data; - - -@Data -@JsonIgnoreProperties(ignoreUnknown = true) -public class User -{ - String user; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/Word.java b/src/main/java/de/juplo/kafka/wordcount/counter/Word.java deleted file mode 100644 index 77287d5..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/Word.java +++ /dev/null @@ -1,13 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import lombok.Data; - - -@Data -@JsonIgnoreProperties(ignoreUnknown = true) -public class Word -{ - private String user; - private String word; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java b/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java deleted file mode 100644 index f1fce71..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java +++ /dev/null @@ -1,22 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import lombok.AccessLevel; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - - -@Data -@NoArgsConstructor -@AllArgsConstructor(access = AccessLevel.PRIVATE) -public class WordCounter -{ - String user; - String word; - long counter; - - public static WordCounter of(Word word, long counter) - { - return new WordCounter(word.getUser(), word.getWord(), counter); - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplication.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplication.java new file mode 100644 index 0000000..e6d3b1f --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplication.java @@ -0,0 +1,14 @@ +package de.juplo.kafka.wordcount.counter; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + + +@SpringBootApplication +public class CounterApplication +{ + public static void main(String[] args) + { + SpringApplication.run(CounterApplication.class, args); + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguriation.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguriation.java new file mode 100644 index 0000000..174521f --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguriation.java @@ -0,0 +1,91 @@ +package de.juplo.kafka.wordcount.counter; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerde; + +import java.util.Properties; +import java.util.concurrent.CompletableFuture; + +import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME; +import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; + + +@Configuration +@EnableConfigurationProperties(CounterApplicationProperties.class) +@Slf4j +public class CounterApplicationConfiguriation +{ + @Bean + public Properties streamProcessorProperties( + CounterApplicationProperties counterProperties) + { + Properties propertyMap = serializationConfig(); + + propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, counterProperties.getApplicationId()); + + propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, counterProperties.getBootstrapServer()); + if (counterProperties.getCommitInterval() != null) + propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, counterProperties.getCommitInterval()); + if (counterProperties.getCacheMaxBytes() != null) + propertyMap.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, counterProperties.getCacheMaxBytes()); + + propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + return propertyMap; + } + + static Properties serializationConfig() + { + Properties propertyMap = new Properties(); + + propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, CounterApplication.class.getPackageName()); + + return propertyMap; + } + + @Bean(initMethod = "start", destroyMethod = "stop") + public CounterStreamProcessor streamProcessor( + CounterApplicationProperties applicationProperties, + Properties streamProcessorProperties, + KeyValueBytesStoreSupplier storeSupplier, + ConfigurableApplicationContext context) + { + CounterStreamProcessor streamProcessor = new CounterStreamProcessor( + applicationProperties.getInputTopic(), + applicationProperties.getOutputTopic(), + streamProcessorProperties, + storeSupplier); + + streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> + { + log.error("Unexpected error!", e); + CompletableFuture.runAsync(() -> + { + log.info("Stopping application..."); + SpringApplication.exit(context, () -> 1); + }); + return SHUTDOWN_CLIENT; + }); + + + return streamProcessor; + } + + @Bean + public KeyValueBytesStoreSupplier storeSupplier() + { + return Stores.persistentKeyValueStore(STORE_NAME); + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationProperties.java new file mode 100644 index 0000000..c3ada17 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationProperties.java @@ -0,0 +1,22 @@ +package de.juplo.kafka.wordcount.counter; + + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import org.springframework.boot.context.properties.ConfigurationProperties; + + +@ConfigurationProperties("juplo.wordcount.counter") +@Getter +@Setter +@ToString +public class CounterApplicationProperties +{ + private String bootstrapServer = "localhost:9092"; + private String applicationId = "counter"; + private String inputTopic = "words"; + private String outputTopic = "countings"; + private Integer commitInterval; + private Integer cacheMaxBytes; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java new file mode 100644 index 0000000..2304e55 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java @@ -0,0 +1,130 @@ +package de.juplo.kafka.wordcount.counter; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.streams.*; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.springframework.kafka.support.serializer.JsonSerde; +import org.springframework.kafka.support.serializer.JsonSerializer; + +import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; + + +@Slf4j +public class CounterStreamProcessor +{ + public static final String STORE_NAME = "counter"; + + + public final KafkaStreams streams; + + + public CounterStreamProcessor( + String inputTopic, + String outputTopic, + Properties properties, + KeyValueBytesStoreSupplier storeSupplier) + { + Topology topology = CounterStreamProcessor.buildTopology( + inputTopic, + outputTopic, + storeSupplier); + + streams = new KafkaStreams(topology, properties); + } + + static Topology buildTopology( + String inputTopic, + String outputTopic, + KeyValueBytesStoreSupplier storeSupplier) + { + StreamsBuilder builder = new StreamsBuilder(); + + builder + .stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde())) + .map((key, word) -> new KeyValue<>(word, word)) + .groupByKey() + .count( + Materialized + .as(storeSupplier) + .withKeySerde(new JsonSerde<>(Word.class))) // No headers are present: fixed typing is needed! + .toStream() + .map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter))) + .to(outputTopic, Produced.with(outKeySerde(), outValueSerde())); + + Topology topology = builder.build(); + log.info("\n\n{}", topology.describe()); + + return topology; + } + + ReadOnlyKeyValueStore getStore() + { + return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore())); + } + + public void start() + { + log.info("Starting Stream-Processor"); + streams.start(); + } + + public void stop() + { + log.info("Stopping Stream-Processor"); + streams.close(); + } + + + + public static JsonSerde inKeySerde() + { + return new JsonSerde<>(User.class); + } + + public static JsonSerde inValueSerde() + { + return new JsonSerde<>(Word.class); + } + + public static JsonSerde outKeySerde() + { + return serde(true); + } + + public static JsonSerde outValueSerde() + { + return serde(false); + } + + public static JsonSerde serde(boolean isKey) + { + JsonSerde serde = new JsonSerde<>(); + serde.configure( + Map.of(JsonSerializer.TYPE_MAPPINGS, typeMappingsConfig()), + isKey); + return serde; + } + + private static String typeMappingsConfig() + { + return typeMappingsConfig(Word.class, WordCounter.class); + } + + public static String typeMappingsConfig(Class wordClass, Class wordCounterClass) + { + return Map.of( + "word", wordClass, + "counter", wordCounterClass) + .entrySet() + .stream() + .map(entry -> entry.getKey() + ":" + entry.getValue().getName()) + .collect(Collectors.joining(",")); + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/User.java b/src/main/java/de/juplo/kafka/wordcount/popular/User.java new file mode 100644 index 0000000..e38bcba --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/popular/User.java @@ -0,0 +1,12 @@ +package de.juplo.kafka.wordcount.counter; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class User +{ + String user; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/Word.java b/src/main/java/de/juplo/kafka/wordcount/popular/Word.java new file mode 100644 index 0000000..77287d5 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/popular/Word.java @@ -0,0 +1,13 @@ +package de.juplo.kafka.wordcount.counter; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class Word +{ + private String user; + private String word; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/WordCounter.java b/src/main/java/de/juplo/kafka/wordcount/popular/WordCounter.java new file mode 100644 index 0000000..f1fce71 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/popular/WordCounter.java @@ -0,0 +1,22 @@ +package de.juplo.kafka.wordcount.counter; + +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@Data +@NoArgsConstructor +@AllArgsConstructor(access = AccessLevel.PRIVATE) +public class WordCounter +{ + String user; + String word; + long counter; + + public static WordCounter of(Word word, long counter) + { + return new WordCounter(word.getUser(), word.getWord(), counter); + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java deleted file mode 100644 index 0faa2de..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java +++ /dev/null @@ -1,165 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import de.juplo.kafka.wordcount.splitter.TestInputUser; -import de.juplo.kafka.wordcount.splitter.TestInputWord; -import de.juplo.kafka.wordcount.top10.TestOutputWord; -import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; -import org.apache.kafka.streams.state.Stores; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.support.KafkaHeaders; -import org.springframework.kafka.support.SendResult; -import org.springframework.kafka.test.context.EmbeddedKafka; -import org.springframework.messaging.handler.annotation.Header; -import org.springframework.messaging.handler.annotation.Payload; -import org.springframework.util.LinkedMultiValueMap; -import org.springframework.util.MultiValueMap; - -import java.time.Duration; - -import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN; -import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT; -import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME; -import static org.awaitility.Awaitility.await; - - -@SpringBootTest( - properties = { - "spring.main.allow-bean-definition-overriding=true", - "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer", - "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", - "spring.kafka.producer.properties.spring.json.add.type.headers=false", - "spring.kafka.consumer.auto-offset-reset=earliest", - "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", - "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", - "spring.kafka.consumer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.top10.TestOutputWord,counter:de.juplo.kafka.wordcount.top10.TestOutputWordCounter", - "logging.level.root=WARN", - "logging.level.de.juplo=DEBUG", - "juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}", - "juplo.wordcount.counter.commit-interval=100", - "juplo.wordcount.counter.cache-max-bytes=0", - "juplo.wordcount.counter.input-topic=" + TOPIC_IN, - "juplo.wordcount.counter.output-topic=" + TOPIC_OUT }) -@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }) -@Slf4j -public class CounterApplicationIT -{ - public static final String TOPIC_IN = "in"; - public static final String TOPIC_OUT = "out"; - - @Autowired - Consumer consumer; - @Autowired - CounterStreamProcessor streamProcessor; - - - @BeforeAll - public static void testSendMessage( - @Autowired KafkaTemplate kafkaTemplate) - { - TestData - .getInputMessages() - .forEach(kv -> - { - try - { - SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); - log.info( - "Sent: {}={}, partition={}, offset={}", - result.getProducerRecord().key(), - result.getProducerRecord().value(), - result.getRecordMetadata().partition(), - result.getRecordMetadata().offset()); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - }); - } - - @DisplayName("Await the expected number of messages") - @Test - public void testAwaitExpectedNumberOfMessagesForUsers() - { - await("Expected number of messages") - .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> consumer.enforceAssertion( - receivedMessages -> TestData.assertExpectedNumberOfMessagesForWord(receivedMessages))); - } - - @DisplayName("Await the expected output messages") - @Test - void testSendMessage() - { - await("Expected messages") - .atMost(Duration.ofSeconds(10)) - .untilAsserted(() -> consumer.enforceAssertion( - receivedMessages -> TestData.assertExpectedMessages(receivedMessages))); - } - - @DisplayName("Await the expected final output messages") - @Test - public void testAwaitExpectedLastMessagesForUsers() - { - await("Expected final output messages") - .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> consumer.enforceAssertion( - receivedMessages -> TestData.assertExpectedLastMessagesForWord(receivedMessages))); - } - - @DisplayName("Await the expected state in the state-store") - @Test - public void testAwaitExpectedState() - { - await("Expected state") - .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore())); - } - - - static class Consumer - { - private final MultiValueMap received = new LinkedMultiValueMap<>(); - - @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) - public synchronized void receive( - @Header(KafkaHeaders.RECEIVED_KEY) TestOutputWord word, - @Payload TestOutputWordCounter counter) - { - log.debug("Received message: {} -> {}", word, counter); - received.add(word, counter); - } - - synchronized void enforceAssertion( - java.util.function.Consumer> assertion) - { - assertion.accept(received); - } - } - - @TestConfiguration - static class Configuration - { - @Bean - Consumer consumer() - { - return new Consumer(); - } - - @Bean - KeyValueBytesStoreSupplier storeSupplier() - { - return Stores.inMemoryKeyValueStore(STORE_NAME); - } - } -} diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java deleted file mode 100644 index e80e383..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java +++ /dev/null @@ -1,130 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import de.juplo.kafka.wordcount.splitter.TestInputUser; -import de.juplo.kafka.wordcount.splitter.TestInputWord; -import de.juplo.kafka.wordcount.top10.TestOutputWord; -import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.streams.TestInputTopic; -import org.apache.kafka.streams.TestOutputTopic; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.TopologyTestDriver; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.Stores; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.springframework.kafka.support.serializer.JsonDeserializer; -import org.springframework.kafka.support.serializer.JsonSerializer; -import org.springframework.util.LinkedMultiValueMap; -import org.springframework.util.MultiValueMap; - -import java.util.Map; - -import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig; -import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME; - - -@Slf4j -public class CounterStreamProcessorTopologyTest -{ - public static final String IN = "TEST-IN"; - public static final String OUT = "TEST-OUT"; - - - static TopologyTestDriver testDriver; - static MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); - - - @BeforeAll - public static void setUpTestDriver() - { - Topology topology = CounterStreamProcessor.buildTopology( - IN, - OUT, - Stores.inMemoryKeyValueStore(STORE_NAME)); - - testDriver = new TopologyTestDriver(topology, serializationConfig()); - - TestInputTopic in = - testDriver.createInputTopic(IN, serializer(), serializer()); - TestOutputTopic out = - testDriver.createOutputTopic(OUT, keyDeserializer(), valueDeserializer()); - - TestData - .getInputMessages() - .forEach(kv -> in.pipeInput(kv.key, kv.value)); - - receivedMessages = new LinkedMultiValueMap<>(); - out - .readRecordsToList() - .forEach(record -> receivedMessages.add(record.key(), record.value())); - } - - - @DisplayName("Assert the expected output messages") - @Test - public void testExpectedMessages() - { - TestData.assertExpectedMessages(receivedMessages); - } - - @DisplayName("Assert the expected number of messages") - @Test - public void testExpectedNumberOfMessagesForWord() - { - TestData.assertExpectedNumberOfMessagesForWord(receivedMessages); - } - - @DisplayName("Await the expected final output messages") - @Test - public void testExpectedLastMessagesForWord() - { - TestData.assertExpectedLastMessagesForWord(receivedMessages); - } - - @DisplayName("Assert the expected state in the state-store") - @Test - public void testExpectedState() - { - KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); - TestData.assertExpectedState(store); - } - - @AfterAll - public static void tearDown() - { - testDriver.close(); - } - - - private static JsonSerializer serializer() - { - return new JsonSerializer().noTypeInfo(); - } - - private static JsonDeserializer keyDeserializer() - { - return deserializer(true); - } - - private static JsonDeserializer valueDeserializer() - { - return deserializer(false); - } - - private static JsonDeserializer deserializer(boolean isKey) - { - JsonDeserializer deserializer = new JsonDeserializer<>(); - deserializer.configure( - Map.of(JsonDeserializer.TYPE_MAPPINGS, typeMappingsConfig()), - isKey); - return deserializer; - } - - private static String typeMappingsConfig() - { - return CounterStreamProcessor.typeMappingsConfig(TestOutputWord.class, TestOutputWordCounter.class); - } -} diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java deleted file mode 100644 index 862eb2b..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java +++ /dev/null @@ -1,206 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import de.juplo.kafka.wordcount.splitter.TestInputUser; -import de.juplo.kafka.wordcount.splitter.TestInputWord; -import de.juplo.kafka.wordcount.top10.TestOutputWord; -import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; -import org.springframework.util.LinkedMultiValueMap; -import org.springframework.util.MultiValueMap; - -import java.util.stream.Stream; - -import static org.assertj.core.api.Assertions.assertThat; - - -class TestData -{ - static final String PETER = "peter"; - static final String KLAUS = "klaus"; - - static final String WORD_HALLO = "Hallo"; - static final String WORD_MÜSCH = "Müsch"; - static final String WORD_WELT = "Welt"; - static final String WORD_S = "s"; - static final String WORD_BOÄH = "Boäh"; - - static final TestOutputWord PETER_HALLO = TestOutputWord.of(PETER, WORD_HALLO); - static final TestOutputWord PETER_WELT = TestOutputWord.of(PETER, WORD_WELT); - static final TestOutputWord PETER_BOÄH = TestOutputWord.of(PETER, WORD_BOÄH); - static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(KLAUS, WORD_MÜSCH); - static final TestOutputWord KLAUS_S = TestOutputWord.of(KLAUS, WORD_S); - - private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] - { - KeyValue.pair( - TestInputUser.of(PETER), - TestInputWord.of(PETER, WORD_HALLO)), - KeyValue.pair( - TestInputUser.of(KLAUS), - TestInputWord.of(KLAUS, WORD_MÜSCH)), - KeyValue.pair( - TestInputUser.of(PETER), - TestInputWord.of(PETER, WORD_WELT)), - KeyValue.pair( - TestInputUser.of(KLAUS), - TestInputWord.of(KLAUS, WORD_MÜSCH)), - KeyValue.pair( - TestInputUser.of(KLAUS), - TestInputWord.of(KLAUS, WORD_S)), - KeyValue.pair( - TestInputUser.of(PETER), - TestInputWord.of(PETER, WORD_BOÄH)), - KeyValue.pair( - TestInputUser.of(PETER), - TestInputWord.of(PETER, WORD_WELT)), - KeyValue.pair( - TestInputUser.of(PETER), - TestInputWord.of(PETER, WORD_BOÄH)), - KeyValue.pair( - TestInputUser.of(KLAUS), - TestInputWord.of(KLAUS, WORD_S)), - KeyValue.pair( - TestInputUser.of(PETER), - TestInputWord.of(PETER, WORD_BOÄH)), - KeyValue.pair( - TestInputUser.of(KLAUS), - TestInputWord.of(KLAUS, WORD_S)), - }; - - static Stream> getInputMessages() - { - return Stream.of(TestData.INPUT_MESSAGES); - } - - static void assertExpectedMessages(MultiValueMap receivedMessages) - { - expectedMessages().forEach( - (word, counter) -> - assertThat(receivedMessages.get(word)) - .containsExactlyElementsOf(counter)); - } - - static void assertExpectedNumberOfMessagesForWord(MultiValueMap receivedMessages) - { - assertThat(countMessagesForWord(PETER_HALLO, receivedMessages)); - assertThat(countMessagesForWord(PETER_WELT, receivedMessages)); - assertThat(countMessagesForWord(PETER_BOÄH, receivedMessages)); - assertThat(countMessagesForWord(KLAUS_MÜSCH, receivedMessages)); - assertThat(countMessagesForWord(KLAUS_S, receivedMessages)); - } - - private static int countMessagesForWord(TestOutputWord word, MultiValueMap messagesForUsers) - { - return messagesForUsers.get(word) == null - ? 0 - : messagesForUsers.get(word).size(); - } - - static void assertExpectedState(ReadOnlyKeyValueStore store) - { - assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, store.get(wordOf(PETER_HALLO))); - assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, store.get(wordOf(PETER_WELT))); - assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, store.get(wordOf(PETER_BOÄH))); - assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, store.get(wordOf(KLAUS_MÜSCH))); - assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S))); - } - - private static Word wordOf(TestOutputWord testOutputWord) - { - Word word = new Word(); - - word.setUser(testOutputWord.getUser()); - word.setWord(testOutputWord.getWord()); - - return word; - } - - static void assertExpectedLastMessagesForWord(MultiValueMap receivedMessages) - { - assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages)); - assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, getLastMessageFor(PETER_WELT, receivedMessages)); - assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, getLastMessageFor(PETER_BOÄH, receivedMessages)); - assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, getLastMessageFor(KLAUS_MÜSCH, receivedMessages)); - assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages)); - } - - private static void assertWordCountEqualsWordCountFromLastMessage( - TestOutputWord word, - Long counter) - { - TestOutputWordCounter testOutputWordCounter = TestOutputWordCounter.of( - word.getUser(), - word.getWord(), - counter); - assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter); - } - - private static void assertWordCountEqualsWordCountFromLastMessage( - TestOutputWord word, - TestOutputWordCounter counter) - { - assertThat(counter).isEqualTo(getLastMessageFor(word)); - } - - private static TestOutputWordCounter getLastMessageFor(TestOutputWord word) - { - return getLastMessageFor(word, expectedMessages()); - } - - private static TestOutputWordCounter getLastMessageFor( - TestOutputWord user, - MultiValueMap messagesForWord) - { - return messagesForWord - .get(user) - .stream() - .reduce(null, (left, right) -> right); - } - - private static final KeyValue[] EXPECTED_MESSAGES = new KeyValue[] - { - KeyValue.pair( - PETER_HALLO, - TestOutputWordCounter.of(PETER, WORD_HALLO,1)), - KeyValue.pair( - KLAUS_MÜSCH, - TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,1)), - KeyValue.pair( - PETER_WELT, - TestOutputWordCounter.of(PETER, WORD_WELT,1)), - KeyValue.pair( - KLAUS_MÜSCH, - TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,2)), - KeyValue.pair( - KLAUS_S, - TestOutputWordCounter.of(KLAUS, WORD_S,1)), - KeyValue.pair( - PETER_BOÄH, - TestOutputWordCounter.of(PETER, WORD_BOÄH,1)), - KeyValue.pair( - PETER_WELT, - TestOutputWordCounter.of(PETER, WORD_WELT,2)), - KeyValue.pair( - PETER_BOÄH, - TestOutputWordCounter.of(PETER, WORD_BOÄH,2)), - KeyValue.pair( - KLAUS_S, - TestOutputWordCounter.of(KLAUS, WORD_S,2)), - KeyValue.pair( - PETER_BOÄH, - TestOutputWordCounter.of(PETER, WORD_BOÄH,3)), - KeyValue.pair( - KLAUS_S, - TestOutputWordCounter.of(KLAUS, WORD_S,3)), - }; - - static MultiValueMap expectedMessages() - { - MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); - Stream - .of(EXPECTED_MESSAGES) - .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value)); - return expectedMessages; - } -} diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java new file mode 100644 index 0000000..0faa2de --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java @@ -0,0 +1,165 @@ +package de.juplo.kafka.wordcount.counter; + +import de.juplo.kafka.wordcount.splitter.TestInputUser; +import de.juplo.kafka.wordcount.splitter.TestInputWord; +import de.juplo.kafka.wordcount.top10.TestOutputWord; +import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.SendResult; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import java.time.Duration; + +import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN; +import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT; +import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME; +import static org.awaitility.Awaitility.await; + + +@SpringBootTest( + properties = { + "spring.main.allow-bean-definition-overriding=true", + "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer", + "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", + "spring.kafka.producer.properties.spring.json.add.type.headers=false", + "spring.kafka.consumer.auto-offset-reset=earliest", + "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", + "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", + "spring.kafka.consumer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.top10.TestOutputWord,counter:de.juplo.kafka.wordcount.top10.TestOutputWordCounter", + "logging.level.root=WARN", + "logging.level.de.juplo=DEBUG", + "juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.wordcount.counter.commit-interval=100", + "juplo.wordcount.counter.cache-max-bytes=0", + "juplo.wordcount.counter.input-topic=" + TOPIC_IN, + "juplo.wordcount.counter.output-topic=" + TOPIC_OUT }) +@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }) +@Slf4j +public class CounterApplicationIT +{ + public static final String TOPIC_IN = "in"; + public static final String TOPIC_OUT = "out"; + + @Autowired + Consumer consumer; + @Autowired + CounterStreamProcessor streamProcessor; + + + @BeforeAll + public static void testSendMessage( + @Autowired KafkaTemplate kafkaTemplate) + { + TestData + .getInputMessages() + .forEach(kv -> + { + try + { + SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); + log.info( + "Sent: {}={}, partition={}, offset={}", + result.getProducerRecord().key(), + result.getProducerRecord().value(), + result.getRecordMetadata().partition(), + result.getRecordMetadata().offset()); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + }); + } + + @DisplayName("Await the expected number of messages") + @Test + public void testAwaitExpectedNumberOfMessagesForUsers() + { + await("Expected number of messages") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> consumer.enforceAssertion( + receivedMessages -> TestData.assertExpectedNumberOfMessagesForWord(receivedMessages))); + } + + @DisplayName("Await the expected output messages") + @Test + void testSendMessage() + { + await("Expected messages") + .atMost(Duration.ofSeconds(10)) + .untilAsserted(() -> consumer.enforceAssertion( + receivedMessages -> TestData.assertExpectedMessages(receivedMessages))); + } + + @DisplayName("Await the expected final output messages") + @Test + public void testAwaitExpectedLastMessagesForUsers() + { + await("Expected final output messages") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> consumer.enforceAssertion( + receivedMessages -> TestData.assertExpectedLastMessagesForWord(receivedMessages))); + } + + @DisplayName("Await the expected state in the state-store") + @Test + public void testAwaitExpectedState() + { + await("Expected state") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore())); + } + + + static class Consumer + { + private final MultiValueMap received = new LinkedMultiValueMap<>(); + + @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) + public synchronized void receive( + @Header(KafkaHeaders.RECEIVED_KEY) TestOutputWord word, + @Payload TestOutputWordCounter counter) + { + log.debug("Received message: {} -> {}", word, counter); + received.add(word, counter); + } + + synchronized void enforceAssertion( + java.util.function.Consumer> assertion) + { + assertion.accept(received); + } + } + + @TestConfiguration + static class Configuration + { + @Bean + Consumer consumer() + { + return new Consumer(); + } + + @Bean + KeyValueBytesStoreSupplier storeSupplier() + { + return Stores.inMemoryKeyValueStore(STORE_NAME); + } + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java new file mode 100644 index 0000000..e80e383 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java @@ -0,0 +1,130 @@ +package de.juplo.kafka.wordcount.counter; + +import de.juplo.kafka.wordcount.splitter.TestInputUser; +import de.juplo.kafka.wordcount.splitter.TestInputWord; +import de.juplo.kafka.wordcount.top10.TestOutputWord; +import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import java.util.Map; + +import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig; +import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME; + + +@Slf4j +public class CounterStreamProcessorTopologyTest +{ + public static final String IN = "TEST-IN"; + public static final String OUT = "TEST-OUT"; + + + static TopologyTestDriver testDriver; + static MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); + + + @BeforeAll + public static void setUpTestDriver() + { + Topology topology = CounterStreamProcessor.buildTopology( + IN, + OUT, + Stores.inMemoryKeyValueStore(STORE_NAME)); + + testDriver = new TopologyTestDriver(topology, serializationConfig()); + + TestInputTopic in = + testDriver.createInputTopic(IN, serializer(), serializer()); + TestOutputTopic out = + testDriver.createOutputTopic(OUT, keyDeserializer(), valueDeserializer()); + + TestData + .getInputMessages() + .forEach(kv -> in.pipeInput(kv.key, kv.value)); + + receivedMessages = new LinkedMultiValueMap<>(); + out + .readRecordsToList() + .forEach(record -> receivedMessages.add(record.key(), record.value())); + } + + + @DisplayName("Assert the expected output messages") + @Test + public void testExpectedMessages() + { + TestData.assertExpectedMessages(receivedMessages); + } + + @DisplayName("Assert the expected number of messages") + @Test + public void testExpectedNumberOfMessagesForWord() + { + TestData.assertExpectedNumberOfMessagesForWord(receivedMessages); + } + + @DisplayName("Await the expected final output messages") + @Test + public void testExpectedLastMessagesForWord() + { + TestData.assertExpectedLastMessagesForWord(receivedMessages); + } + + @DisplayName("Assert the expected state in the state-store") + @Test + public void testExpectedState() + { + KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); + TestData.assertExpectedState(store); + } + + @AfterAll + public static void tearDown() + { + testDriver.close(); + } + + + private static JsonSerializer serializer() + { + return new JsonSerializer().noTypeInfo(); + } + + private static JsonDeserializer keyDeserializer() + { + return deserializer(true); + } + + private static JsonDeserializer valueDeserializer() + { + return deserializer(false); + } + + private static JsonDeserializer deserializer(boolean isKey) + { + JsonDeserializer deserializer = new JsonDeserializer<>(); + deserializer.configure( + Map.of(JsonDeserializer.TYPE_MAPPINGS, typeMappingsConfig()), + isKey); + return deserializer; + } + + private static String typeMappingsConfig() + { + return CounterStreamProcessor.typeMappingsConfig(TestOutputWord.class, TestOutputWordCounter.class); + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java b/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java new file mode 100644 index 0000000..862eb2b --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java @@ -0,0 +1,206 @@ +package de.juplo.kafka.wordcount.counter; + +import de.juplo.kafka.wordcount.splitter.TestInputUser; +import de.juplo.kafka.wordcount.splitter.TestInputWord; +import de.juplo.kafka.wordcount.top10.TestOutputWord; +import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + + +class TestData +{ + static final String PETER = "peter"; + static final String KLAUS = "klaus"; + + static final String WORD_HALLO = "Hallo"; + static final String WORD_MÜSCH = "Müsch"; + static final String WORD_WELT = "Welt"; + static final String WORD_S = "s"; + static final String WORD_BOÄH = "Boäh"; + + static final TestOutputWord PETER_HALLO = TestOutputWord.of(PETER, WORD_HALLO); + static final TestOutputWord PETER_WELT = TestOutputWord.of(PETER, WORD_WELT); + static final TestOutputWord PETER_BOÄH = TestOutputWord.of(PETER, WORD_BOÄH); + static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(KLAUS, WORD_MÜSCH); + static final TestOutputWord KLAUS_S = TestOutputWord.of(KLAUS, WORD_S); + + private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] + { + KeyValue.pair( + TestInputUser.of(PETER), + TestInputWord.of(PETER, WORD_HALLO)), + KeyValue.pair( + TestInputUser.of(KLAUS), + TestInputWord.of(KLAUS, WORD_MÜSCH)), + KeyValue.pair( + TestInputUser.of(PETER), + TestInputWord.of(PETER, WORD_WELT)), + KeyValue.pair( + TestInputUser.of(KLAUS), + TestInputWord.of(KLAUS, WORD_MÜSCH)), + KeyValue.pair( + TestInputUser.of(KLAUS), + TestInputWord.of(KLAUS, WORD_S)), + KeyValue.pair( + TestInputUser.of(PETER), + TestInputWord.of(PETER, WORD_BOÄH)), + KeyValue.pair( + TestInputUser.of(PETER), + TestInputWord.of(PETER, WORD_WELT)), + KeyValue.pair( + TestInputUser.of(PETER), + TestInputWord.of(PETER, WORD_BOÄH)), + KeyValue.pair( + TestInputUser.of(KLAUS), + TestInputWord.of(KLAUS, WORD_S)), + KeyValue.pair( + TestInputUser.of(PETER), + TestInputWord.of(PETER, WORD_BOÄH)), + KeyValue.pair( + TestInputUser.of(KLAUS), + TestInputWord.of(KLAUS, WORD_S)), + }; + + static Stream> getInputMessages() + { + return Stream.of(TestData.INPUT_MESSAGES); + } + + static void assertExpectedMessages(MultiValueMap receivedMessages) + { + expectedMessages().forEach( + (word, counter) -> + assertThat(receivedMessages.get(word)) + .containsExactlyElementsOf(counter)); + } + + static void assertExpectedNumberOfMessagesForWord(MultiValueMap receivedMessages) + { + assertThat(countMessagesForWord(PETER_HALLO, receivedMessages)); + assertThat(countMessagesForWord(PETER_WELT, receivedMessages)); + assertThat(countMessagesForWord(PETER_BOÄH, receivedMessages)); + assertThat(countMessagesForWord(KLAUS_MÜSCH, receivedMessages)); + assertThat(countMessagesForWord(KLAUS_S, receivedMessages)); + } + + private static int countMessagesForWord(TestOutputWord word, MultiValueMap messagesForUsers) + { + return messagesForUsers.get(word) == null + ? 0 + : messagesForUsers.get(word).size(); + } + + static void assertExpectedState(ReadOnlyKeyValueStore store) + { + assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, store.get(wordOf(PETER_HALLO))); + assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, store.get(wordOf(PETER_WELT))); + assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, store.get(wordOf(PETER_BOÄH))); + assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, store.get(wordOf(KLAUS_MÜSCH))); + assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S))); + } + + private static Word wordOf(TestOutputWord testOutputWord) + { + Word word = new Word(); + + word.setUser(testOutputWord.getUser()); + word.setWord(testOutputWord.getWord()); + + return word; + } + + static void assertExpectedLastMessagesForWord(MultiValueMap receivedMessages) + { + assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages)); + assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, getLastMessageFor(PETER_WELT, receivedMessages)); + assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, getLastMessageFor(PETER_BOÄH, receivedMessages)); + assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, getLastMessageFor(KLAUS_MÜSCH, receivedMessages)); + assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages)); + } + + private static void assertWordCountEqualsWordCountFromLastMessage( + TestOutputWord word, + Long counter) + { + TestOutputWordCounter testOutputWordCounter = TestOutputWordCounter.of( + word.getUser(), + word.getWord(), + counter); + assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter); + } + + private static void assertWordCountEqualsWordCountFromLastMessage( + TestOutputWord word, + TestOutputWordCounter counter) + { + assertThat(counter).isEqualTo(getLastMessageFor(word)); + } + + private static TestOutputWordCounter getLastMessageFor(TestOutputWord word) + { + return getLastMessageFor(word, expectedMessages()); + } + + private static TestOutputWordCounter getLastMessageFor( + TestOutputWord user, + MultiValueMap messagesForWord) + { + return messagesForWord + .get(user) + .stream() + .reduce(null, (left, right) -> right); + } + + private static final KeyValue[] EXPECTED_MESSAGES = new KeyValue[] + { + KeyValue.pair( + PETER_HALLO, + TestOutputWordCounter.of(PETER, WORD_HALLO,1)), + KeyValue.pair( + KLAUS_MÜSCH, + TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,1)), + KeyValue.pair( + PETER_WELT, + TestOutputWordCounter.of(PETER, WORD_WELT,1)), + KeyValue.pair( + KLAUS_MÜSCH, + TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,2)), + KeyValue.pair( + KLAUS_S, + TestOutputWordCounter.of(KLAUS, WORD_S,1)), + KeyValue.pair( + PETER_BOÄH, + TestOutputWordCounter.of(PETER, WORD_BOÄH,1)), + KeyValue.pair( + PETER_WELT, + TestOutputWordCounter.of(PETER, WORD_WELT,2)), + KeyValue.pair( + PETER_BOÄH, + TestOutputWordCounter.of(PETER, WORD_BOÄH,2)), + KeyValue.pair( + KLAUS_S, + TestOutputWordCounter.of(KLAUS, WORD_S,2)), + KeyValue.pair( + PETER_BOÄH, + TestOutputWordCounter.of(PETER, WORD_BOÄH,3)), + KeyValue.pair( + KLAUS_S, + TestOutputWordCounter.of(KLAUS, WORD_S,3)), + }; + + static MultiValueMap expectedMessages() + { + MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); + Stream + .of(EXPECTED_MESSAGES) + .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value)); + return expectedMessages; + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/InputUser.java b/src/test/java/de/juplo/kafka/wordcount/splitter/InputUser.java new file mode 100644 index 0000000..2255b61 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/InputUser.java @@ -0,0 +1,14 @@ +package de.juplo.kafka.wordcount.splitter; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@Data +@NoArgsConstructor +@AllArgsConstructor(staticName = "of") +public class TestInputUser +{ + String user; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/InputWord.java b/src/test/java/de/juplo/kafka/wordcount/splitter/InputWord.java new file mode 100644 index 0000000..71ed1d9 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/InputWord.java @@ -0,0 +1,15 @@ +package de.juplo.kafka.wordcount.splitter; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@Data +@NoArgsConstructor +@AllArgsConstructor(staticName = "of") +public class TestInputWord +{ + String user; + String word; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputUser.java b/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputUser.java deleted file mode 100644 index 2255b61..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputUser.java +++ /dev/null @@ -1,14 +0,0 @@ -package de.juplo.kafka.wordcount.splitter; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - - -@Data -@NoArgsConstructor -@AllArgsConstructor(staticName = "of") -public class TestInputUser -{ - String user; -} diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputWord.java b/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputWord.java deleted file mode 100644 index 71ed1d9..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputWord.java +++ /dev/null @@ -1,15 +0,0 @@ -package de.juplo.kafka.wordcount.splitter; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - - -@Data -@NoArgsConstructor -@AllArgsConstructor(staticName = "of") -public class TestInputWord -{ - String user; - String word; -} diff --git a/src/test/java/de/juplo/kafka/wordcount/stats/OutputWindowedWord.java b/src/test/java/de/juplo/kafka/wordcount/stats/OutputWindowedWord.java new file mode 100644 index 0000000..cfc2cae --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/stats/OutputWindowedWord.java @@ -0,0 +1,15 @@ +package de.juplo.kafka.wordcount.top10; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@Data +@NoArgsConstructor +@AllArgsConstructor(staticName = "of") +public class TestOutputWord +{ + String user; + String word; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/stats/OutputWordCounter.java b/src/test/java/de/juplo/kafka/wordcount/stats/OutputWordCounter.java new file mode 100644 index 0000000..1b59387 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/stats/OutputWordCounter.java @@ -0,0 +1,16 @@ +package de.juplo.kafka.wordcount.top10; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@Data +@NoArgsConstructor +@AllArgsConstructor(staticName = "of") +public class TestOutputWordCounter +{ + String user; + String word; + long counter; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java deleted file mode 100644 index cfc2cae..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java +++ /dev/null @@ -1,15 +0,0 @@ -package de.juplo.kafka.wordcount.top10; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - - -@Data -@NoArgsConstructor -@AllArgsConstructor(staticName = "of") -public class TestOutputWord -{ - String user; - String word; -} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java deleted file mode 100644 index 1b59387..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java +++ /dev/null @@ -1,16 +0,0 @@ -package de.juplo.kafka.wordcount.top10; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - - -@Data -@NoArgsConstructor -@AllArgsConstructor(staticName = "of") -public class TestOutputWordCounter -{ - String user; - String word; - long counter; -}