Separated Example-Tests from `counter` examples
authorKai Moritz <kai@juplo.de>
Fri, 5 Jul 2024 20:44:23 +0000 (22:44 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 5 Jul 2024 20:44:23 +0000 (22:44 +0200)
17 files changed:
pom.xml
src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/counter/User.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/counter/UserWord.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/counter/Word.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java [deleted file]
src/main/resources/application.yml [deleted file]
src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java [deleted file]
src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java [deleted file]
src/test/java/de/juplo/kafka/wordcount/counter/TestData.java [deleted file]
src/test/java/de/juplo/kafka/wordcount/splitter/TestInputUser.java [deleted file]
src/test/java/de/juplo/kafka/wordcount/splitter/TestInputWord.java [deleted file]
src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java [deleted file]
src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java [deleted file]

diff --git a/pom.xml b/pom.xml
index 013bb5c..091cc27 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -1,57 +1,45 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+
        <modelVersion>4.0.0</modelVersion>
+
        <parent>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-parent</artifactId>
                <version>3.2.7</version>
                <relativePath/> <!-- lookup parent from repository -->
        </parent>
-       <groupId>de.juplo.kafka.wordcount</groupId>
-       <artifactId>counter</artifactId>
-       <version>1.4.2</version>
-       <name>Wordcount-Counter</name>
-       <description>Word-counting stream-processor of the multi-user wordcount-example</description>
+
+       <groupId>de.juplo.kafka.streams.demos</groupId>
+       <artifactId>examples</artifactId>
+       <version>1.0.0-SNAPSHOT</version>
+
+       <name>Examples</name>
+       <description>Examples for Kafka Streams</description>
+
        <properties>
                <java.version>21</java.version>
                <docker-maven-plugin.version>0.44.0</docker-maven-plugin.version>
        </properties>
+
        <dependencies>
-               <dependency>
-                       <groupId>org.springframework.boot</groupId>
-                       <artifactId>spring-boot-starter-actuator</artifactId>
-               </dependency>
-               <dependency>
-                       <groupId>org.springframework.boot</groupId>
-                       <artifactId>spring-boot-starter-web</artifactId>
-               </dependency>
+
                <dependency>
                        <groupId>org.apache.kafka</groupId>
                        <artifactId>kafka-streams</artifactId>
                </dependency>
-               <dependency>
-                       <groupId>org.springframework.kafka</groupId>
-                       <artifactId>spring-kafka</artifactId>
-               </dependency>
-
-               <dependency>
-                       <groupId>org.springframework.boot</groupId>
-                       <artifactId>spring-boot-devtools</artifactId>
-                       <scope>runtime</scope>
-                       <optional>true</optional>
-               </dependency>
-               <dependency>
-                       <groupId>org.springframework.boot</groupId>
-                       <artifactId>spring-boot-configuration-processor</artifactId>
-                       <optional>true</optional>
-               </dependency>
                <dependency>
                        <groupId>org.projectlombok</groupId>
                        <artifactId>lombok</artifactId>
                        <optional>true</optional>
                </dependency>
 
+               <dependency>
+                       <groupId>org.springframework.kafka</groupId>
+                       <artifactId>spring-kafka</artifactId>
+                       <scope>test</scope>
+               </dependency>
                <dependency>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-test</artifactId>
                </dependency>
        </dependencies>
 
-       <build>
-               <plugins>
-                       <plugin>
-                               <artifactId>maven-failsafe-plugin</artifactId>
-                       </plugin>
-                       <plugin>
-                               <groupId>org.springframework.boot</groupId>
-                               <artifactId>spring-boot-maven-plugin</artifactId>
-                               <configuration>
-                                       <excludes>
-                                               <exclude>
-                                                       <groupId>org.projectlombok</groupId>
-                                                       <artifactId>lombok</artifactId>
-                                               </exclude>
-                                       </excludes>
-                               </configuration>
-                       </plugin>
-                       <plugin>
-                               <groupId>io.fabric8</groupId>
-                               <artifactId>docker-maven-plugin</artifactId>
-                               <version>${docker-maven-plugin.version}</version>
-                               <configuration>
-                                       <images>
-                                               <image>
-                                                       <name>juplo/wordcount--%a:%v</name>
-                                               </image>
-                                       </images>
-                               </configuration>
-                       </plugin>
-               </plugins>
-       </build>
-
 </project>
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java
deleted file mode 100644 (file)
index e6d3b1f..0000000
+++ /dev/null
@@ -1,14 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-
-
-@SpringBootApplication
-public class CounterApplication
-{
-       public static void main(String[] args)
-       {
-               SpringApplication.run(CounterApplication.class, args);
-       }
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationConfiguriation.java
deleted file mode 100644 (file)
index 174521f..0000000
+++ /dev/null
@@ -1,91 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.Stores;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.support.serializer.JsonDeserializer;
-import org.springframework.kafka.support.serializer.JsonSerde;
-
-import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
-
-import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
-import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
-
-
-@Configuration
-@EnableConfigurationProperties(CounterApplicationProperties.class)
-@Slf4j
-public class CounterApplicationConfiguriation
-{
-       @Bean
-       public Properties streamProcessorProperties(
-                       CounterApplicationProperties counterProperties)
-       {
-               Properties propertyMap = serializationConfig();
-
-               propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, counterProperties.getApplicationId());
-
-               propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, counterProperties.getBootstrapServer());
-               if (counterProperties.getCommitInterval() != null)
-                       propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, counterProperties.getCommitInterval());
-               if (counterProperties.getCacheMaxBytes() != null)
-                       propertyMap.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, counterProperties.getCacheMaxBytes());
-
-               propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-               return propertyMap;
-       }
-
-       static Properties serializationConfig()
-       {
-               Properties propertyMap = new Properties();
-
-               propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
-               propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
-               propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, CounterApplication.class.getPackageName());
-
-               return propertyMap;
-       }
-
-       @Bean(initMethod = "start", destroyMethod = "stop")
-       public CounterStreamProcessor streamProcessor(
-                       CounterApplicationProperties applicationProperties,
-                       Properties streamProcessorProperties,
-                       KeyValueBytesStoreSupplier storeSupplier,
-                       ConfigurableApplicationContext context)
-       {
-               CounterStreamProcessor streamProcessor = new CounterStreamProcessor(
-                               applicationProperties.getInputTopic(),
-                               applicationProperties.getOutputTopic(),
-                               streamProcessorProperties,
-                               storeSupplier);
-
-               streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
-               {
-                       log.error("Unexpected error!", e);
-                       CompletableFuture.runAsync(() ->
-                       {
-                               log.info("Stopping application...");
-                               SpringApplication.exit(context, () -> 1);
-                       });
-                       return SHUTDOWN_CLIENT;
-               });
-
-
-               return streamProcessor;
-       }
-
-       @Bean
-       public KeyValueBytesStoreSupplier storeSupplier()
-       {
-               return Stores.persistentKeyValueStore(STORE_NAME);
-       }
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java
deleted file mode 100644 (file)
index c3ada17..0000000
+++ /dev/null
@@ -1,22 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-
-
-@ConfigurationProperties("juplo.wordcount.counter")
-@Getter
-@Setter
-@ToString
-public class CounterApplicationProperties
-{
-  private String bootstrapServer = "localhost:9092";
-  private String applicationId = "counter";
-  private String inputTopic = "words";
-  private String outputTopic = "countings";
-  private Integer commitInterval;
-  private Integer cacheMaxBytes;
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
deleted file mode 100644 (file)
index 455d895..0000000
+++ /dev/null
@@ -1,132 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.QueryableStoreTypes;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
-import org.springframework.kafka.support.serializer.JsonSerde;
-import org.springframework.kafka.support.serializer.JsonSerializer;
-
-import java.util.Map;
-import java.util.Properties;
-import java.util.stream.Collectors;
-
-
-@Slf4j
-public class CounterStreamProcessor
-{
-       public static final String TYPE = "COUNTER";
-       public static final String STORE_NAME = "counter";
-
-
-       public final KafkaStreams streams;
-
-
-       public CounterStreamProcessor(
-                       String inputTopic,
-                       String outputTopic,
-                       Properties properties,
-                       KeyValueBytesStoreSupplier storeSupplier)
-       {
-               Topology topology = CounterStreamProcessor.buildTopology(
-                               inputTopic,
-                               outputTopic,
-                               storeSupplier);
-
-               streams = new KafkaStreams(topology, properties);
-       }
-
-       static Topology buildTopology(
-                       String inputTopic,
-                       String outputTopic,
-                       KeyValueBytesStoreSupplier storeSupplier)
-       {
-               StreamsBuilder builder = new StreamsBuilder();
-
-               builder
-                               .stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde()))
-                               .mapValues(word -> Word.of(word.getUser(), word.getWord()))
-                               .map((key, word) -> new KeyValue<>(word, word))
-                               .groupByKey()
-                               .count(
-                                               Materialized
-                                                               .<Word, Long>as(storeSupplier)
-                                                               .withKeySerde(new JsonSerde<>(Word.class))) // No headers are present: fixed typing is needed!
-                               .toStream()
-                               .map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter)))
-                               .to(outputTopic, Produced.with(outKeySerde(), outValueSerde()));
-
-               Topology topology = builder.build();
-               log.info("\n\n{}", topology.describe());
-
-               return topology;
-       }
-
-       ReadOnlyKeyValueStore<Word, Long> getStore()
-       {
-               return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
-       }
-
-       public void start()
-       {
-               log.info("Starting Stream-Processor");
-               streams.start();
-       }
-
-       public void stop()
-       {
-               log.info("Stopping Stream-Processor");
-               streams.close();
-       }
-
-
-
-       public static JsonSerde<User> inKeySerde()
-       {
-               return new JsonSerde<>(User.class);
-       }
-
-       public static JsonSerde<UserWord> inValueSerde()
-       {
-               return new JsonSerde<>(UserWord.class);
-       }
-
-       public static JsonSerde<Word> outKeySerde()
-       {
-               return serde(true);
-       }
-
-       public static JsonSerde<WordCounter> outValueSerde()
-       {
-               return serde(false);
-       }
-
-       public static <T> JsonSerde<T> serde(boolean isKey)
-       {
-               JsonSerde<T> serde = new JsonSerde<>();
-               serde.configure(
-                               Map.of(JsonSerializer.TYPE_MAPPINGS, typeMappingsConfig()),
-                               isKey);
-               return serde;
-       }
-
-       private static String typeMappingsConfig()
-       {
-               return typeMappingsConfig(Word.class, WordCounter.class);
-       }
-
-       public static String typeMappingsConfig(Class keyClass, Class counterClass)
-       {
-               return Map.of(
-                                               "key", keyClass,
-                                               "counter", counterClass)
-                               .entrySet()
-                               .stream()
-                               .map(entry -> entry.getKey() + ":" + entry.getValue().getName())
-                               .collect(Collectors.joining(","));
-       }
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/User.java b/src/main/java/de/juplo/kafka/wordcount/counter/User.java
deleted file mode 100644 (file)
index e38bcba..0000000
+++ /dev/null
@@ -1,12 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import lombok.Data;
-
-
-@Data
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class User
-{
-  String user;
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/UserWord.java b/src/main/java/de/juplo/kafka/wordcount/counter/UserWord.java
deleted file mode 100644 (file)
index db1ccb2..0000000
+++ /dev/null
@@ -1,13 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import lombok.Data;
-
-
-@Data
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class UserWord
-{
-  private String user;
-  private String word;
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/Word.java b/src/main/java/de/juplo/kafka/wordcount/counter/Word.java
deleted file mode 100644 (file)
index a058ff8..0000000
+++ /dev/null
@@ -1,16 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor(staticName = "of")
-public class Word
-{
-  private final String type = CounterStreamProcessor.TYPE;
-  private String channel;
-  private String key;
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java b/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java
deleted file mode 100644 (file)
index 211fa4c..0000000
+++ /dev/null
@@ -1,22 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import lombok.AccessLevel;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor(access = AccessLevel.PRIVATE)
-public class WordCounter
-{
-  String user;
-  String key;
-  long counter;
-
-  public static WordCounter of(Word word, long counter)
-  {
-    return new WordCounter(word.getChannel(), word.getKey(), counter);
-  }
-}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
deleted file mode 100644 (file)
index d940f22..0000000
+++ /dev/null
@@ -1,7 +0,0 @@
-server:
-  port: 8083
-management:
-  endpoints:
-    web:
-      exposure:
-        include: "*"
diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java
deleted file mode 100644 (file)
index ab395fd..0000000
+++ /dev/null
@@ -1,165 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import de.juplo.kafka.wordcount.splitter.TestInputUser;
-import de.juplo.kafka.wordcount.splitter.TestInputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.Stores;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.support.KafkaHeaders;
-import org.springframework.kafka.support.SendResult;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.messaging.handler.annotation.Header;
-import org.springframework.messaging.handler.annotation.Payload;
-import org.springframework.util.LinkedMultiValueMap;
-import org.springframework.util.MultiValueMap;
-
-import java.time.Duration;
-
-import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN;
-import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT;
-import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
-import static org.awaitility.Awaitility.await;
-
-
-@SpringBootTest(
-               properties = {
-                               "spring.main.allow-bean-definition-overriding=true",
-                               "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
-                               "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
-                               "spring.kafka.producer.properties.spring.json.add.type.headers=false",
-                               "spring.kafka.consumer.auto-offset-reset=earliest",
-                               "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
-                               "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
-                               "spring.kafka.consumer.properties.spring.json.type.mapping=key:de.juplo.kafka.wordcount.top10.TestOutputWord,counter:de.juplo.kafka.wordcount.top10.TestOutputWordCounter",
-                               "logging.level.root=WARN",
-                               "logging.level.de.juplo=DEBUG",
-                               "juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}",
-                               "juplo.wordcount.counter.commit-interval=100",
-                               "juplo.wordcount.counter.cache-max-bytes=0",
-                               "juplo.wordcount.counter.input-topic=" + TOPIC_IN,
-                               "juplo.wordcount.counter.output-topic=" + TOPIC_OUT })
-@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT })
-@Slf4j
-public class CounterApplicationIT
-{
-       public static final String TOPIC_IN = "in";
-       public static final String TOPIC_OUT = "out";
-
-       @Autowired
-       Consumer consumer;
-       @Autowired
-       CounterStreamProcessor streamProcessor;
-
-
-       @BeforeAll
-       public static void testSendMessage(
-                       @Autowired KafkaTemplate<TestInputUser, TestInputWord> kafkaTemplate)
-       {
-               TestData
-                               .getInputMessages()
-                               .forEach(kv ->
-                               {
-                                       try
-                                       {
-                                               SendResult<TestInputUser, TestInputWord> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
-                                               log.info(
-                                                               "Sent: {}={}, partition={}, offset={}",
-                                                               result.getProducerRecord().key(),
-                                                               result.getProducerRecord().value(),
-                                                               result.getRecordMetadata().partition(),
-                                                               result.getRecordMetadata().offset());
-                                       }
-                                       catch (Exception e)
-                                       {
-                                               throw new RuntimeException(e);
-                                       }
-                               });
-       }
-
-       @DisplayName("Await the expected number of messages")
-       @Test
-       public void testAwaitExpectedNumberOfMessagesForUsers()
-       {
-               await("Expected number of messages")
-                               .atMost(Duration.ofSeconds(5))
-                               .untilAsserted(() -> consumer.enforceAssertion(
-                                               receivedMessages -> TestData.assertExpectedNumberOfMessagesForWord(receivedMessages)));
-       }
-
-       @DisplayName("Await the expected output messages")
-       @Test
-       void testSendMessage()
-       {
-               await("Expected messages")
-                               .atMost(Duration.ofSeconds(10))
-                               .untilAsserted(() -> consumer.enforceAssertion(
-                                               receivedMessages -> TestData.assertExpectedMessages(receivedMessages)));
-       }
-
-       @DisplayName("Await the expected final output messages")
-       @Test
-       public void testAwaitExpectedLastMessagesForUsers()
-       {
-               await("Expected final output messages")
-                               .atMost(Duration.ofSeconds(5))
-                               .untilAsserted(() -> consumer.enforceAssertion(
-                                               receivedMessages -> TestData.assertExpectedLastMessagesForWord(receivedMessages)));
-       }
-
-       @DisplayName("Await the expected state in the state-store")
-       @Test
-       public void testAwaitExpectedState()
-       {
-               await("Expected state")
-                               .atMost(Duration.ofSeconds(5))
-                               .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
-       }
-
-
-       static class Consumer
-       {
-               private final MultiValueMap<TestOutputWord, TestOutputWordCounter> received = new LinkedMultiValueMap<>();
-
-               @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
-               public synchronized void receive(
-                               @Header(KafkaHeaders.RECEIVED_KEY) TestOutputWord word,
-                               @Payload TestOutputWordCounter counter)
-               {
-                       log.debug("Received message: {} -> {}", word, counter);
-                       received.add(word, counter);
-               }
-
-               synchronized void enforceAssertion(
-                               java.util.function.Consumer<MultiValueMap<TestOutputWord, TestOutputWordCounter>> assertion)
-               {
-                       assertion.accept(received);
-               }
-       }
-
-       @TestConfiguration
-       static class Configuration
-       {
-               @Bean
-               Consumer consumer()
-               {
-                       return new Consumer();
-               }
-
-               @Bean
-               KeyValueBytesStoreSupplier storeSupplier()
-               {
-                       return Stores.inMemoryKeyValueStore(STORE_NAME);
-               }
-       }
-}
diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java
deleted file mode 100644 (file)
index e80e383..0000000
+++ /dev/null
@@ -1,130 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import de.juplo.kafka.wordcount.splitter.TestInputUser;
-import de.juplo.kafka.wordcount.splitter.TestInputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.streams.TestOutputTopic;
-import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.Stores;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.springframework.kafka.support.serializer.JsonDeserializer;
-import org.springframework.kafka.support.serializer.JsonSerializer;
-import org.springframework.util.LinkedMultiValueMap;
-import org.springframework.util.MultiValueMap;
-
-import java.util.Map;
-
-import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig;
-import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
-
-
-@Slf4j
-public class CounterStreamProcessorTopologyTest
-{
-  public static final String IN = "TEST-IN";
-  public static final String OUT = "TEST-OUT";
-
-
-  static TopologyTestDriver testDriver;
-  static MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages = new LinkedMultiValueMap<>();
-
-
-  @BeforeAll
-  public static void setUpTestDriver()
-  {
-    Topology topology = CounterStreamProcessor.buildTopology(
-        IN,
-        OUT,
-        Stores.inMemoryKeyValueStore(STORE_NAME));
-
-    testDriver = new TopologyTestDriver(topology, serializationConfig());
-
-    TestInputTopic<TestInputUser, TestInputWord> in =
-        testDriver.createInputTopic(IN, serializer(), serializer());
-    TestOutputTopic<TestOutputWord, TestOutputWordCounter> out =
-        testDriver.createOutputTopic(OUT, keyDeserializer(), valueDeserializer());
-
-    TestData
-        .getInputMessages()
-        .forEach(kv -> in.pipeInput(kv.key, kv.value));
-
-    receivedMessages = new LinkedMultiValueMap<>();
-    out
-        .readRecordsToList()
-        .forEach(record -> receivedMessages.add(record.key(), record.value()));
-  }
-
-
-  @DisplayName("Assert the expected output messages")
-  @Test
-  public void testExpectedMessages()
-  {
-    TestData.assertExpectedMessages(receivedMessages);
-  }
-
-  @DisplayName("Assert the expected number of messages")
-  @Test
-  public void testExpectedNumberOfMessagesForWord()
-  {
-    TestData.assertExpectedNumberOfMessagesForWord(receivedMessages);
-  }
-
-  @DisplayName("Await the expected final output messages")
-  @Test
-  public void testExpectedLastMessagesForWord()
-  {
-    TestData.assertExpectedLastMessagesForWord(receivedMessages);
-  }
-
-  @DisplayName("Assert the expected state in the state-store")
-  @Test
-  public void testExpectedState()
-  {
-    KeyValueStore<Word, Long> store = testDriver.getKeyValueStore(STORE_NAME);
-    TestData.assertExpectedState(store);
-  }
-
-  @AfterAll
-  public static void tearDown()
-  {
-    testDriver.close();
-  }
-
-
-  private static JsonSerializer serializer()
-  {
-    return new JsonSerializer().noTypeInfo();
-  }
-
-  private static JsonDeserializer<TestOutputWord> keyDeserializer()
-  {
-    return deserializer(true);
-  }
-
-  private static JsonDeserializer<TestOutputWordCounter> valueDeserializer()
-  {
-    return deserializer(false);
-  }
-
-  private static <T> JsonDeserializer<T> deserializer(boolean isKey)
-  {
-    JsonDeserializer<T> deserializer = new JsonDeserializer<>();
-    deserializer.configure(
-        Map.of(JsonDeserializer.TYPE_MAPPINGS, typeMappingsConfig()),
-        isKey);
-    return deserializer;
-  }
-
-  private static String typeMappingsConfig()
-  {
-    return CounterStreamProcessor.typeMappingsConfig(TestOutputWord.class, TestOutputWordCounter.class);
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java
deleted file mode 100644 (file)
index 9b38dbc..0000000
+++ /dev/null
@@ -1,204 +0,0 @@
-package de.juplo.kafka.wordcount.counter;
-
-import de.juplo.kafka.wordcount.splitter.TestInputUser;
-import de.juplo.kafka.wordcount.splitter.TestInputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
-import org.springframework.util.LinkedMultiValueMap;
-import org.springframework.util.MultiValueMap;
-
-import java.util.stream.Stream;
-
-import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.TYPE;
-import static org.assertj.core.api.Assertions.assertThat;
-
-
-class TestData
-{
-       static final String PETER = "peter";
-       static final String KLAUS = "klaus";
-
-       static final String WORD_HALLO = "Hallo";
-       static final String WORD_MÜSCH = "Müsch";
-       static final String WORD_WELT = "Welt";
-       static final String WORD_S = "s";
-       static final String WORD_BOÄH = "Boäh";
-
-       static final TestOutputWord PETER_HALLO = TestOutputWord.of(TYPE, PETER, WORD_HALLO);
-       static final TestOutputWord PETER_WELT = TestOutputWord.of(TYPE, PETER, WORD_WELT);
-       static final TestOutputWord PETER_BOÄH = TestOutputWord.of(TYPE, PETER, WORD_BOÄH);
-       static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(TYPE, KLAUS, WORD_MÜSCH);
-       static final TestOutputWord KLAUS_S = TestOutputWord.of(TYPE, KLAUS, WORD_S);
-
-       private static final KeyValue<TestInputUser, TestInputWord>[] INPUT_MESSAGES = new KeyValue[]
-       {
-                       KeyValue.pair(
-                                       TestInputUser.of(PETER),
-                                       TestInputWord.of(PETER, WORD_HALLO)),
-                       KeyValue.pair(
-                                       TestInputUser.of(KLAUS),
-                                       TestInputWord.of(KLAUS, WORD_MÜSCH)),
-                       KeyValue.pair(
-                                       TestInputUser.of(PETER),
-                                       TestInputWord.of(PETER, WORD_WELT)),
-                       KeyValue.pair(
-                                       TestInputUser.of(KLAUS),
-                                       TestInputWord.of(KLAUS, WORD_MÜSCH)),
-                       KeyValue.pair(
-                                       TestInputUser.of(KLAUS),
-                                       TestInputWord.of(KLAUS, WORD_S)),
-                       KeyValue.pair(
-                                       TestInputUser.of(PETER),
-                                       TestInputWord.of(PETER, WORD_BOÄH)),
-                       KeyValue.pair(
-                                       TestInputUser.of(PETER),
-                                       TestInputWord.of(PETER, WORD_WELT)),
-                       KeyValue.pair(
-                                       TestInputUser.of(PETER),
-                                       TestInputWord.of(PETER, WORD_BOÄH)),
-                       KeyValue.pair(
-                                       TestInputUser.of(KLAUS),
-                                       TestInputWord.of(KLAUS, WORD_S)),
-                       KeyValue.pair(
-                                       TestInputUser.of(PETER),
-                                       TestInputWord.of(PETER, WORD_BOÄH)),
-                       KeyValue.pair(
-                                       TestInputUser.of(KLAUS),
-                                       TestInputWord.of(KLAUS, WORD_S)),
-       };
-
-       static Stream<KeyValue<TestInputUser, TestInputWord>> getInputMessages()
-       {
-               return Stream.of(TestData.INPUT_MESSAGES);
-       }
-
-       static void assertExpectedMessages(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
-       {
-               expectedMessages().forEach(
-                               (word, counter) ->
-                                               assertThat(receivedMessages.get(word))
-                                                               .containsExactlyElementsOf(counter));
-       }
-
-       static void assertExpectedNumberOfMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
-       {
-               assertThat(countMessagesForWord(PETER_HALLO, receivedMessages));
-               assertThat(countMessagesForWord(PETER_WELT, receivedMessages));
-               assertThat(countMessagesForWord(PETER_BOÄH, receivedMessages));
-               assertThat(countMessagesForWord(KLAUS_MÜSCH, receivedMessages));
-               assertThat(countMessagesForWord(KLAUS_S, receivedMessages));
-       }
-
-       private static int countMessagesForWord(TestOutputWord word, MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForUsers)
-       {
-               return messagesForUsers.get(word) == null
-                               ? 0
-                               : messagesForUsers.get(word).size();
-       }
-
-       static void assertExpectedState(ReadOnlyKeyValueStore<Word, Long> store)
-       {
-               assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, store.get(wordOf(PETER_HALLO)));
-               assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, store.get(wordOf(PETER_WELT)));
-               assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, store.get(wordOf(PETER_BOÄH)));
-               assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, store.get(wordOf(KLAUS_MÜSCH)));
-               assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S)));
-       }
-
-       private static Word wordOf(TestOutputWord testOutputWord)
-       {
-               return Word.of(
-                               testOutputWord.getChannel(),
-                               testOutputWord.getKey());
-       }
-
-       static void assertExpectedLastMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
-       {
-               assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages));
-               assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, getLastMessageFor(PETER_WELT, receivedMessages));
-               assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, getLastMessageFor(PETER_BOÄH, receivedMessages));
-               assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, getLastMessageFor(KLAUS_MÜSCH, receivedMessages));
-               assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages));
-       }
-
-       private static void assertWordCountEqualsWordCountFromLastMessage(
-                       TestOutputWord word,
-                       Long counter)
-       {
-               TestOutputWordCounter testOutputWordCounter = TestOutputWordCounter.of(
-                               word.getChannel(),
-                               word.getKey(),
-                               counter);
-               assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter);
-       }
-
-       private static void assertWordCountEqualsWordCountFromLastMessage(
-                       TestOutputWord word,
-                       TestOutputWordCounter counter)
-       {
-               assertThat(counter).isEqualTo(getLastMessageFor(word));
-       }
-
-       private static TestOutputWordCounter getLastMessageFor(TestOutputWord word)
-       {
-               return getLastMessageFor(word, expectedMessages());
-       }
-
-       private static TestOutputWordCounter getLastMessageFor(
-                       TestOutputWord user,
-                       MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForWord)
-       {
-               return messagesForWord
-                               .get(user)
-                               .stream()
-                               .reduce(null, (left, right) -> right);
-       }
-
-       private static final KeyValue<TestOutputWord, TestOutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
-       {
-                       KeyValue.pair(
-                                       PETER_HALLO,
-                                       TestOutputWordCounter.of(PETER, WORD_HALLO,1)),
-                       KeyValue.pair(
-                                       KLAUS_MÜSCH,
-                                       TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,1)),
-                       KeyValue.pair(
-                                       PETER_WELT,
-                                       TestOutputWordCounter.of(PETER, WORD_WELT,1)),
-                       KeyValue.pair(
-                                       KLAUS_MÜSCH,
-                                       TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,2)),
-                       KeyValue.pair(
-                                       KLAUS_S,
-                                       TestOutputWordCounter.of(KLAUS, WORD_S,1)),
-                       KeyValue.pair(
-                                       PETER_BOÄH,
-                                       TestOutputWordCounter.of(PETER, WORD_BOÄH,1)),
-                       KeyValue.pair(
-                                       PETER_WELT,
-                                       TestOutputWordCounter.of(PETER, WORD_WELT,2)),
-                       KeyValue.pair(
-                                       PETER_BOÄH,
-                                       TestOutputWordCounter.of(PETER, WORD_BOÄH,2)),
-                       KeyValue.pair(
-                                       KLAUS_S,
-                                       TestOutputWordCounter.of(KLAUS, WORD_S,2)),
-                       KeyValue.pair(
-                                       PETER_BOÄH,
-                                       TestOutputWordCounter.of(PETER, WORD_BOÄH,3)),
-                       KeyValue.pair(
-                                       KLAUS_S,
-                                       TestOutputWordCounter.of(KLAUS, WORD_S,3)),
-       };
-
-       static MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages()
-       {
-               MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages = new LinkedMultiValueMap<>();
-               Stream
-                               .of(EXPECTED_MESSAGES)
-                               .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
-               return expectedMessages;
-       }
-}
diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputUser.java b/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputUser.java
deleted file mode 100644 (file)
index 2255b61..0000000
+++ /dev/null
@@ -1,14 +0,0 @@
-package de.juplo.kafka.wordcount.splitter;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor(staticName = "of")
-public class TestInputUser
-{
-  String user;
-}
diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputWord.java b/src/test/java/de/juplo/kafka/wordcount/splitter/TestInputWord.java
deleted file mode 100644 (file)
index 71ed1d9..0000000
+++ /dev/null
@@ -1,15 +0,0 @@
-package de.juplo.kafka.wordcount.splitter;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor(staticName = "of")
-public class TestInputWord
-{
-  String user;
-  String word;
-}
diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java
deleted file mode 100644 (file)
index 132f6ba..0000000
+++ /dev/null
@@ -1,16 +0,0 @@
-package de.juplo.kafka.wordcount.top10;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor(staticName = "of")
-public class TestOutputWord
-{
-  String type;
-  String channel;
-  String key;
-}
diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java
deleted file mode 100644 (file)
index a5f5d43..0000000
+++ /dev/null
@@ -1,16 +0,0 @@
-package de.juplo.kafka.wordcount.top10;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor(staticName = "of")
-public class TestOutputWordCounter
-{
-  String user;
-  String key;
-  long counter;
-}