From 2532b8ee83ae921d7c5318def8813d91a0814951 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 5 Jul 2024 22:44:23 +0200 Subject: [PATCH] Separated Example-Tests from `counter` --- pom.xml | 78 ++----- .../wordcount/counter/CounterApplication.java | 14 -- .../CounterApplicationConfiguriation.java | 91 -------- .../counter/CounterApplicationProperties.java | 22 -- .../counter/CounterStreamProcessor.java | 132 ------------ .../juplo/kafka/wordcount/counter/User.java | 12 -- .../kafka/wordcount/counter/UserWord.java | 13 -- .../juplo/kafka/wordcount/counter/Word.java | 16 -- .../kafka/wordcount/counter/WordCounter.java | 22 -- src/main/resources/application.yml | 7 - .../counter/CounterApplicationIT.java | 165 -------------- .../CounterStreamProcessorTopologyTest.java | 130 ----------- .../kafka/wordcount/counter/TestData.java | 204 ------------------ .../wordcount/splitter/TestInputUser.java | 14 -- .../wordcount/splitter/TestInputWord.java | 15 -- .../kafka/wordcount/top10/TestOutputWord.java | 16 -- .../top10/TestOutputWordCounter.java | 16 -- 17 files changed, 17 insertions(+), 950 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java delete mode 100644 src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java delete mode 100644 src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java delete mode 100644 src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java delete mode 100644 src/main/java/de/juplo/kafka/wordcount/counter/User.java delete mode 100644 src/main/java/de/juplo/kafka/wordcount/counter/UserWord.java delete mode 100644 src/main/java/de/juplo/kafka/wordcount/counter/Word.java delete mode 100644 src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java delete mode 100644 src/main/resources/application.yml delete mode 100644 src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java delete mode 100644 src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java delete mode 100644 src/test/java/de/juplo/kafka/wordcount/counter/TestData.java delete mode 100644 src/test/java/de/juplo/kafka/wordcount/splitter/TestInputUser.java delete mode 100644 src/test/java/de/juplo/kafka/wordcount/splitter/TestInputWord.java delete mode 100644 src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java delete mode 100644 src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java diff --git a/pom.xml b/pom.xml index 013bb5c..091cc27 100644 --- a/pom.xml +++ b/pom.xml @@ -1,57 +1,45 @@ + 4.0.0 + org.springframework.boot spring-boot-starter-parent 3.2.7 - de.juplo.kafka.wordcount - counter - 1.4.2 - Wordcount-Counter - Word-counting stream-processor of the multi-user wordcount-example + + de.juplo.kafka.streams.demos + examples + 1.0.0-SNAPSHOT + + Examples + Examples for Kafka Streams + 21 0.44.0 + - - org.springframework.boot - spring-boot-starter-actuator - - - org.springframework.boot - spring-boot-starter-web - + org.apache.kafka kafka-streams - - org.springframework.kafka - spring-kafka - - - - org.springframework.boot - spring-boot-devtools - runtime - true - - - org.springframework.boot - spring-boot-configuration-processor - true - org.projectlombok lombok true + + org.springframework.kafka + spring-kafka + test + org.springframework.boot spring-boot-starter-test @@ -74,36 +62,4 @@ - - - - maven-failsafe-plugin - - - org.springframework.boot - spring-boot-maven-plugin - - - - org.projectlombok - lombok - - - - - - io.fabric8 - docker-maven-plugin - ${docker-maven-plugin.version} - - - - juplo/wordcount--%a:%v - - - - - - - diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java deleted file mode 100644 index e6d3b1f..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java +++ /dev/null @@ -1,14 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; - - -@SpringBootApplication -public class CounterApplication -{ - public static void main(String[] args) - { - SpringApplication.run(CounterApplication.class, args); - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java deleted file mode 100644 index 174521f..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java +++ /dev/null @@ -1,91 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; -import org.apache.kafka.streams.state.Stores; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.support.serializer.JsonDeserializer; -import org.springframework.kafka.support.serializer.JsonSerde; - -import java.util.Properties; -import java.util.concurrent.CompletableFuture; - -import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME; -import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; - - -@Configuration -@EnableConfigurationProperties(CounterApplicationProperties.class) -@Slf4j -public class CounterApplicationConfiguriation -{ - @Bean - public Properties streamProcessorProperties( - CounterApplicationProperties counterProperties) - { - Properties propertyMap = serializationConfig(); - - propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, counterProperties.getApplicationId()); - - propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, counterProperties.getBootstrapServer()); - if (counterProperties.getCommitInterval() != null) - propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, counterProperties.getCommitInterval()); - if (counterProperties.getCacheMaxBytes() != null) - propertyMap.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, counterProperties.getCacheMaxBytes()); - - propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - - return propertyMap; - } - - static Properties serializationConfig() - { - Properties propertyMap = new Properties(); - - propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); - propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); - propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, CounterApplication.class.getPackageName()); - - return propertyMap; - } - - @Bean(initMethod = "start", destroyMethod = "stop") - public CounterStreamProcessor streamProcessor( - CounterApplicationProperties applicationProperties, - Properties streamProcessorProperties, - KeyValueBytesStoreSupplier storeSupplier, - ConfigurableApplicationContext context) - { - CounterStreamProcessor streamProcessor = new CounterStreamProcessor( - applicationProperties.getInputTopic(), - applicationProperties.getOutputTopic(), - streamProcessorProperties, - storeSupplier); - - streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> - { - log.error("Unexpected error!", e); - CompletableFuture.runAsync(() -> - { - log.info("Stopping application..."); - SpringApplication.exit(context, () -> 1); - }); - return SHUTDOWN_CLIENT; - }); - - - return streamProcessor; - } - - @Bean - public KeyValueBytesStoreSupplier storeSupplier() - { - return Stores.persistentKeyValueStore(STORE_NAME); - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java deleted file mode 100644 index c3ada17..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java +++ /dev/null @@ -1,22 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - - -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; -import org.springframework.boot.context.properties.ConfigurationProperties; - - -@ConfigurationProperties("juplo.wordcount.counter") -@Getter -@Setter -@ToString -public class CounterApplicationProperties -{ - private String bootstrapServer = "localhost:9092"; - private String applicationId = "counter"; - private String inputTopic = "words"; - private String outputTopic = "countings"; - private Integer commitInterval; - private Integer cacheMaxBytes; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java deleted file mode 100644 index 455d895..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java +++ /dev/null @@ -1,132 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.Consumed; -import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.streams.kstream.Produced; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; -import org.apache.kafka.streams.state.QueryableStoreTypes; -import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; -import org.springframework.kafka.support.serializer.JsonSerde; -import org.springframework.kafka.support.serializer.JsonSerializer; - -import java.util.Map; -import java.util.Properties; -import java.util.stream.Collectors; - - -@Slf4j -public class CounterStreamProcessor -{ - public static final String TYPE = "COUNTER"; - public static final String STORE_NAME = "counter"; - - - public final KafkaStreams streams; - - - public CounterStreamProcessor( - String inputTopic, - String outputTopic, - Properties properties, - KeyValueBytesStoreSupplier storeSupplier) - { - Topology topology = CounterStreamProcessor.buildTopology( - inputTopic, - outputTopic, - storeSupplier); - - streams = new KafkaStreams(topology, properties); - } - - static Topology buildTopology( - String inputTopic, - String outputTopic, - KeyValueBytesStoreSupplier storeSupplier) - { - StreamsBuilder builder = new StreamsBuilder(); - - builder - .stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde())) - .mapValues(word -> Word.of(word.getUser(), word.getWord())) - .map((key, word) -> new KeyValue<>(word, word)) - .groupByKey() - .count( - Materialized - .as(storeSupplier) - .withKeySerde(new JsonSerde<>(Word.class))) // No headers are present: fixed typing is needed! - .toStream() - .map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter))) - .to(outputTopic, Produced.with(outKeySerde(), outValueSerde())); - - Topology topology = builder.build(); - log.info("\n\n{}", topology.describe()); - - return topology; - } - - ReadOnlyKeyValueStore getStore() - { - return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore())); - } - - public void start() - { - log.info("Starting Stream-Processor"); - streams.start(); - } - - public void stop() - { - log.info("Stopping Stream-Processor"); - streams.close(); - } - - - - public static JsonSerde inKeySerde() - { - return new JsonSerde<>(User.class); - } - - public static JsonSerde inValueSerde() - { - return new JsonSerde<>(UserWord.class); - } - - public static JsonSerde outKeySerde() - { - return serde(true); - } - - public static JsonSerde outValueSerde() - { - return serde(false); - } - - public static JsonSerde serde(boolean isKey) - { - JsonSerde serde = new JsonSerde<>(); - serde.configure( - Map.of(JsonSerializer.TYPE_MAPPINGS, typeMappingsConfig()), - isKey); - return serde; - } - - private static String typeMappingsConfig() - { - return typeMappingsConfig(Word.class, WordCounter.class); - } - - public static String typeMappingsConfig(Class keyClass, Class counterClass) - { - return Map.of( - "key", keyClass, - "counter", counterClass) - .entrySet() - .stream() - .map(entry -> entry.getKey() + ":" + entry.getValue().getName()) - .collect(Collectors.joining(",")); - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/User.java b/src/main/java/de/juplo/kafka/wordcount/counter/User.java deleted file mode 100644 index e38bcba..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/User.java +++ /dev/null @@ -1,12 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import lombok.Data; - - -@Data -@JsonIgnoreProperties(ignoreUnknown = true) -public class User -{ - String user; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/UserWord.java b/src/main/java/de/juplo/kafka/wordcount/counter/UserWord.java deleted file mode 100644 index db1ccb2..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/UserWord.java +++ /dev/null @@ -1,13 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import lombok.Data; - - -@Data -@JsonIgnoreProperties(ignoreUnknown = true) -public class UserWord -{ - private String user; - private String word; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/Word.java b/src/main/java/de/juplo/kafka/wordcount/counter/Word.java deleted file mode 100644 index a058ff8..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/Word.java +++ /dev/null @@ -1,16 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - - -@Data -@NoArgsConstructor -@AllArgsConstructor(staticName = "of") -public class Word -{ - private final String type = CounterStreamProcessor.TYPE; - private String channel; - private String key; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java b/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java deleted file mode 100644 index 211fa4c..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java +++ /dev/null @@ -1,22 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import lombok.AccessLevel; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - - -@Data -@NoArgsConstructor -@AllArgsConstructor(access = AccessLevel.PRIVATE) -public class WordCounter -{ - String user; - String key; - long counter; - - public static WordCounter of(Word word, long counter) - { - return new WordCounter(word.getChannel(), word.getKey(), counter); - } -} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml deleted file mode 100644 index d940f22..0000000 --- a/src/main/resources/application.yml +++ /dev/null @@ -1,7 +0,0 @@ -server: - port: 8083 -management: - endpoints: - web: - exposure: - include: "*" diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java deleted file mode 100644 index ab395fd..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java +++ /dev/null @@ -1,165 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import de.juplo.kafka.wordcount.splitter.TestInputUser; -import de.juplo.kafka.wordcount.splitter.TestInputWord; -import de.juplo.kafka.wordcount.top10.TestOutputWord; -import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; -import org.apache.kafka.streams.state.Stores; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.support.KafkaHeaders; -import org.springframework.kafka.support.SendResult; -import org.springframework.kafka.test.context.EmbeddedKafka; -import org.springframework.messaging.handler.annotation.Header; -import org.springframework.messaging.handler.annotation.Payload; -import org.springframework.util.LinkedMultiValueMap; -import org.springframework.util.MultiValueMap; - -import java.time.Duration; - -import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN; -import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT; -import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME; -import static org.awaitility.Awaitility.await; - - -@SpringBootTest( - properties = { - "spring.main.allow-bean-definition-overriding=true", - "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer", - "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", - "spring.kafka.producer.properties.spring.json.add.type.headers=false", - "spring.kafka.consumer.auto-offset-reset=earliest", - "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", - "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", - "spring.kafka.consumer.properties.spring.json.type.mapping=key:de.juplo.kafka.wordcount.top10.TestOutputWord,counter:de.juplo.kafka.wordcount.top10.TestOutputWordCounter", - "logging.level.root=WARN", - "logging.level.de.juplo=DEBUG", - "juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}", - "juplo.wordcount.counter.commit-interval=100", - "juplo.wordcount.counter.cache-max-bytes=0", - "juplo.wordcount.counter.input-topic=" + TOPIC_IN, - "juplo.wordcount.counter.output-topic=" + TOPIC_OUT }) -@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }) -@Slf4j -public class CounterApplicationIT -{ - public static final String TOPIC_IN = "in"; - public static final String TOPIC_OUT = "out"; - - @Autowired - Consumer consumer; - @Autowired - CounterStreamProcessor streamProcessor; - - - @BeforeAll - public static void testSendMessage( - @Autowired KafkaTemplate kafkaTemplate) - { - TestData - .getInputMessages() - .forEach(kv -> - { - try - { - SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); - log.info( - "Sent: {}={}, partition={}, offset={}", - result.getProducerRecord().key(), - result.getProducerRecord().value(), - result.getRecordMetadata().partition(), - result.getRecordMetadata().offset()); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - }); - } - - @DisplayName("Await the expected number of messages") - @Test - public void testAwaitExpectedNumberOfMessagesForUsers() - { - await("Expected number of messages") - .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> consumer.enforceAssertion( - receivedMessages -> TestData.assertExpectedNumberOfMessagesForWord(receivedMessages))); - } - - @DisplayName("Await the expected output messages") - @Test - void testSendMessage() - { - await("Expected messages") - .atMost(Duration.ofSeconds(10)) - .untilAsserted(() -> consumer.enforceAssertion( - receivedMessages -> TestData.assertExpectedMessages(receivedMessages))); - } - - @DisplayName("Await the expected final output messages") - @Test - public void testAwaitExpectedLastMessagesForUsers() - { - await("Expected final output messages") - .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> consumer.enforceAssertion( - receivedMessages -> TestData.assertExpectedLastMessagesForWord(receivedMessages))); - } - - @DisplayName("Await the expected state in the state-store") - @Test - public void testAwaitExpectedState() - { - await("Expected state") - .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore())); - } - - - static class Consumer - { - private final MultiValueMap received = new LinkedMultiValueMap<>(); - - @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) - public synchronized void receive( - @Header(KafkaHeaders.RECEIVED_KEY) TestOutputWord word, - @Payload TestOutputWordCounter counter) - { - log.debug("Received message: {} -> {}", word, counter); - received.add(word, counter); - } - - synchronized void enforceAssertion( - java.util.function.Consumer> assertion) - { - assertion.accept(received); - } - } - - @TestConfiguration - static class Configuration - { - @Bean - Consumer consumer() - { - return new Consumer(); - } - - @Bean - KeyValueBytesStoreSupplier storeSupplier() - { - return Stores.inMemoryKeyValueStore(STORE_NAME); - } - } -} diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java deleted file mode 100644 index e80e383..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java +++ /dev/null @@ -1,130 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import de.juplo.kafka.wordcount.splitter.TestInputUser; -import de.juplo.kafka.wordcount.splitter.TestInputWord; -import de.juplo.kafka.wordcount.top10.TestOutputWord; -import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.streams.TestInputTopic; -import org.apache.kafka.streams.TestOutputTopic; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.TopologyTestDriver; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.Stores; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.springframework.kafka.support.serializer.JsonDeserializer; -import org.springframework.kafka.support.serializer.JsonSerializer; -import org.springframework.util.LinkedMultiValueMap; -import org.springframework.util.MultiValueMap; - -import java.util.Map; - -import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig; -import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME; - - -@Slf4j -public class CounterStreamProcessorTopologyTest -{ - public static final String IN = "TEST-IN"; - public static final String OUT = "TEST-OUT"; - - - static TopologyTestDriver testDriver; - static MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); - - - @BeforeAll - public static void setUpTestDriver() - { - Topology topology = CounterStreamProcessor.buildTopology( - IN, - OUT, - Stores.inMemoryKeyValueStore(STORE_NAME)); - - testDriver = new TopologyTestDriver(topology, serializationConfig()); - - TestInputTopic in = - testDriver.createInputTopic(IN, serializer(), serializer()); - TestOutputTopic out = - testDriver.createOutputTopic(OUT, keyDeserializer(), valueDeserializer()); - - TestData - .getInputMessages() - .forEach(kv -> in.pipeInput(kv.key, kv.value)); - - receivedMessages = new LinkedMultiValueMap<>(); - out - .readRecordsToList() - .forEach(record -> receivedMessages.add(record.key(), record.value())); - } - - - @DisplayName("Assert the expected output messages") - @Test - public void testExpectedMessages() - { - TestData.assertExpectedMessages(receivedMessages); - } - - @DisplayName("Assert the expected number of messages") - @Test - public void testExpectedNumberOfMessagesForWord() - { - TestData.assertExpectedNumberOfMessagesForWord(receivedMessages); - } - - @DisplayName("Await the expected final output messages") - @Test - public void testExpectedLastMessagesForWord() - { - TestData.assertExpectedLastMessagesForWord(receivedMessages); - } - - @DisplayName("Assert the expected state in the state-store") - @Test - public void testExpectedState() - { - KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); - TestData.assertExpectedState(store); - } - - @AfterAll - public static void tearDown() - { - testDriver.close(); - } - - - private static JsonSerializer serializer() - { - return new JsonSerializer().noTypeInfo(); - } - - private static JsonDeserializer keyDeserializer() - { - return deserializer(true); - } - - private static JsonDeserializer valueDeserializer() - { - return deserializer(false); - } - - private static JsonDeserializer deserializer(boolean isKey) - { - JsonDeserializer deserializer = new JsonDeserializer<>(); - deserializer.configure( - Map.of(JsonDeserializer.TYPE_MAPPINGS, typeMappingsConfig()), - isKey); - return deserializer; - } - - private static String typeMappingsConfig() - { - return CounterStreamProcessor.typeMappingsConfig(TestOutputWord.class, TestOutputWordCounter.class); - } -} diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java deleted file mode 100644 index 9b38dbc..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java +++ /dev/null @@ -1,204 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import de.juplo.kafka.wordcount.splitter.TestInputUser; -import de.juplo.kafka.wordcount.splitter.TestInputWord; -import de.juplo.kafka.wordcount.top10.TestOutputWord; -import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; -import org.springframework.util.LinkedMultiValueMap; -import org.springframework.util.MultiValueMap; - -import java.util.stream.Stream; - -import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.TYPE; -import static org.assertj.core.api.Assertions.assertThat; - - -class TestData -{ - static final String PETER = "peter"; - static final String KLAUS = "klaus"; - - static final String WORD_HALLO = "Hallo"; - static final String WORD_MÜSCH = "Müsch"; - static final String WORD_WELT = "Welt"; - static final String WORD_S = "s"; - static final String WORD_BOÄH = "Boäh"; - - static final TestOutputWord PETER_HALLO = TestOutputWord.of(TYPE, PETER, WORD_HALLO); - static final TestOutputWord PETER_WELT = TestOutputWord.of(TYPE, PETER, WORD_WELT); - static final TestOutputWord PETER_BOÄH = TestOutputWord.of(TYPE, PETER, WORD_BOÄH); - static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(TYPE, KLAUS, WORD_MÜSCH); - static final TestOutputWord KLAUS_S = TestOutputWord.of(TYPE, KLAUS, WORD_S); - - private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] - { - KeyValue.pair( - TestInputUser.of(PETER), - TestInputWord.of(PETER, WORD_HALLO)), - KeyValue.pair( - TestInputUser.of(KLAUS), - TestInputWord.of(KLAUS, WORD_MÜSCH)), - KeyValue.pair( - TestInputUser.of(PETER), - TestInputWord.of(PETER, WORD_WELT)), - KeyValue.pair( - TestInputUser.of(KLAUS), - TestInputWord.of(KLAUS, WORD_MÜSCH)), - KeyValue.pair( - TestInputUser.of(KLAUS), - TestInputWord.of(KLAUS, WORD_S)), - KeyValue.pair( - TestInputUser.of(PETER), - TestInputWord.of(PETER, WORD_BOÄH)), - KeyValue.pair( - TestInputUser.of(PETER), - TestInputWord.of(PETER, WORD_WELT)), - KeyValue.pair( - TestInputUser.of(PETER), - TestInputWord.of(PETER, WORD_BOÄH)), - KeyValue.pair( - TestInputUser.of(KLAUS), - TestInputWord.of(KLAUS, WORD_S)), - KeyValue.pair( - TestInputUser.of(PETER), - TestInputWord.of(PETER, WORD_BOÄH)), - KeyValue.pair( - TestInputUser.of(KLAUS), - TestInputWord.of(KLAUS, WORD_S)), - }; - - static Stream> getInputMessages() - { - return Stream.of(TestData.INPUT_MESSAGES); - } - - static void assertExpectedMessages(MultiValueMap receivedMessages) - { - expectedMessages().forEach( - (word, counter) -> - assertThat(receivedMessages.get(word)) - .containsExactlyElementsOf(counter)); - } - - static void assertExpectedNumberOfMessagesForWord(MultiValueMap receivedMessages) - { - assertThat(countMessagesForWord(PETER_HALLO, receivedMessages)); - assertThat(countMessagesForWord(PETER_WELT, receivedMessages)); - assertThat(countMessagesForWord(PETER_BOÄH, receivedMessages)); - assertThat(countMessagesForWord(KLAUS_MÜSCH, receivedMessages)); - assertThat(countMessagesForWord(KLAUS_S, receivedMessages)); - } - - private static int countMessagesForWord(TestOutputWord word, MultiValueMap messagesForUsers) - { - return messagesForUsers.get(word) == null - ? 0 - : messagesForUsers.get(word).size(); - } - - static void assertExpectedState(ReadOnlyKeyValueStore store) - { - assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, store.get(wordOf(PETER_HALLO))); - assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, store.get(wordOf(PETER_WELT))); - assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, store.get(wordOf(PETER_BOÄH))); - assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, store.get(wordOf(KLAUS_MÜSCH))); - assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S))); - } - - private static Word wordOf(TestOutputWord testOutputWord) - { - return Word.of( - testOutputWord.getChannel(), - testOutputWord.getKey()); - } - - static void assertExpectedLastMessagesForWord(MultiValueMap receivedMessages) - { - assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages)); - assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, getLastMessageFor(PETER_WELT, receivedMessages)); - assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, getLastMessageFor(PETER_BOÄH, receivedMessages)); - assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, getLastMessageFor(KLAUS_MÜSCH, receivedMessages)); - assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages)); - } - - private static void assertWordCountEqualsWordCountFromLastMessage( - TestOutputWord word, - Long counter) - { - TestOutputWordCounter testOutputWordCounter = TestOutputWordCounter.of( - word.getChannel(), - word.getKey(), - counter); - assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter); - } - - private static void assertWordCountEqualsWordCountFromLastMessage( - TestOutputWord word, - TestOutputWordCounter counter) - { - assertThat(counter).isEqualTo(getLastMessageFor(word)); - } - - private static TestOutputWordCounter getLastMessageFor(TestOutputWord word) - { - return getLastMessageFor(word, expectedMessages()); - } - - private static TestOutputWordCounter getLastMessageFor( - TestOutputWord user, - MultiValueMap messagesForWord) - { - return messagesForWord - .get(user) - .stream() - .reduce(null, (left, right) -> right); - } - - private static final KeyValue[] EXPECTED_MESSAGES = new KeyValue[] - { - KeyValue.pair( - PETER_HALLO, - TestOutputWordCounter.of(PETER, WORD_HALLO,1)), - KeyValue.pair( - KLAUS_MÜSCH, - TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,1)), - KeyValue.pair( - PETER_WELT, - TestOutputWordCounter.of(PETER, WORD_WELT,1)), - KeyValue.pair( - KLAUS_MÜSCH, - TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,2)), - KeyValue.pair( - KLAUS_S, - TestOutputWordCounter.of(KLAUS, WORD_S,1)), - KeyValue.pair( - PETER_BOÄH, - TestOutputWordCounter.of(PETER, WORD_BOÄH,1)), - KeyValue.pair( - PETER_WELT, - TestOutputWordCounter.of(PETER, WORD_WELT,2)), - KeyValue.pair( - PETER_BOÄH, - TestOutputWordCounter.of(PETER, WORD_BOÄH,2)), - KeyValue.pair( - KLAUS_S, - TestOutputWordCounter.of(KLAUS, WORD_S,2)), - KeyValue.pair( - PETER_BOÄH, - TestOutputWordCounter.of(PETER, WORD_BOÄH,3)), - KeyValue.pair( - KLAUS_S, - TestOutputWordCounter.of(KLAUS, WORD_S,3)), - }; - - static MultiValueMap expectedMessages() - { - MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); - Stream - .of(EXPECTED_MESSAGES) - .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value)); - return expectedMessages; - } -} diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputUser.java b/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputUser.java deleted file mode 100644 index 2255b61..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputUser.java +++ /dev/null @@ -1,14 +0,0 @@ -package de.juplo.kafka.wordcount.splitter; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - - -@Data -@NoArgsConstructor -@AllArgsConstructor(staticName = "of") -public class TestInputUser -{ - String user; -} diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputWord.java b/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputWord.java deleted file mode 100644 index 71ed1d9..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputWord.java +++ /dev/null @@ -1,15 +0,0 @@ -package de.juplo.kafka.wordcount.splitter; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - - -@Data -@NoArgsConstructor -@AllArgsConstructor(staticName = "of") -public class TestInputWord -{ - String user; - String word; -} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java deleted file mode 100644 index 132f6ba..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java +++ /dev/null @@ -1,16 +0,0 @@ -package de.juplo.kafka.wordcount.top10; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - - -@Data -@NoArgsConstructor -@AllArgsConstructor(staticName = "of") -public class TestOutputWord -{ - String type; - String channel; - String key; -} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java deleted file mode 100644 index a5f5d43..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java +++ /dev/null @@ -1,16 +0,0 @@ -package de.juplo.kafka.wordcount.top10; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - - -@Data -@NoArgsConstructor -@AllArgsConstructor(staticName = "of") -public class TestOutputWordCounter -{ - String user; - String key; - long counter; -} -- 2.20.1