<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+
<modelVersion>4.0.0</modelVersion>
+
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.7</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
- <groupId>de.juplo.kafka.wordcount</groupId>
- <artifactId>counter</artifactId>
- <version>1.4.2</version>
- <name>Wordcount-Counter</name>
- <description>Word-counting stream-processor of the multi-user wordcount-example</description>
+
+ <groupId>de.juplo.kafka.streams.demos</groupId>
+ <artifactId>examples</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+
+ <name>Examples</name>
+ <description>Examples for Kafka Streams</description>
+
<properties>
<java.version>21</java.version>
<docker-maven-plugin.version>0.44.0</docker-maven-plugin.version>
</properties>
+
<dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-actuator</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
+
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-devtools</artifactId>
- <scope>runtime</scope>
- <optional>true</optional>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-configuration-processor</artifactId>
- <optional>true</optional>
- </dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
- <build>
- <plugins>
- <plugin>
- <artifactId>maven-failsafe-plugin</artifactId>
- </plugin>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- <configuration>
- <excludes>
- <exclude>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- </exclude>
- </excludes>
- </configuration>
- </plugin>
- <plugin>
- <groupId>io.fabric8</groupId>
- <artifactId>docker-maven-plugin</artifactId>
- <version>${docker-maven-plugin.version}</version>
- <configuration>
- <images>
- <image>
- <name>juplo/wordcount--%a:%v</name>
- </image>
- </images>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
</project>
+++ /dev/null
-package de.juplo.kafka.wordcount.counter;
-
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-
-
-@SpringBootApplication
-public class CounterApplication
-{
- public static void main(String[] args)
- {
- SpringApplication.run(CounterApplication.class, args);
- }
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.counter;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.Stores;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.support.serializer.JsonDeserializer;
-import org.springframework.kafka.support.serializer.JsonSerde;
-
-import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
-
-import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
-import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
-
-
-@Configuration
-@EnableConfigurationProperties(CounterApplicationProperties.class)
-@Slf4j
-public class CounterApplicationConfiguriation
-{
- @Bean
- public Properties streamProcessorProperties(
- CounterApplicationProperties counterProperties)
- {
- Properties propertyMap = serializationConfig();
-
- propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, counterProperties.getApplicationId());
-
- propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, counterProperties.getBootstrapServer());
- if (counterProperties.getCommitInterval() != null)
- propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, counterProperties.getCommitInterval());
- if (counterProperties.getCacheMaxBytes() != null)
- propertyMap.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, counterProperties.getCacheMaxBytes());
-
- propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
- return propertyMap;
- }
-
- static Properties serializationConfig()
- {
- Properties propertyMap = new Properties();
-
- propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
- propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
- propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, CounterApplication.class.getPackageName());
-
- return propertyMap;
- }
-
- @Bean(initMethod = "start", destroyMethod = "stop")
- public CounterStreamProcessor streamProcessor(
- CounterApplicationProperties applicationProperties,
- Properties streamProcessorProperties,
- KeyValueBytesStoreSupplier storeSupplier,
- ConfigurableApplicationContext context)
- {
- CounterStreamProcessor streamProcessor = new CounterStreamProcessor(
- applicationProperties.getInputTopic(),
- applicationProperties.getOutputTopic(),
- streamProcessorProperties,
- storeSupplier);
-
- streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
- {
- log.error("Unexpected error!", e);
- CompletableFuture.runAsync(() ->
- {
- log.info("Stopping application...");
- SpringApplication.exit(context, () -> 1);
- });
- return SHUTDOWN_CLIENT;
- });
-
-
- return streamProcessor;
- }
-
- @Bean
- public KeyValueBytesStoreSupplier storeSupplier()
- {
- return Stores.persistentKeyValueStore(STORE_NAME);
- }
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.counter;
-
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-
-
-@ConfigurationProperties("juplo.wordcount.counter")
-@Getter
-@Setter
-@ToString
-public class CounterApplicationProperties
-{
- private String bootstrapServer = "localhost:9092";
- private String applicationId = "counter";
- private String inputTopic = "words";
- private String outputTopic = "countings";
- private Integer commitInterval;
- private Integer cacheMaxBytes;
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.counter;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.QueryableStoreTypes;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
-import org.springframework.kafka.support.serializer.JsonSerde;
-import org.springframework.kafka.support.serializer.JsonSerializer;
-
-import java.util.Map;
-import java.util.Properties;
-import java.util.stream.Collectors;
-
-
-@Slf4j
-public class CounterStreamProcessor
-{
- public static final String TYPE = "COUNTER";
- public static final String STORE_NAME = "counter";
-
-
- public final KafkaStreams streams;
-
-
- public CounterStreamProcessor(
- String inputTopic,
- String outputTopic,
- Properties properties,
- KeyValueBytesStoreSupplier storeSupplier)
- {
- Topology topology = CounterStreamProcessor.buildTopology(
- inputTopic,
- outputTopic,
- storeSupplier);
-
- streams = new KafkaStreams(topology, properties);
- }
-
- static Topology buildTopology(
- String inputTopic,
- String outputTopic,
- KeyValueBytesStoreSupplier storeSupplier)
- {
- StreamsBuilder builder = new StreamsBuilder();
-
- builder
- .stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde()))
- .mapValues(word -> Word.of(word.getUser(), word.getWord()))
- .map((key, word) -> new KeyValue<>(word, word))
- .groupByKey()
- .count(
- Materialized
- .<Word, Long>as(storeSupplier)
- .withKeySerde(new JsonSerde<>(Word.class))) // No headers are present: fixed typing is needed!
- .toStream()
- .map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter)))
- .to(outputTopic, Produced.with(outKeySerde(), outValueSerde()));
-
- Topology topology = builder.build();
- log.info("\n\n{}", topology.describe());
-
- return topology;
- }
-
- ReadOnlyKeyValueStore<Word, Long> getStore()
- {
- return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
- }
-
- public void start()
- {
- log.info("Starting Stream-Processor");
- streams.start();
- }
-
- public void stop()
- {
- log.info("Stopping Stream-Processor");
- streams.close();
- }
-
-
-
- public static JsonSerde<User> inKeySerde()
- {
- return new JsonSerde<>(User.class);
- }
-
- public static JsonSerde<UserWord> inValueSerde()
- {
- return new JsonSerde<>(UserWord.class);
- }
-
- public static JsonSerde<Word> outKeySerde()
- {
- return serde(true);
- }
-
- public static JsonSerde<WordCounter> outValueSerde()
- {
- return serde(false);
- }
-
- public static <T> JsonSerde<T> serde(boolean isKey)
- {
- JsonSerde<T> serde = new JsonSerde<>();
- serde.configure(
- Map.of(JsonSerializer.TYPE_MAPPINGS, typeMappingsConfig()),
- isKey);
- return serde;
- }
-
- private static String typeMappingsConfig()
- {
- return typeMappingsConfig(Word.class, WordCounter.class);
- }
-
- public static String typeMappingsConfig(Class keyClass, Class counterClass)
- {
- return Map.of(
- "key", keyClass,
- "counter", counterClass)
- .entrySet()
- .stream()
- .map(entry -> entry.getKey() + ":" + entry.getValue().getName())
- .collect(Collectors.joining(","));
- }
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.counter;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import lombok.Data;
-
-
-@Data
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class User
-{
- String user;
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.counter;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import lombok.Data;
-
-
-@Data
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class UserWord
-{
- private String user;
- private String word;
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.counter;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor(staticName = "of")
-public class Word
-{
- private final String type = CounterStreamProcessor.TYPE;
- private String channel;
- private String key;
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.counter;
-
-import lombok.AccessLevel;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor(access = AccessLevel.PRIVATE)
-public class WordCounter
-{
- String user;
- String key;
- long counter;
-
- public static WordCounter of(Word word, long counter)
- {
- return new WordCounter(word.getChannel(), word.getKey(), counter);
- }
-}
+++ /dev/null
-server:
- port: 8083
-management:
- endpoints:
- web:
- exposure:
- include: "*"
+++ /dev/null
-package de.juplo.kafka.wordcount.counter;
-
-import de.juplo.kafka.wordcount.splitter.TestInputUser;
-import de.juplo.kafka.wordcount.splitter.TestInputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.Stores;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.support.KafkaHeaders;
-import org.springframework.kafka.support.SendResult;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.messaging.handler.annotation.Header;
-import org.springframework.messaging.handler.annotation.Payload;
-import org.springframework.util.LinkedMultiValueMap;
-import org.springframework.util.MultiValueMap;
-
-import java.time.Duration;
-
-import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN;
-import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT;
-import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
-import static org.awaitility.Awaitility.await;
-
-
-@SpringBootTest(
- properties = {
- "spring.main.allow-bean-definition-overriding=true",
- "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
- "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
- "spring.kafka.producer.properties.spring.json.add.type.headers=false",
- "spring.kafka.consumer.auto-offset-reset=earliest",
- "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
- "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
- "spring.kafka.consumer.properties.spring.json.type.mapping=key:de.juplo.kafka.wordcount.top10.TestOutputWord,counter:de.juplo.kafka.wordcount.top10.TestOutputWordCounter",
- "logging.level.root=WARN",
- "logging.level.de.juplo=DEBUG",
- "juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}",
- "juplo.wordcount.counter.commit-interval=100",
- "juplo.wordcount.counter.cache-max-bytes=0",
- "juplo.wordcount.counter.input-topic=" + TOPIC_IN,
- "juplo.wordcount.counter.output-topic=" + TOPIC_OUT })
-@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT })
-@Slf4j
-public class CounterApplicationIT
-{
- public static final String TOPIC_IN = "in";
- public static final String TOPIC_OUT = "out";
-
- @Autowired
- Consumer consumer;
- @Autowired
- CounterStreamProcessor streamProcessor;
-
-
- @BeforeAll
- public static void testSendMessage(
- @Autowired KafkaTemplate<TestInputUser, TestInputWord> kafkaTemplate)
- {
- TestData
- .getInputMessages()
- .forEach(kv ->
- {
- try
- {
- SendResult<TestInputUser, TestInputWord> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
- log.info(
- "Sent: {}={}, partition={}, offset={}",
- result.getProducerRecord().key(),
- result.getProducerRecord().value(),
- result.getRecordMetadata().partition(),
- result.getRecordMetadata().offset());
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- });
- }
-
- @DisplayName("Await the expected number of messages")
- @Test
- public void testAwaitExpectedNumberOfMessagesForUsers()
- {
- await("Expected number of messages")
- .atMost(Duration.ofSeconds(5))
- .untilAsserted(() -> consumer.enforceAssertion(
- receivedMessages -> TestData.assertExpectedNumberOfMessagesForWord(receivedMessages)));
- }
-
- @DisplayName("Await the expected output messages")
- @Test
- void testSendMessage()
- {
- await("Expected messages")
- .atMost(Duration.ofSeconds(10))
- .untilAsserted(() -> consumer.enforceAssertion(
- receivedMessages -> TestData.assertExpectedMessages(receivedMessages)));
- }
-
- @DisplayName("Await the expected final output messages")
- @Test
- public void testAwaitExpectedLastMessagesForUsers()
- {
- await("Expected final output messages")
- .atMost(Duration.ofSeconds(5))
- .untilAsserted(() -> consumer.enforceAssertion(
- receivedMessages -> TestData.assertExpectedLastMessagesForWord(receivedMessages)));
- }
-
- @DisplayName("Await the expected state in the state-store")
- @Test
- public void testAwaitExpectedState()
- {
- await("Expected state")
- .atMost(Duration.ofSeconds(5))
- .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
- }
-
-
- static class Consumer
- {
- private final MultiValueMap<TestOutputWord, TestOutputWordCounter> received = new LinkedMultiValueMap<>();
-
- @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
- public synchronized void receive(
- @Header(KafkaHeaders.RECEIVED_KEY) TestOutputWord word,
- @Payload TestOutputWordCounter counter)
- {
- log.debug("Received message: {} -> {}", word, counter);
- received.add(word, counter);
- }
-
- synchronized void enforceAssertion(
- java.util.function.Consumer<MultiValueMap<TestOutputWord, TestOutputWordCounter>> assertion)
- {
- assertion.accept(received);
- }
- }
-
- @TestConfiguration
- static class Configuration
- {
- @Bean
- Consumer consumer()
- {
- return new Consumer();
- }
-
- @Bean
- KeyValueBytesStoreSupplier storeSupplier()
- {
- return Stores.inMemoryKeyValueStore(STORE_NAME);
- }
- }
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.counter;
-
-import de.juplo.kafka.wordcount.splitter.TestInputUser;
-import de.juplo.kafka.wordcount.splitter.TestInputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.streams.TestOutputTopic;
-import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.Stores;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.springframework.kafka.support.serializer.JsonDeserializer;
-import org.springframework.kafka.support.serializer.JsonSerializer;
-import org.springframework.util.LinkedMultiValueMap;
-import org.springframework.util.MultiValueMap;
-
-import java.util.Map;
-
-import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig;
-import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
-
-
-@Slf4j
-public class CounterStreamProcessorTopologyTest
-{
- public static final String IN = "TEST-IN";
- public static final String OUT = "TEST-OUT";
-
-
- static TopologyTestDriver testDriver;
- static MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages = new LinkedMultiValueMap<>();
-
-
- @BeforeAll
- public static void setUpTestDriver()
- {
- Topology topology = CounterStreamProcessor.buildTopology(
- IN,
- OUT,
- Stores.inMemoryKeyValueStore(STORE_NAME));
-
- testDriver = new TopologyTestDriver(topology, serializationConfig());
-
- TestInputTopic<TestInputUser, TestInputWord> in =
- testDriver.createInputTopic(IN, serializer(), serializer());
- TestOutputTopic<TestOutputWord, TestOutputWordCounter> out =
- testDriver.createOutputTopic(OUT, keyDeserializer(), valueDeserializer());
-
- TestData
- .getInputMessages()
- .forEach(kv -> in.pipeInput(kv.key, kv.value));
-
- receivedMessages = new LinkedMultiValueMap<>();
- out
- .readRecordsToList()
- .forEach(record -> receivedMessages.add(record.key(), record.value()));
- }
-
-
- @DisplayName("Assert the expected output messages")
- @Test
- public void testExpectedMessages()
- {
- TestData.assertExpectedMessages(receivedMessages);
- }
-
- @DisplayName("Assert the expected number of messages")
- @Test
- public void testExpectedNumberOfMessagesForWord()
- {
- TestData.assertExpectedNumberOfMessagesForWord(receivedMessages);
- }
-
- @DisplayName("Await the expected final output messages")
- @Test
- public void testExpectedLastMessagesForWord()
- {
- TestData.assertExpectedLastMessagesForWord(receivedMessages);
- }
-
- @DisplayName("Assert the expected state in the state-store")
- @Test
- public void testExpectedState()
- {
- KeyValueStore<Word, Long> store = testDriver.getKeyValueStore(STORE_NAME);
- TestData.assertExpectedState(store);
- }
-
- @AfterAll
- public static void tearDown()
- {
- testDriver.close();
- }
-
-
- private static JsonSerializer serializer()
- {
- return new JsonSerializer().noTypeInfo();
- }
-
- private static JsonDeserializer<TestOutputWord> keyDeserializer()
- {
- return deserializer(true);
- }
-
- private static JsonDeserializer<TestOutputWordCounter> valueDeserializer()
- {
- return deserializer(false);
- }
-
- private static <T> JsonDeserializer<T> deserializer(boolean isKey)
- {
- JsonDeserializer<T> deserializer = new JsonDeserializer<>();
- deserializer.configure(
- Map.of(JsonDeserializer.TYPE_MAPPINGS, typeMappingsConfig()),
- isKey);
- return deserializer;
- }
-
- private static String typeMappingsConfig()
- {
- return CounterStreamProcessor.typeMappingsConfig(TestOutputWord.class, TestOutputWordCounter.class);
- }
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.counter;
-
-import de.juplo.kafka.wordcount.splitter.TestInputUser;
-import de.juplo.kafka.wordcount.splitter.TestInputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
-import org.springframework.util.LinkedMultiValueMap;
-import org.springframework.util.MultiValueMap;
-
-import java.util.stream.Stream;
-
-import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.TYPE;
-import static org.assertj.core.api.Assertions.assertThat;
-
-
-class TestData
-{
- static final String PETER = "peter";
- static final String KLAUS = "klaus";
-
- static final String WORD_HALLO = "Hallo";
- static final String WORD_MÜSCH = "Müsch";
- static final String WORD_WELT = "Welt";
- static final String WORD_S = "s";
- static final String WORD_BOÄH = "Boäh";
-
- static final TestOutputWord PETER_HALLO = TestOutputWord.of(TYPE, PETER, WORD_HALLO);
- static final TestOutputWord PETER_WELT = TestOutputWord.of(TYPE, PETER, WORD_WELT);
- static final TestOutputWord PETER_BOÄH = TestOutputWord.of(TYPE, PETER, WORD_BOÄH);
- static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(TYPE, KLAUS, WORD_MÜSCH);
- static final TestOutputWord KLAUS_S = TestOutputWord.of(TYPE, KLAUS, WORD_S);
-
- private static final KeyValue<TestInputUser, TestInputWord>[] INPUT_MESSAGES = new KeyValue[]
- {
- KeyValue.pair(
- TestInputUser.of(PETER),
- TestInputWord.of(PETER, WORD_HALLO)),
- KeyValue.pair(
- TestInputUser.of(KLAUS),
- TestInputWord.of(KLAUS, WORD_MÜSCH)),
- KeyValue.pair(
- TestInputUser.of(PETER),
- TestInputWord.of(PETER, WORD_WELT)),
- KeyValue.pair(
- TestInputUser.of(KLAUS),
- TestInputWord.of(KLAUS, WORD_MÜSCH)),
- KeyValue.pair(
- TestInputUser.of(KLAUS),
- TestInputWord.of(KLAUS, WORD_S)),
- KeyValue.pair(
- TestInputUser.of(PETER),
- TestInputWord.of(PETER, WORD_BOÄH)),
- KeyValue.pair(
- TestInputUser.of(PETER),
- TestInputWord.of(PETER, WORD_WELT)),
- KeyValue.pair(
- TestInputUser.of(PETER),
- TestInputWord.of(PETER, WORD_BOÄH)),
- KeyValue.pair(
- TestInputUser.of(KLAUS),
- TestInputWord.of(KLAUS, WORD_S)),
- KeyValue.pair(
- TestInputUser.of(PETER),
- TestInputWord.of(PETER, WORD_BOÄH)),
- KeyValue.pair(
- TestInputUser.of(KLAUS),
- TestInputWord.of(KLAUS, WORD_S)),
- };
-
- static Stream<KeyValue<TestInputUser, TestInputWord>> getInputMessages()
- {
- return Stream.of(TestData.INPUT_MESSAGES);
- }
-
- static void assertExpectedMessages(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
- {
- expectedMessages().forEach(
- (word, counter) ->
- assertThat(receivedMessages.get(word))
- .containsExactlyElementsOf(counter));
- }
-
- static void assertExpectedNumberOfMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
- {
- assertThat(countMessagesForWord(PETER_HALLO, receivedMessages));
- assertThat(countMessagesForWord(PETER_WELT, receivedMessages));
- assertThat(countMessagesForWord(PETER_BOÄH, receivedMessages));
- assertThat(countMessagesForWord(KLAUS_MÜSCH, receivedMessages));
- assertThat(countMessagesForWord(KLAUS_S, receivedMessages));
- }
-
- private static int countMessagesForWord(TestOutputWord word, MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForUsers)
- {
- return messagesForUsers.get(word) == null
- ? 0
- : messagesForUsers.get(word).size();
- }
-
- static void assertExpectedState(ReadOnlyKeyValueStore<Word, Long> store)
- {
- assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, store.get(wordOf(PETER_HALLO)));
- assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, store.get(wordOf(PETER_WELT)));
- assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, store.get(wordOf(PETER_BOÄH)));
- assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, store.get(wordOf(KLAUS_MÜSCH)));
- assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S)));
- }
-
- private static Word wordOf(TestOutputWord testOutputWord)
- {
- return Word.of(
- testOutputWord.getChannel(),
- testOutputWord.getKey());
- }
-
- static void assertExpectedLastMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
- {
- assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages));
- assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, getLastMessageFor(PETER_WELT, receivedMessages));
- assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, getLastMessageFor(PETER_BOÄH, receivedMessages));
- assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, getLastMessageFor(KLAUS_MÜSCH, receivedMessages));
- assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages));
- }
-
- private static void assertWordCountEqualsWordCountFromLastMessage(
- TestOutputWord word,
- Long counter)
- {
- TestOutputWordCounter testOutputWordCounter = TestOutputWordCounter.of(
- word.getChannel(),
- word.getKey(),
- counter);
- assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter);
- }
-
- private static void assertWordCountEqualsWordCountFromLastMessage(
- TestOutputWord word,
- TestOutputWordCounter counter)
- {
- assertThat(counter).isEqualTo(getLastMessageFor(word));
- }
-
- private static TestOutputWordCounter getLastMessageFor(TestOutputWord word)
- {
- return getLastMessageFor(word, expectedMessages());
- }
-
- private static TestOutputWordCounter getLastMessageFor(
- TestOutputWord user,
- MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForWord)
- {
- return messagesForWord
- .get(user)
- .stream()
- .reduce(null, (left, right) -> right);
- }
-
- private static final KeyValue<TestOutputWord, TestOutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
- {
- KeyValue.pair(
- PETER_HALLO,
- TestOutputWordCounter.of(PETER, WORD_HALLO,1)),
- KeyValue.pair(
- KLAUS_MÜSCH,
- TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,1)),
- KeyValue.pair(
- PETER_WELT,
- TestOutputWordCounter.of(PETER, WORD_WELT,1)),
- KeyValue.pair(
- KLAUS_MÜSCH,
- TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,2)),
- KeyValue.pair(
- KLAUS_S,
- TestOutputWordCounter.of(KLAUS, WORD_S,1)),
- KeyValue.pair(
- PETER_BOÄH,
- TestOutputWordCounter.of(PETER, WORD_BOÄH,1)),
- KeyValue.pair(
- PETER_WELT,
- TestOutputWordCounter.of(PETER, WORD_WELT,2)),
- KeyValue.pair(
- PETER_BOÄH,
- TestOutputWordCounter.of(PETER, WORD_BOÄH,2)),
- KeyValue.pair(
- KLAUS_S,
- TestOutputWordCounter.of(KLAUS, WORD_S,2)),
- KeyValue.pair(
- PETER_BOÄH,
- TestOutputWordCounter.of(PETER, WORD_BOÄH,3)),
- KeyValue.pair(
- KLAUS_S,
- TestOutputWordCounter.of(KLAUS, WORD_S,3)),
- };
-
- static MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages()
- {
- MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages = new LinkedMultiValueMap<>();
- Stream
- .of(EXPECTED_MESSAGES)
- .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
- return expectedMessages;
- }
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.splitter;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor(staticName = "of")
-public class TestInputUser
-{
- String user;
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.splitter;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor(staticName = "of")
-public class TestInputWord
-{
- String user;
- String word;
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.top10;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor(staticName = "of")
-public class TestOutputWord
-{
- String type;
- String channel;
- String key;
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.top10;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor(staticName = "of")
-public class TestOutputWordCounter
-{
- String user;
- String key;
- long counter;
-}