From: Kai Moritz Date: Sat, 15 Jun 2024 21:19:02 +0000 (+0200) Subject: popular: 1.0.0 - Renamed packages and classes -- MOVE X-Git-Tag: popular-on-counter~2 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=220bc2a369a7edbafd2473194b2ce4ed2b6a0b69;p=demos%2Fkafka%2Fwordcount popular: 1.0.0 - Renamed packages and classes -- MOVE --- diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java deleted file mode 100644 index e6d3b1f..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java +++ /dev/null @@ -1,14 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; - - -@SpringBootApplication -public class CounterApplication -{ - public static void main(String[] args) - { - SpringApplication.run(CounterApplication.class, args); - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java deleted file mode 100644 index 484b8de..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java +++ /dev/null @@ -1,97 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; -import org.apache.kafka.streams.state.Stores; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.support.serializer.JsonDeserializer; -import org.springframework.kafka.support.serializer.JsonSerde; - -import java.util.Properties; -import java.util.concurrent.CompletableFuture; - -import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME; -import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; - - -@Configuration -@EnableConfigurationProperties(CounterApplicationProperties.class) -@Slf4j -public class CounterApplicationConfiguriation -{ - @Bean - public Properties streamProcessorProperties( - CounterApplicationProperties counterProperties) - { - Properties propertyMap = serializationConfig(); - - propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, counterProperties.getApplicationId()); - - propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, counterProperties.getBootstrapServer()); - if (counterProperties.getCommitInterval() != null) - propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, counterProperties.getCommitInterval()); - if (counterProperties.getCacheMaxBytes() != null) - propertyMap.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, counterProperties.getCacheMaxBytes()); - - propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - - return propertyMap; - } - - static Properties serializationConfig() - { - Properties propertyMap = new Properties(); - - propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); - propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); - propertyMap.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName()); - propertyMap.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Word.class.getName()); - propertyMap.put( - JsonDeserializer.TYPE_MAPPINGS, - "user:" + User.class.getName() + "," + - "word:" + Word.class.getName() + "," + - "counter:" + WordCounter.class.getName()); - - return propertyMap; - } - - @Bean(initMethod = "start", destroyMethod = "stop") - public CounterStreamProcessor streamProcessor( - CounterApplicationProperties applicationProperties, - Properties streamProcessorProperties, - KeyValueBytesStoreSupplier storeSupplier, - ConfigurableApplicationContext context) - { - CounterStreamProcessor streamProcessor = new CounterStreamProcessor( - applicationProperties.getInputTopic(), - applicationProperties.getOutputTopic(), - streamProcessorProperties, - storeSupplier); - - streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> - { - log.error("Unexpected error!", e); - CompletableFuture.runAsync(() -> - { - log.info("Stopping application..."); - SpringApplication.exit(context, () -> 1); - }); - return SHUTDOWN_CLIENT; - }); - - - return streamProcessor; - } - - @Bean - public KeyValueBytesStoreSupplier storeSupplier() - { - return Stores.persistentKeyValueStore(STORE_NAME); - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java deleted file mode 100644 index c3ada17..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java +++ /dev/null @@ -1,22 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - - -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; -import org.springframework.boot.context.properties.ConfigurationProperties; - - -@ConfigurationProperties("juplo.wordcount.counter") -@Getter -@Setter -@ToString -public class CounterApplicationProperties -{ - private String bootstrapServer = "localhost:9092"; - private String applicationId = "counter"; - private String inputTopic = "words"; - private String outputTopic = "countings"; - private Integer commitInterval; - private Integer cacheMaxBytes; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java deleted file mode 100644 index 64bd619..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java +++ /dev/null @@ -1,80 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; -import org.apache.kafka.streams.state.QueryableStoreTypes; -import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; -import org.springframework.kafka.support.serializer.JsonSerde; - -import java.util.Properties; - - -@Slf4j -public class CounterStreamProcessor -{ - public static final String STORE_NAME = "counter"; - - - public final KafkaStreams streams; - - - public CounterStreamProcessor( - String inputTopic, - String outputTopic, - Properties properties, - KeyValueBytesStoreSupplier storeSupplier) - { - Topology topology = CounterStreamProcessor.buildTopology( - inputTopic, - outputTopic, - storeSupplier); - - streams = new KafkaStreams(topology, properties); - } - - static Topology buildTopology( - String inputTopic, - String outputTopic, - KeyValueBytesStoreSupplier storeSupplier) - { - StreamsBuilder builder = new StreamsBuilder(); - - KStream source = builder.stream(inputTopic); - - source - .map((key, word) -> new KeyValue<>(word, word)) - .groupByKey() - .count( - Materialized - .as(storeSupplier) - .withKeySerde(new JsonSerde<>().copyWithType(Word.class).forKeys())) - .toStream() - .map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter))) - .to(outputTopic); - - Topology topology = builder.build(); - log.info("\n\n{}", topology.describe()); - - return topology; - } - - ReadOnlyKeyValueStore getStore() - { - return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore())); - } - - public void start() - { - log.info("Starting Stream-Processor"); - streams.start(); - } - - public void stop() - { - log.info("Stopping Stream-Processor"); - streams.close(); - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/User.java b/src/main/java/de/juplo/kafka/wordcount/counter/User.java deleted file mode 100644 index e38bcba..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/User.java +++ /dev/null @@ -1,12 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import lombok.Data; - - -@Data -@JsonIgnoreProperties(ignoreUnknown = true) -public class User -{ - String user; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/Word.java b/src/main/java/de/juplo/kafka/wordcount/counter/Word.java deleted file mode 100644 index 77287d5..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/Word.java +++ /dev/null @@ -1,13 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import lombok.Data; - - -@Data -@JsonIgnoreProperties(ignoreUnknown = true) -public class Word -{ - private String user; - private String word; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java b/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java deleted file mode 100644 index f1fce71..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java +++ /dev/null @@ -1,22 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import lombok.AccessLevel; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - - -@Data -@NoArgsConstructor -@AllArgsConstructor(access = AccessLevel.PRIVATE) -public class WordCounter -{ - String user; - String word; - long counter; - - public static WordCounter of(Word word, long counter) - { - return new WordCounter(word.getUser(), word.getWord(), counter); - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplication.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplication.java new file mode 100644 index 0000000..e6d3b1f --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplication.java @@ -0,0 +1,14 @@ +package de.juplo.kafka.wordcount.counter; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + + +@SpringBootApplication +public class CounterApplication +{ + public static void main(String[] args) + { + SpringApplication.run(CounterApplication.class, args); + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguriation.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguriation.java new file mode 100644 index 0000000..484b8de --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguriation.java @@ -0,0 +1,97 @@ +package de.juplo.kafka.wordcount.counter; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerde; + +import java.util.Properties; +import java.util.concurrent.CompletableFuture; + +import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME; +import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; + + +@Configuration +@EnableConfigurationProperties(CounterApplicationProperties.class) +@Slf4j +public class CounterApplicationConfiguriation +{ + @Bean + public Properties streamProcessorProperties( + CounterApplicationProperties counterProperties) + { + Properties propertyMap = serializationConfig(); + + propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, counterProperties.getApplicationId()); + + propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, counterProperties.getBootstrapServer()); + if (counterProperties.getCommitInterval() != null) + propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, counterProperties.getCommitInterval()); + if (counterProperties.getCacheMaxBytes() != null) + propertyMap.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, counterProperties.getCacheMaxBytes()); + + propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + return propertyMap; + } + + static Properties serializationConfig() + { + Properties propertyMap = new Properties(); + + propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + propertyMap.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName()); + propertyMap.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Word.class.getName()); + propertyMap.put( + JsonDeserializer.TYPE_MAPPINGS, + "user:" + User.class.getName() + "," + + "word:" + Word.class.getName() + "," + + "counter:" + WordCounter.class.getName()); + + return propertyMap; + } + + @Bean(initMethod = "start", destroyMethod = "stop") + public CounterStreamProcessor streamProcessor( + CounterApplicationProperties applicationProperties, + Properties streamProcessorProperties, + KeyValueBytesStoreSupplier storeSupplier, + ConfigurableApplicationContext context) + { + CounterStreamProcessor streamProcessor = new CounterStreamProcessor( + applicationProperties.getInputTopic(), + applicationProperties.getOutputTopic(), + streamProcessorProperties, + storeSupplier); + + streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> + { + log.error("Unexpected error!", e); + CompletableFuture.runAsync(() -> + { + log.info("Stopping application..."); + SpringApplication.exit(context, () -> 1); + }); + return SHUTDOWN_CLIENT; + }); + + + return streamProcessor; + } + + @Bean + public KeyValueBytesStoreSupplier storeSupplier() + { + return Stores.persistentKeyValueStore(STORE_NAME); + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationProperties.java new file mode 100644 index 0000000..c3ada17 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationProperties.java @@ -0,0 +1,22 @@ +package de.juplo.kafka.wordcount.counter; + + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import org.springframework.boot.context.properties.ConfigurationProperties; + + +@ConfigurationProperties("juplo.wordcount.counter") +@Getter +@Setter +@ToString +public class CounterApplicationProperties +{ + private String bootstrapServer = "localhost:9092"; + private String applicationId = "counter"; + private String inputTopic = "words"; + private String outputTopic = "countings"; + private Integer commitInterval; + private Integer cacheMaxBytes; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java new file mode 100644 index 0000000..64bd619 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java @@ -0,0 +1,80 @@ +package de.juplo.kafka.wordcount.counter; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.streams.*; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.QueryableStoreTypes; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.springframework.kafka.support.serializer.JsonSerde; + +import java.util.Properties; + + +@Slf4j +public class CounterStreamProcessor +{ + public static final String STORE_NAME = "counter"; + + + public final KafkaStreams streams; + + + public CounterStreamProcessor( + String inputTopic, + String outputTopic, + Properties properties, + KeyValueBytesStoreSupplier storeSupplier) + { + Topology topology = CounterStreamProcessor.buildTopology( + inputTopic, + outputTopic, + storeSupplier); + + streams = new KafkaStreams(topology, properties); + } + + static Topology buildTopology( + String inputTopic, + String outputTopic, + KeyValueBytesStoreSupplier storeSupplier) + { + StreamsBuilder builder = new StreamsBuilder(); + + KStream source = builder.stream(inputTopic); + + source + .map((key, word) -> new KeyValue<>(word, word)) + .groupByKey() + .count( + Materialized + .as(storeSupplier) + .withKeySerde(new JsonSerde<>().copyWithType(Word.class).forKeys())) + .toStream() + .map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter))) + .to(outputTopic); + + Topology topology = builder.build(); + log.info("\n\n{}", topology.describe()); + + return topology; + } + + ReadOnlyKeyValueStore getStore() + { + return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore())); + } + + public void start() + { + log.info("Starting Stream-Processor"); + streams.start(); + } + + public void stop() + { + log.info("Stopping Stream-Processor"); + streams.close(); + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/User.java b/src/main/java/de/juplo/kafka/wordcount/popular/User.java new file mode 100644 index 0000000..e38bcba --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/popular/User.java @@ -0,0 +1,12 @@ +package de.juplo.kafka.wordcount.counter; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class User +{ + String user; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/Word.java b/src/main/java/de/juplo/kafka/wordcount/popular/Word.java new file mode 100644 index 0000000..77287d5 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/popular/Word.java @@ -0,0 +1,13 @@ +package de.juplo.kafka.wordcount.counter; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class Word +{ + private String user; + private String word; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/WordCounter.java b/src/main/java/de/juplo/kafka/wordcount/popular/WordCounter.java new file mode 100644 index 0000000..f1fce71 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/popular/WordCounter.java @@ -0,0 +1,22 @@ +package de.juplo.kafka.wordcount.counter; + +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@Data +@NoArgsConstructor +@AllArgsConstructor(access = AccessLevel.PRIVATE) +public class WordCounter +{ + String user; + String word; + long counter; + + public static WordCounter of(Word word, long counter) + { + return new WordCounter(word.getUser(), word.getWord(), counter); + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java deleted file mode 100644 index 334cd05..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java +++ /dev/null @@ -1,165 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import de.juplo.kafka.wordcount.splitter.TestInputUser; -import de.juplo.kafka.wordcount.splitter.TestInputWord; -import de.juplo.kafka.wordcount.top10.TestOutputWord; -import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; -import org.apache.kafka.streams.state.Stores; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Primary; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.support.KafkaHeaders; -import org.springframework.kafka.support.SendResult; -import org.springframework.kafka.test.context.EmbeddedKafka; -import org.springframework.messaging.handler.annotation.Header; -import org.springframework.messaging.handler.annotation.Payload; -import org.springframework.util.LinkedMultiValueMap; -import org.springframework.util.MultiValueMap; - -import java.time.Duration; - -import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN; -import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT; -import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME; -import static org.awaitility.Awaitility.await; - - -@SpringBootTest( - properties = { - "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer", - "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", - "spring.kafka.producer.properties.spring.json.add.type.headers=false", - "spring.kafka.consumer.auto-offset-reset=earliest", - "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", - "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", - "spring.kafka.consumer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.top10.TestOutputWord,counter:de.juplo.kafka.wordcount.top10.TestOutputWordCounter", - "logging.level.root=WARN", - "logging.level.de.juplo=DEBUG", - "juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}", - "juplo.wordcount.counter.commit-interval=0", - "juplo.wordcount.counter.input-topic=" + TOPIC_IN, - "juplo.wordcount.counter.output-topic=" + TOPIC_OUT }) -@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }) -@Slf4j -public class CounterApplicationIT -{ - public static final String TOPIC_IN = "in"; - public static final String TOPIC_OUT = "out"; - - @Autowired - Consumer consumer; - @Autowired - CounterStreamProcessor streamProcessor; - - - @BeforeAll - public static void testSendMessage( - @Autowired KafkaTemplate kafkaTemplate) - { - TestData - .getInputMessages() - .forEach(kv -> - { - try - { - SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); - log.info( - "Sent: {}={}, partition={}, offset={}", - result.getProducerRecord().key(), - result.getProducerRecord().value(), - result.getRecordMetadata().partition(), - result.getRecordMetadata().offset()); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - }); - } - - @DisplayName("Await the expected number of messages") - @Test - public void testAwaitExpectedNumberOfMessagesForUsers() - { - await("Expected number of messages") - .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> consumer.enforceAssertion( - receivedMessages -> TestData.assertExpectedNumberOfMessagesForWord(receivedMessages))); - } - - @DisplayName("Await the expected output messages") - @Test - void testSendMessage() - { - await("Expected messages") - .atMost(Duration.ofSeconds(10)) - .untilAsserted(() -> consumer.enforceAssertion( - receivedMessages -> TestData.assertExpectedMessages(receivedMessages))); - } - - @DisplayName("Await the expected final output messages") - @Test - public void testAwaitExpectedLastMessagesForUsers() - { - await("Expected final output messages") - .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> consumer.enforceAssertion( - receivedMessages -> TestData.assertExpectedLastMessagesForWord(receivedMessages))); - } - - @DisplayName("Await the expected state in the state-store") - @Test - public void testAwaitExpectedState() - { - await("Expected state") - .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore())); - } - - - static class Consumer - { - private final MultiValueMap received = new LinkedMultiValueMap<>(); - - @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) - public synchronized void receive( - @Header(KafkaHeaders.RECEIVED_KEY) TestOutputWord word, - @Payload TestOutputWordCounter counter) - { - log.debug("Received message: {} -> {}", word, counter); - received.add(word, counter); - } - - synchronized void enforceAssertion( - java.util.function.Consumer> assertion) - { - assertion.accept(received); - } - } - - @TestConfiguration - static class Configuration - { - @Bean - Consumer consumer() - { - return new Consumer(); - } - - @Primary - @Bean - KeyValueBytesStoreSupplier inMemoryStoreSupplier() - { - return Stores.inMemoryKeyValueStore(STORE_NAME); - } - } -} diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java deleted file mode 100644 index 0ffd516..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java +++ /dev/null @@ -1,90 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import de.juplo.kafka.wordcount.splitter.TestInputUser; -import de.juplo.kafka.wordcount.splitter.TestInputWord; -import de.juplo.kafka.wordcount.top10.TestOutputWord; -import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.streams.TestInputTopic; -import org.apache.kafka.streams.TestOutputTopic; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.TopologyTestDriver; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.Stores; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.springframework.kafka.support.serializer.JsonDeserializer; -import org.springframework.kafka.support.serializer.JsonSerializer; -import org.springframework.util.LinkedMultiValueMap; -import org.springframework.util.MultiValueMap; - -import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig; - - -@Slf4j -public class CounterStreamProcessorTopologyTest -{ - public static final String IN = "TEST-IN"; - public static final String OUT = "TEST-OUT"; - public static final String STORE_NAME = "TOPOLOGY-TEST"; - - - TopologyTestDriver testDriver; - TestInputTopic in; - TestOutputTopic out; - - - @BeforeEach - public void setUpTestDriver() - { - Topology topology = CounterStreamProcessor.buildTopology( - IN, - OUT, - Stores.inMemoryKeyValueStore(STORE_NAME)); - - testDriver = new TopologyTestDriver(topology, serializationConfig()); - - in = testDriver.createInputTopic( - IN, - new JsonSerializer().noTypeInfo(), - new JsonSerializer().noTypeInfo()); - - out = testDriver.createOutputTopic( - OUT, - new JsonDeserializer() - .copyWithType(TestOutputWord.class) - .ignoreTypeHeaders(), - new JsonDeserializer() - .copyWithType(TestOutputWordCounter.class) - .ignoreTypeHeaders()); - } - - - @Test - public void test() - { - TestData - .getInputMessages() - .forEach(kv -> in.pipeInput(kv.key, kv.value)); - - MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); - out - .readRecordsToList() - .forEach(record -> receivedMessages.add(record.key(), record.value())); - - TestData.assertExpectedMessages(receivedMessages); - - TestData.assertExpectedNumberOfMessagesForWord(receivedMessages); - TestData.assertExpectedLastMessagesForWord(receivedMessages); - - KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); - TestData.assertExpectedState(store); - } - - @AfterEach - public void tearDown() - { - testDriver.close(); - } -} diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java deleted file mode 100644 index 1ecfdbd..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java +++ /dev/null @@ -1,206 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import de.juplo.kafka.wordcount.splitter.TestInputUser; -import de.juplo.kafka.wordcount.splitter.TestInputWord; -import de.juplo.kafka.wordcount.top10.TestOutputWord; -import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; -import org.springframework.util.LinkedMultiValueMap; -import org.springframework.util.MultiValueMap; - -import java.util.stream.Stream; - -import static org.assertj.core.api.Assertions.assertThat; - - -class TestData -{ - static final String PETER = "peter"; - static final String KLAUS = "klaus"; - - static final String WORD_HALLO = "Hallo"; - static final String WORD_MÜSCH = "Müsch"; - static final String WORD_WELT = "Welt"; - static final String WORD_S = "s"; - static final String WORD_BOÄH = "Boäh"; - - static final TestOutputWord PETER_HALLO = TestOutputWord.of(PETER, WORD_HALLO); - static final TestOutputWord PETER_WELT = TestOutputWord.of(PETER, WORD_WELT); - static final TestOutputWord PETER_BOÄH = TestOutputWord.of(PETER, WORD_BOÄH); - static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(KLAUS, WORD_MÜSCH); - static final TestOutputWord KLAUS_S = TestOutputWord.of(KLAUS, WORD_S); - - private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] - { - new KeyValue<>( - TestInputUser.of(PETER), - TestInputWord.of(PETER, WORD_HALLO)), - new KeyValue<>( - TestInputUser.of(KLAUS), - TestInputWord.of(KLAUS, WORD_MÜSCH)), - new KeyValue<>( - TestInputUser.of(PETER), - TestInputWord.of(PETER, WORD_WELT)), - new KeyValue<>( - TestInputUser.of(KLAUS), - TestInputWord.of(KLAUS, WORD_MÜSCH)), - new KeyValue<>( - TestInputUser.of(KLAUS), - TestInputWord.of(KLAUS, WORD_S)), - new KeyValue<>( - TestInputUser.of(PETER), - TestInputWord.of(PETER, WORD_BOÄH)), - new KeyValue<>( - TestInputUser.of(PETER), - TestInputWord.of(PETER, WORD_WELT)), - new KeyValue<>( - TestInputUser.of(PETER), - TestInputWord.of(PETER, WORD_BOÄH)), - new KeyValue<>( - TestInputUser.of(KLAUS), - TestInputWord.of(KLAUS, WORD_S)), - new KeyValue<>( - TestInputUser.of(PETER), - TestInputWord.of(PETER, WORD_BOÄH)), - new KeyValue<>( - TestInputUser.of(KLAUS), - TestInputWord.of(KLAUS, WORD_S)), - }; - - static Stream> getInputMessages() - { - return Stream.of(TestData.INPUT_MESSAGES); - } - - static void assertExpectedMessages(MultiValueMap receivedMessages) - { - expectedMessages().forEach( - (word, counter) -> - assertThat(receivedMessages.get(word)) - .containsExactlyElementsOf(counter)); - } - - static void assertExpectedNumberOfMessagesForWord(MultiValueMap receivedMessages) - { - assertThat(countMessagesForWord(PETER_HALLO, receivedMessages)); - assertThat(countMessagesForWord(PETER_WELT, receivedMessages)); - assertThat(countMessagesForWord(PETER_BOÄH, receivedMessages)); - assertThat(countMessagesForWord(KLAUS_MÜSCH, receivedMessages)); - assertThat(countMessagesForWord(KLAUS_S, receivedMessages)); - } - - private static int countMessagesForWord(TestOutputWord word, MultiValueMap messagesForUsers) - { - return messagesForUsers.get(word) == null - ? 0 - : messagesForUsers.get(word).size(); - } - - static void assertExpectedState(ReadOnlyKeyValueStore store) - { - assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, store.get(wordOf(PETER_HALLO))); - assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, store.get(wordOf(PETER_WELT))); - assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, store.get(wordOf(PETER_BOÄH))); - assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, store.get(wordOf(KLAUS_MÜSCH))); - assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S))); - } - - private static Word wordOf(TestOutputWord testOutputWord) - { - Word word = new Word(); - - word.setUser(testOutputWord.getUser()); - word.setWord(testOutputWord.getWord()); - - return word; - } - - static void assertExpectedLastMessagesForWord(MultiValueMap receivedMessages) - { - assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages)); - assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, getLastMessageFor(PETER_WELT, receivedMessages)); - assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, getLastMessageFor(PETER_BOÄH, receivedMessages)); - assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, getLastMessageFor(KLAUS_MÜSCH, receivedMessages)); - assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages)); - } - - private static void assertWordCountEqualsWordCountFromLastMessage( - TestOutputWord word, - Long counter) - { - TestOutputWordCounter testOutputWordCounter = TestOutputWordCounter.of( - word.getUser(), - word.getWord(), - counter); - assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter); - } - - private static void assertWordCountEqualsWordCountFromLastMessage( - TestOutputWord word, - TestOutputWordCounter counter) - { - assertThat(counter).isEqualTo(getLastMessageFor(word)); - } - - private static TestOutputWordCounter getLastMessageFor(TestOutputWord word) - { - return getLastMessageFor(word, expectedMessages()); - } - - private static TestOutputWordCounter getLastMessageFor( - TestOutputWord user, - MultiValueMap messagesForWord) - { - return messagesForWord - .get(user) - .stream() - .reduce(null, (left, right) -> right); - } - - private static final KeyValue[] EXPECTED_MESSAGES = new KeyValue[] - { - KeyValue.pair( - PETER_HALLO, - TestOutputWordCounter.of(PETER, WORD_HALLO,1)), - KeyValue.pair( - KLAUS_MÜSCH, - TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,1)), - KeyValue.pair( - PETER_WELT, - TestOutputWordCounter.of(PETER, WORD_WELT,1)), - KeyValue.pair( - KLAUS_MÜSCH, - TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,2)), - KeyValue.pair( - KLAUS_S, - TestOutputWordCounter.of(KLAUS, WORD_S,1)), - KeyValue.pair( - PETER_BOÄH, - TestOutputWordCounter.of(PETER, WORD_BOÄH,1)), - KeyValue.pair( - PETER_WELT, - TestOutputWordCounter.of(PETER, WORD_WELT,2)), - KeyValue.pair( - PETER_BOÄH, - TestOutputWordCounter.of(PETER, WORD_BOÄH,2)), - KeyValue.pair( - KLAUS_S, - TestOutputWordCounter.of(KLAUS, WORD_S,2)), - KeyValue.pair( - PETER_BOÄH, - TestOutputWordCounter.of(PETER, WORD_BOÄH,3)), - KeyValue.pair( - KLAUS_S, - TestOutputWordCounter.of(KLAUS, WORD_S,3)), - }; - - static MultiValueMap expectedMessages() - { - MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); - Stream - .of(EXPECTED_MESSAGES) - .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value)); - return expectedMessages; - } -} diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java new file mode 100644 index 0000000..334cd05 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java @@ -0,0 +1,165 @@ +package de.juplo.kafka.wordcount.counter; + +import de.juplo.kafka.wordcount.splitter.TestInputUser; +import de.juplo.kafka.wordcount.splitter.TestInputWord; +import de.juplo.kafka.wordcount.top10.TestOutputWord; +import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.SendResult; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import java.time.Duration; + +import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN; +import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT; +import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME; +import static org.awaitility.Awaitility.await; + + +@SpringBootTest( + properties = { + "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer", + "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", + "spring.kafka.producer.properties.spring.json.add.type.headers=false", + "spring.kafka.consumer.auto-offset-reset=earliest", + "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", + "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", + "spring.kafka.consumer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.top10.TestOutputWord,counter:de.juplo.kafka.wordcount.top10.TestOutputWordCounter", + "logging.level.root=WARN", + "logging.level.de.juplo=DEBUG", + "juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.wordcount.counter.commit-interval=0", + "juplo.wordcount.counter.input-topic=" + TOPIC_IN, + "juplo.wordcount.counter.output-topic=" + TOPIC_OUT }) +@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }) +@Slf4j +public class CounterApplicationIT +{ + public static final String TOPIC_IN = "in"; + public static final String TOPIC_OUT = "out"; + + @Autowired + Consumer consumer; + @Autowired + CounterStreamProcessor streamProcessor; + + + @BeforeAll + public static void testSendMessage( + @Autowired KafkaTemplate kafkaTemplate) + { + TestData + .getInputMessages() + .forEach(kv -> + { + try + { + SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); + log.info( + "Sent: {}={}, partition={}, offset={}", + result.getProducerRecord().key(), + result.getProducerRecord().value(), + result.getRecordMetadata().partition(), + result.getRecordMetadata().offset()); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + }); + } + + @DisplayName("Await the expected number of messages") + @Test + public void testAwaitExpectedNumberOfMessagesForUsers() + { + await("Expected number of messages") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> consumer.enforceAssertion( + receivedMessages -> TestData.assertExpectedNumberOfMessagesForWord(receivedMessages))); + } + + @DisplayName("Await the expected output messages") + @Test + void testSendMessage() + { + await("Expected messages") + .atMost(Duration.ofSeconds(10)) + .untilAsserted(() -> consumer.enforceAssertion( + receivedMessages -> TestData.assertExpectedMessages(receivedMessages))); + } + + @DisplayName("Await the expected final output messages") + @Test + public void testAwaitExpectedLastMessagesForUsers() + { + await("Expected final output messages") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> consumer.enforceAssertion( + receivedMessages -> TestData.assertExpectedLastMessagesForWord(receivedMessages))); + } + + @DisplayName("Await the expected state in the state-store") + @Test + public void testAwaitExpectedState() + { + await("Expected state") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore())); + } + + + static class Consumer + { + private final MultiValueMap received = new LinkedMultiValueMap<>(); + + @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) + public synchronized void receive( + @Header(KafkaHeaders.RECEIVED_KEY) TestOutputWord word, + @Payload TestOutputWordCounter counter) + { + log.debug("Received message: {} -> {}", word, counter); + received.add(word, counter); + } + + synchronized void enforceAssertion( + java.util.function.Consumer> assertion) + { + assertion.accept(received); + } + } + + @TestConfiguration + static class Configuration + { + @Bean + Consumer consumer() + { + return new Consumer(); + } + + @Primary + @Bean + KeyValueBytesStoreSupplier inMemoryStoreSupplier() + { + return Stores.inMemoryKeyValueStore(STORE_NAME); + } + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java new file mode 100644 index 0000000..0ffd516 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java @@ -0,0 +1,90 @@ +package de.juplo.kafka.wordcount.counter; + +import de.juplo.kafka.wordcount.splitter.TestInputUser; +import de.juplo.kafka.wordcount.splitter.TestInputWord; +import de.juplo.kafka.wordcount.top10.TestOutputWord; +import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig; + + +@Slf4j +public class CounterStreamProcessorTopologyTest +{ + public static final String IN = "TEST-IN"; + public static final String OUT = "TEST-OUT"; + public static final String STORE_NAME = "TOPOLOGY-TEST"; + + + TopologyTestDriver testDriver; + TestInputTopic in; + TestOutputTopic out; + + + @BeforeEach + public void setUpTestDriver() + { + Topology topology = CounterStreamProcessor.buildTopology( + IN, + OUT, + Stores.inMemoryKeyValueStore(STORE_NAME)); + + testDriver = new TopologyTestDriver(topology, serializationConfig()); + + in = testDriver.createInputTopic( + IN, + new JsonSerializer().noTypeInfo(), + new JsonSerializer().noTypeInfo()); + + out = testDriver.createOutputTopic( + OUT, + new JsonDeserializer() + .copyWithType(TestOutputWord.class) + .ignoreTypeHeaders(), + new JsonDeserializer() + .copyWithType(TestOutputWordCounter.class) + .ignoreTypeHeaders()); + } + + + @Test + public void test() + { + TestData + .getInputMessages() + .forEach(kv -> in.pipeInput(kv.key, kv.value)); + + MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); + out + .readRecordsToList() + .forEach(record -> receivedMessages.add(record.key(), record.value())); + + TestData.assertExpectedMessages(receivedMessages); + + TestData.assertExpectedNumberOfMessagesForWord(receivedMessages); + TestData.assertExpectedLastMessagesForWord(receivedMessages); + + KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); + TestData.assertExpectedState(store); + } + + @AfterEach + public void tearDown() + { + testDriver.close(); + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java b/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java new file mode 100644 index 0000000..1ecfdbd --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/popular/TestData.java @@ -0,0 +1,206 @@ +package de.juplo.kafka.wordcount.counter; + +import de.juplo.kafka.wordcount.splitter.TestInputUser; +import de.juplo.kafka.wordcount.splitter.TestInputWord; +import de.juplo.kafka.wordcount.top10.TestOutputWord; +import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + + +class TestData +{ + static final String PETER = "peter"; + static final String KLAUS = "klaus"; + + static final String WORD_HALLO = "Hallo"; + static final String WORD_MÜSCH = "Müsch"; + static final String WORD_WELT = "Welt"; + static final String WORD_S = "s"; + static final String WORD_BOÄH = "Boäh"; + + static final TestOutputWord PETER_HALLO = TestOutputWord.of(PETER, WORD_HALLO); + static final TestOutputWord PETER_WELT = TestOutputWord.of(PETER, WORD_WELT); + static final TestOutputWord PETER_BOÄH = TestOutputWord.of(PETER, WORD_BOÄH); + static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(KLAUS, WORD_MÜSCH); + static final TestOutputWord KLAUS_S = TestOutputWord.of(KLAUS, WORD_S); + + private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] + { + new KeyValue<>( + TestInputUser.of(PETER), + TestInputWord.of(PETER, WORD_HALLO)), + new KeyValue<>( + TestInputUser.of(KLAUS), + TestInputWord.of(KLAUS, WORD_MÜSCH)), + new KeyValue<>( + TestInputUser.of(PETER), + TestInputWord.of(PETER, WORD_WELT)), + new KeyValue<>( + TestInputUser.of(KLAUS), + TestInputWord.of(KLAUS, WORD_MÜSCH)), + new KeyValue<>( + TestInputUser.of(KLAUS), + TestInputWord.of(KLAUS, WORD_S)), + new KeyValue<>( + TestInputUser.of(PETER), + TestInputWord.of(PETER, WORD_BOÄH)), + new KeyValue<>( + TestInputUser.of(PETER), + TestInputWord.of(PETER, WORD_WELT)), + new KeyValue<>( + TestInputUser.of(PETER), + TestInputWord.of(PETER, WORD_BOÄH)), + new KeyValue<>( + TestInputUser.of(KLAUS), + TestInputWord.of(KLAUS, WORD_S)), + new KeyValue<>( + TestInputUser.of(PETER), + TestInputWord.of(PETER, WORD_BOÄH)), + new KeyValue<>( + TestInputUser.of(KLAUS), + TestInputWord.of(KLAUS, WORD_S)), + }; + + static Stream> getInputMessages() + { + return Stream.of(TestData.INPUT_MESSAGES); + } + + static void assertExpectedMessages(MultiValueMap receivedMessages) + { + expectedMessages().forEach( + (word, counter) -> + assertThat(receivedMessages.get(word)) + .containsExactlyElementsOf(counter)); + } + + static void assertExpectedNumberOfMessagesForWord(MultiValueMap receivedMessages) + { + assertThat(countMessagesForWord(PETER_HALLO, receivedMessages)); + assertThat(countMessagesForWord(PETER_WELT, receivedMessages)); + assertThat(countMessagesForWord(PETER_BOÄH, receivedMessages)); + assertThat(countMessagesForWord(KLAUS_MÜSCH, receivedMessages)); + assertThat(countMessagesForWord(KLAUS_S, receivedMessages)); + } + + private static int countMessagesForWord(TestOutputWord word, MultiValueMap messagesForUsers) + { + return messagesForUsers.get(word) == null + ? 0 + : messagesForUsers.get(word).size(); + } + + static void assertExpectedState(ReadOnlyKeyValueStore store) + { + assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, store.get(wordOf(PETER_HALLO))); + assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, store.get(wordOf(PETER_WELT))); + assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, store.get(wordOf(PETER_BOÄH))); + assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, store.get(wordOf(KLAUS_MÜSCH))); + assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S))); + } + + private static Word wordOf(TestOutputWord testOutputWord) + { + Word word = new Word(); + + word.setUser(testOutputWord.getUser()); + word.setWord(testOutputWord.getWord()); + + return word; + } + + static void assertExpectedLastMessagesForWord(MultiValueMap receivedMessages) + { + assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages)); + assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, getLastMessageFor(PETER_WELT, receivedMessages)); + assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, getLastMessageFor(PETER_BOÄH, receivedMessages)); + assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, getLastMessageFor(KLAUS_MÜSCH, receivedMessages)); + assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages)); + } + + private static void assertWordCountEqualsWordCountFromLastMessage( + TestOutputWord word, + Long counter) + { + TestOutputWordCounter testOutputWordCounter = TestOutputWordCounter.of( + word.getUser(), + word.getWord(), + counter); + assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter); + } + + private static void assertWordCountEqualsWordCountFromLastMessage( + TestOutputWord word, + TestOutputWordCounter counter) + { + assertThat(counter).isEqualTo(getLastMessageFor(word)); + } + + private static TestOutputWordCounter getLastMessageFor(TestOutputWord word) + { + return getLastMessageFor(word, expectedMessages()); + } + + private static TestOutputWordCounter getLastMessageFor( + TestOutputWord user, + MultiValueMap messagesForWord) + { + return messagesForWord + .get(user) + .stream() + .reduce(null, (left, right) -> right); + } + + private static final KeyValue[] EXPECTED_MESSAGES = new KeyValue[] + { + KeyValue.pair( + PETER_HALLO, + TestOutputWordCounter.of(PETER, WORD_HALLO,1)), + KeyValue.pair( + KLAUS_MÜSCH, + TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,1)), + KeyValue.pair( + PETER_WELT, + TestOutputWordCounter.of(PETER, WORD_WELT,1)), + KeyValue.pair( + KLAUS_MÜSCH, + TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,2)), + KeyValue.pair( + KLAUS_S, + TestOutputWordCounter.of(KLAUS, WORD_S,1)), + KeyValue.pair( + PETER_BOÄH, + TestOutputWordCounter.of(PETER, WORD_BOÄH,1)), + KeyValue.pair( + PETER_WELT, + TestOutputWordCounter.of(PETER, WORD_WELT,2)), + KeyValue.pair( + PETER_BOÄH, + TestOutputWordCounter.of(PETER, WORD_BOÄH,2)), + KeyValue.pair( + KLAUS_S, + TestOutputWordCounter.of(KLAUS, WORD_S,2)), + KeyValue.pair( + PETER_BOÄH, + TestOutputWordCounter.of(PETER, WORD_BOÄH,3)), + KeyValue.pair( + KLAUS_S, + TestOutputWordCounter.of(KLAUS, WORD_S,3)), + }; + + static MultiValueMap expectedMessages() + { + MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); + Stream + .of(EXPECTED_MESSAGES) + .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value)); + return expectedMessages; + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java deleted file mode 100644 index cfc2cae..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java +++ /dev/null @@ -1,15 +0,0 @@ -package de.juplo.kafka.wordcount.top10; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - - -@Data -@NoArgsConstructor -@AllArgsConstructor(staticName = "of") -public class TestOutputWord -{ - String user; - String word; -} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java deleted file mode 100644 index 1b59387..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java +++ /dev/null @@ -1,16 +0,0 @@ -package de.juplo.kafka.wordcount.top10; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - - -@Data -@NoArgsConstructor -@AllArgsConstructor(staticName = "of") -public class TestOutputWordCounter -{ - String user; - String word; - long counter; -} diff --git a/src/test/java/de/juplo/kafka/wordcount/topwords/TestOutputWord.java b/src/test/java/de/juplo/kafka/wordcount/topwords/TestOutputWord.java new file mode 100644 index 0000000..cfc2cae --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/topwords/TestOutputWord.java @@ -0,0 +1,15 @@ +package de.juplo.kafka.wordcount.top10; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@Data +@NoArgsConstructor +@AllArgsConstructor(staticName = "of") +public class TestOutputWord +{ + String user; + String word; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/topwords/TestOutputWordCounter.java b/src/test/java/de/juplo/kafka/wordcount/topwords/TestOutputWordCounter.java new file mode 100644 index 0000000..1b59387 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/topwords/TestOutputWordCounter.java @@ -0,0 +1,16 @@ +package de.juplo.kafka.wordcount.top10; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@Data +@NoArgsConstructor +@AllArgsConstructor(staticName = "of") +public class TestOutputWordCounter +{ + String user; + String word; + long counter; +}