+++ /dev/null
-package de.juplo.kafka.wordcount.counter;
-
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-
-
-@SpringBootApplication
-public class CounterApplication
-{
- public static void main(String[] args)
- {
- SpringApplication.run(CounterApplication.class, args);
- }
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.counter;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.Stores;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.support.serializer.JsonDeserializer;
-import org.springframework.kafka.support.serializer.JsonSerde;
-
-import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
-
-import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
-import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
-
-
-@Configuration
-@EnableConfigurationProperties(CounterApplicationProperties.class)
-@Slf4j
-public class CounterApplicationConfiguriation
-{
- @Bean
- public Properties streamProcessorProperties(
- CounterApplicationProperties counterProperties)
- {
- Properties propertyMap = serializationConfig();
-
- propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, counterProperties.getApplicationId());
-
- propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, counterProperties.getBootstrapServer());
- if (counterProperties.getCommitInterval() != null)
- propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, counterProperties.getCommitInterval());
- if (counterProperties.getCacheMaxBytes() != null)
- propertyMap.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, counterProperties.getCacheMaxBytes());
-
- propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
- return propertyMap;
- }
-
- static Properties serializationConfig()
- {
- Properties propertyMap = new Properties();
-
- propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
- propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
- propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, CounterApplication.class.getPackageName());
-
- return propertyMap;
- }
-
- @Bean(initMethod = "start", destroyMethod = "stop")
- public CounterStreamProcessor streamProcessor(
- CounterApplicationProperties applicationProperties,
- Properties streamProcessorProperties,
- KeyValueBytesStoreSupplier storeSupplier,
- ConfigurableApplicationContext context)
- {
- CounterStreamProcessor streamProcessor = new CounterStreamProcessor(
- applicationProperties.getInputTopic(),
- applicationProperties.getOutputTopic(),
- streamProcessorProperties,
- storeSupplier);
-
- streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
- {
- log.error("Unexpected error!", e);
- CompletableFuture.runAsync(() ->
- {
- log.info("Stopping application...");
- SpringApplication.exit(context, () -> 1);
- });
- return SHUTDOWN_CLIENT;
- });
-
-
- return streamProcessor;
- }
-
- @Bean
- public KeyValueBytesStoreSupplier storeSupplier()
- {
- return Stores.persistentKeyValueStore(STORE_NAME);
- }
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.counter;
-
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-
-
-@ConfigurationProperties("juplo.wordcount.counter")
-@Getter
-@Setter
-@ToString
-public class CounterApplicationProperties
-{
- private String bootstrapServer = "localhost:9092";
- private String applicationId = "counter";
- private String inputTopic = "words";
- private String outputTopic = "countings";
- private Integer commitInterval;
- private Integer cacheMaxBytes;
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.counter;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Produced;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.QueryableStoreTypes;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
-import org.springframework.kafka.support.serializer.JsonSerde;
-import org.springframework.kafka.support.serializer.JsonSerializer;
-
-import java.util.Map;
-import java.util.Properties;
-import java.util.stream.Collectors;
-
-
-@Slf4j
-public class CounterStreamProcessor
-{
- public static final String STORE_NAME = "counter";
-
-
- public final KafkaStreams streams;
-
-
- public CounterStreamProcessor(
- String inputTopic,
- String outputTopic,
- Properties properties,
- KeyValueBytesStoreSupplier storeSupplier)
- {
- Topology topology = CounterStreamProcessor.buildTopology(
- inputTopic,
- outputTopic,
- storeSupplier);
-
- streams = new KafkaStreams(topology, properties);
- }
-
- static Topology buildTopology(
- String inputTopic,
- String outputTopic,
- KeyValueBytesStoreSupplier storeSupplier)
- {
- StreamsBuilder builder = new StreamsBuilder();
-
- builder
- .stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde()))
- .map((key, word) -> new KeyValue<>(word, word))
- .groupByKey()
- .count(
- Materialized
- .<Word, Long>as(storeSupplier)
- .withKeySerde(new JsonSerde<>(Word.class))) // No headers are present: fixed typing is needed!
- .toStream()
- .map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter)))
- .to(outputTopic, Produced.with(outKeySerde(), outValueSerde()));
-
- Topology topology = builder.build();
- log.info("\n\n{}", topology.describe());
-
- return topology;
- }
-
- ReadOnlyKeyValueStore<Word, Long> getStore()
- {
- return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
- }
-
- public void start()
- {
- log.info("Starting Stream-Processor");
- streams.start();
- }
-
- public void stop()
- {
- log.info("Stopping Stream-Processor");
- streams.close();
- }
-
-
-
- public static JsonSerde<User> inKeySerde()
- {
- return new JsonSerde<>(User.class);
- }
-
- public static JsonSerde<Word> inValueSerde()
- {
- return new JsonSerde<>(Word.class);
- }
-
- public static JsonSerde<Word> outKeySerde()
- {
- return serde(true);
- }
-
- public static JsonSerde<WordCounter> outValueSerde()
- {
- return serde(false);
- }
-
- public static <T> JsonSerde<T> serde(boolean isKey)
- {
- JsonSerde<T> serde = new JsonSerde<>();
- serde.configure(
- Map.of(JsonSerializer.TYPE_MAPPINGS, typeMappingsConfig()),
- isKey);
- return serde;
- }
-
- private static String typeMappingsConfig()
- {
- return typeMappingsConfig(Word.class, WordCounter.class);
- }
-
- public static String typeMappingsConfig(Class wordClass, Class wordCounterClass)
- {
- return Map.of(
- "word", wordClass,
- "counter", wordCounterClass)
- .entrySet()
- .stream()
- .map(entry -> entry.getKey() + ":" + entry.getValue().getName())
- .collect(Collectors.joining(","));
- }
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.counter;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import lombok.Data;
-
-
-@Data
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class User
-{
- String user;
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.counter;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import lombok.Data;
-
-
-@Data
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class Word
-{
- private String user;
- private String word;
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.counter;
-
-import lombok.AccessLevel;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor(access = AccessLevel.PRIVATE)
-public class WordCounter
-{
- String user;
- String word;
- long counter;
-
- public static WordCounter of(Word word, long counter)
- {
- return new WordCounter(word.getUser(), word.getWord(), counter);
- }
-}
--- /dev/null
+package de.juplo.kafka.wordcount.counter;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+
+@SpringBootApplication
+public class CounterApplication
+{
+ public static void main(String[] args)
+ {
+ SpringApplication.run(CounterApplication.class, args);
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.counter;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerde;
+
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+
+import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
+import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+
+
+@Configuration
+@EnableConfigurationProperties(CounterApplicationProperties.class)
+@Slf4j
+public class CounterApplicationConfiguriation
+{
+ @Bean
+ public Properties streamProcessorProperties(
+ CounterApplicationProperties counterProperties)
+ {
+ Properties propertyMap = serializationConfig();
+
+ propertyMap.put(StreamsConfig.APPLICATION_ID_CONFIG, counterProperties.getApplicationId());
+
+ propertyMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, counterProperties.getBootstrapServer());
+ if (counterProperties.getCommitInterval() != null)
+ propertyMap.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, counterProperties.getCommitInterval());
+ if (counterProperties.getCacheMaxBytes() != null)
+ propertyMap.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, counterProperties.getCacheMaxBytes());
+
+ propertyMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ return propertyMap;
+ }
+
+ static Properties serializationConfig()
+ {
+ Properties propertyMap = new Properties();
+
+ propertyMap.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
+ propertyMap.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
+ propertyMap.put(JsonDeserializer.TRUSTED_PACKAGES, CounterApplication.class.getPackageName());
+
+ return propertyMap;
+ }
+
+ @Bean(initMethod = "start", destroyMethod = "stop")
+ public CounterStreamProcessor streamProcessor(
+ CounterApplicationProperties applicationProperties,
+ Properties streamProcessorProperties,
+ KeyValueBytesStoreSupplier storeSupplier,
+ ConfigurableApplicationContext context)
+ {
+ CounterStreamProcessor streamProcessor = new CounterStreamProcessor(
+ applicationProperties.getInputTopic(),
+ applicationProperties.getOutputTopic(),
+ streamProcessorProperties,
+ storeSupplier);
+
+ streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
+ {
+ log.error("Unexpected error!", e);
+ CompletableFuture.runAsync(() ->
+ {
+ log.info("Stopping application...");
+ SpringApplication.exit(context, () -> 1);
+ });
+ return SHUTDOWN_CLIENT;
+ });
+
+
+ return streamProcessor;
+ }
+
+ @Bean
+ public KeyValueBytesStoreSupplier storeSupplier()
+ {
+ return Stores.persistentKeyValueStore(STORE_NAME);
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.counter;
+
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+
+@ConfigurationProperties("juplo.wordcount.counter")
+@Getter
+@Setter
+@ToString
+public class CounterApplicationProperties
+{
+ private String bootstrapServer = "localhost:9092";
+ private String applicationId = "counter";
+ private String inputTopic = "words";
+ private String outputTopic = "countings";
+ private Integer commitInterval;
+ private Integer cacheMaxBytes;
+}
--- /dev/null
+package de.juplo.kafka.wordcount.counter;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.springframework.kafka.support.serializer.JsonSerde;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+
+@Slf4j
+public class CounterStreamProcessor
+{
+ public static final String STORE_NAME = "counter";
+
+
+ public final KafkaStreams streams;
+
+
+ public CounterStreamProcessor(
+ String inputTopic,
+ String outputTopic,
+ Properties properties,
+ KeyValueBytesStoreSupplier storeSupplier)
+ {
+ Topology topology = CounterStreamProcessor.buildTopology(
+ inputTopic,
+ outputTopic,
+ storeSupplier);
+
+ streams = new KafkaStreams(topology, properties);
+ }
+
+ static Topology buildTopology(
+ String inputTopic,
+ String outputTopic,
+ KeyValueBytesStoreSupplier storeSupplier)
+ {
+ StreamsBuilder builder = new StreamsBuilder();
+
+ builder
+ .stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde()))
+ .map((key, word) -> new KeyValue<>(word, word))
+ .groupByKey()
+ .count(
+ Materialized
+ .<Word, Long>as(storeSupplier)
+ .withKeySerde(new JsonSerde<>(Word.class))) // No headers are present: fixed typing is needed!
+ .toStream()
+ .map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter)))
+ .to(outputTopic, Produced.with(outKeySerde(), outValueSerde()));
+
+ Topology topology = builder.build();
+ log.info("\n\n{}", topology.describe());
+
+ return topology;
+ }
+
+ ReadOnlyKeyValueStore<Word, Long> getStore()
+ {
+ return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
+ }
+
+ public void start()
+ {
+ log.info("Starting Stream-Processor");
+ streams.start();
+ }
+
+ public void stop()
+ {
+ log.info("Stopping Stream-Processor");
+ streams.close();
+ }
+
+
+
+ public static JsonSerde<User> inKeySerde()
+ {
+ return new JsonSerde<>(User.class);
+ }
+
+ public static JsonSerde<Word> inValueSerde()
+ {
+ return new JsonSerde<>(Word.class);
+ }
+
+ public static JsonSerde<Word> outKeySerde()
+ {
+ return serde(true);
+ }
+
+ public static JsonSerde<WordCounter> outValueSerde()
+ {
+ return serde(false);
+ }
+
+ public static <T> JsonSerde<T> serde(boolean isKey)
+ {
+ JsonSerde<T> serde = new JsonSerde<>();
+ serde.configure(
+ Map.of(JsonSerializer.TYPE_MAPPINGS, typeMappingsConfig()),
+ isKey);
+ return serde;
+ }
+
+ private static String typeMappingsConfig()
+ {
+ return typeMappingsConfig(Word.class, WordCounter.class);
+ }
+
+ public static String typeMappingsConfig(Class wordClass, Class wordCounterClass)
+ {
+ return Map.of(
+ "word", wordClass,
+ "counter", wordCounterClass)
+ .entrySet()
+ .stream()
+ .map(entry -> entry.getKey() + ":" + entry.getValue().getName())
+ .collect(Collectors.joining(","));
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.counter;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class User
+{
+ String user;
+}
--- /dev/null
+package de.juplo.kafka.wordcount.counter;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Word
+{
+ private String user;
+ private String word;
+}
--- /dev/null
+package de.juplo.kafka.wordcount.counter;
+
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(access = AccessLevel.PRIVATE)
+public class WordCounter
+{
+ String user;
+ String word;
+ long counter;
+
+ public static WordCounter of(Word word, long counter)
+ {
+ return new WordCounter(word.getUser(), word.getWord(), counter);
+ }
+}
+++ /dev/null
-package de.juplo.kafka.wordcount.counter;
-
-import de.juplo.kafka.wordcount.splitter.TestInputUser;
-import de.juplo.kafka.wordcount.splitter.TestInputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.Stores;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.support.KafkaHeaders;
-import org.springframework.kafka.support.SendResult;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.messaging.handler.annotation.Header;
-import org.springframework.messaging.handler.annotation.Payload;
-import org.springframework.util.LinkedMultiValueMap;
-import org.springframework.util.MultiValueMap;
-
-import java.time.Duration;
-
-import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN;
-import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT;
-import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
-import static org.awaitility.Awaitility.await;
-
-
-@SpringBootTest(
- properties = {
- "spring.main.allow-bean-definition-overriding=true",
- "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
- "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
- "spring.kafka.producer.properties.spring.json.add.type.headers=false",
- "spring.kafka.consumer.auto-offset-reset=earliest",
- "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
- "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
- "spring.kafka.consumer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.top10.TestOutputWord,counter:de.juplo.kafka.wordcount.top10.TestOutputWordCounter",
- "logging.level.root=WARN",
- "logging.level.de.juplo=DEBUG",
- "juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}",
- "juplo.wordcount.counter.commit-interval=100",
- "juplo.wordcount.counter.cache-max-bytes=0",
- "juplo.wordcount.counter.input-topic=" + TOPIC_IN,
- "juplo.wordcount.counter.output-topic=" + TOPIC_OUT })
-@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT })
-@Slf4j
-public class CounterApplicationIT
-{
- public static final String TOPIC_IN = "in";
- public static final String TOPIC_OUT = "out";
-
- @Autowired
- Consumer consumer;
- @Autowired
- CounterStreamProcessor streamProcessor;
-
-
- @BeforeAll
- public static void testSendMessage(
- @Autowired KafkaTemplate<TestInputUser, TestInputWord> kafkaTemplate)
- {
- TestData
- .getInputMessages()
- .forEach(kv ->
- {
- try
- {
- SendResult<TestInputUser, TestInputWord> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
- log.info(
- "Sent: {}={}, partition={}, offset={}",
- result.getProducerRecord().key(),
- result.getProducerRecord().value(),
- result.getRecordMetadata().partition(),
- result.getRecordMetadata().offset());
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- });
- }
-
- @DisplayName("Await the expected number of messages")
- @Test
- public void testAwaitExpectedNumberOfMessagesForUsers()
- {
- await("Expected number of messages")
- .atMost(Duration.ofSeconds(5))
- .untilAsserted(() -> consumer.enforceAssertion(
- receivedMessages -> TestData.assertExpectedNumberOfMessagesForWord(receivedMessages)));
- }
-
- @DisplayName("Await the expected output messages")
- @Test
- void testSendMessage()
- {
- await("Expected messages")
- .atMost(Duration.ofSeconds(10))
- .untilAsserted(() -> consumer.enforceAssertion(
- receivedMessages -> TestData.assertExpectedMessages(receivedMessages)));
- }
-
- @DisplayName("Await the expected final output messages")
- @Test
- public void testAwaitExpectedLastMessagesForUsers()
- {
- await("Expected final output messages")
- .atMost(Duration.ofSeconds(5))
- .untilAsserted(() -> consumer.enforceAssertion(
- receivedMessages -> TestData.assertExpectedLastMessagesForWord(receivedMessages)));
- }
-
- @DisplayName("Await the expected state in the state-store")
- @Test
- public void testAwaitExpectedState()
- {
- await("Expected state")
- .atMost(Duration.ofSeconds(5))
- .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
- }
-
-
- static class Consumer
- {
- private final MultiValueMap<TestOutputWord, TestOutputWordCounter> received = new LinkedMultiValueMap<>();
-
- @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
- public synchronized void receive(
- @Header(KafkaHeaders.RECEIVED_KEY) TestOutputWord word,
- @Payload TestOutputWordCounter counter)
- {
- log.debug("Received message: {} -> {}", word, counter);
- received.add(word, counter);
- }
-
- synchronized void enforceAssertion(
- java.util.function.Consumer<MultiValueMap<TestOutputWord, TestOutputWordCounter>> assertion)
- {
- assertion.accept(received);
- }
- }
-
- @TestConfiguration
- static class Configuration
- {
- @Bean
- Consumer consumer()
- {
- return new Consumer();
- }
-
- @Bean
- KeyValueBytesStoreSupplier storeSupplier()
- {
- return Stores.inMemoryKeyValueStore(STORE_NAME);
- }
- }
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.counter;
-
-import de.juplo.kafka.wordcount.splitter.TestInputUser;
-import de.juplo.kafka.wordcount.splitter.TestInputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.streams.TestOutputTopic;
-import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.Stores;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.springframework.kafka.support.serializer.JsonDeserializer;
-import org.springframework.kafka.support.serializer.JsonSerializer;
-import org.springframework.util.LinkedMultiValueMap;
-import org.springframework.util.MultiValueMap;
-
-import java.util.Map;
-
-import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig;
-import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
-
-
-@Slf4j
-public class CounterStreamProcessorTopologyTest
-{
- public static final String IN = "TEST-IN";
- public static final String OUT = "TEST-OUT";
-
-
- static TopologyTestDriver testDriver;
- static MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages = new LinkedMultiValueMap<>();
-
-
- @BeforeAll
- public static void setUpTestDriver()
- {
- Topology topology = CounterStreamProcessor.buildTopology(
- IN,
- OUT,
- Stores.inMemoryKeyValueStore(STORE_NAME));
-
- testDriver = new TopologyTestDriver(topology, serializationConfig());
-
- TestInputTopic<TestInputUser, TestInputWord> in =
- testDriver.createInputTopic(IN, serializer(), serializer());
- TestOutputTopic<TestOutputWord, TestOutputWordCounter> out =
- testDriver.createOutputTopic(OUT, keyDeserializer(), valueDeserializer());
-
- TestData
- .getInputMessages()
- .forEach(kv -> in.pipeInput(kv.key, kv.value));
-
- receivedMessages = new LinkedMultiValueMap<>();
- out
- .readRecordsToList()
- .forEach(record -> receivedMessages.add(record.key(), record.value()));
- }
-
-
- @DisplayName("Assert the expected output messages")
- @Test
- public void testExpectedMessages()
- {
- TestData.assertExpectedMessages(receivedMessages);
- }
-
- @DisplayName("Assert the expected number of messages")
- @Test
- public void testExpectedNumberOfMessagesForWord()
- {
- TestData.assertExpectedNumberOfMessagesForWord(receivedMessages);
- }
-
- @DisplayName("Await the expected final output messages")
- @Test
- public void testExpectedLastMessagesForWord()
- {
- TestData.assertExpectedLastMessagesForWord(receivedMessages);
- }
-
- @DisplayName("Assert the expected state in the state-store")
- @Test
- public void testExpectedState()
- {
- KeyValueStore<Word, Long> store = testDriver.getKeyValueStore(STORE_NAME);
- TestData.assertExpectedState(store);
- }
-
- @AfterAll
- public static void tearDown()
- {
- testDriver.close();
- }
-
-
- private static JsonSerializer serializer()
- {
- return new JsonSerializer().noTypeInfo();
- }
-
- private static JsonDeserializer<TestOutputWord> keyDeserializer()
- {
- return deserializer(true);
- }
-
- private static JsonDeserializer<TestOutputWordCounter> valueDeserializer()
- {
- return deserializer(false);
- }
-
- private static <T> JsonDeserializer<T> deserializer(boolean isKey)
- {
- JsonDeserializer<T> deserializer = new JsonDeserializer<>();
- deserializer.configure(
- Map.of(JsonDeserializer.TYPE_MAPPINGS, typeMappingsConfig()),
- isKey);
- return deserializer;
- }
-
- private static String typeMappingsConfig()
- {
- return CounterStreamProcessor.typeMappingsConfig(TestOutputWord.class, TestOutputWordCounter.class);
- }
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.counter;
-
-import de.juplo.kafka.wordcount.splitter.TestInputUser;
-import de.juplo.kafka.wordcount.splitter.TestInputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
-import org.springframework.util.LinkedMultiValueMap;
-import org.springframework.util.MultiValueMap;
-
-import java.util.stream.Stream;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-
-class TestData
-{
- static final String PETER = "peter";
- static final String KLAUS = "klaus";
-
- static final String WORD_HALLO = "Hallo";
- static final String WORD_MÜSCH = "Müsch";
- static final String WORD_WELT = "Welt";
- static final String WORD_S = "s";
- static final String WORD_BOÄH = "Boäh";
-
- static final TestOutputWord PETER_HALLO = TestOutputWord.of(PETER, WORD_HALLO);
- static final TestOutputWord PETER_WELT = TestOutputWord.of(PETER, WORD_WELT);
- static final TestOutputWord PETER_BOÄH = TestOutputWord.of(PETER, WORD_BOÄH);
- static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(KLAUS, WORD_MÜSCH);
- static final TestOutputWord KLAUS_S = TestOutputWord.of(KLAUS, WORD_S);
-
- private static final KeyValue<TestInputUser, TestInputWord>[] INPUT_MESSAGES = new KeyValue[]
- {
- KeyValue.pair(
- TestInputUser.of(PETER),
- TestInputWord.of(PETER, WORD_HALLO)),
- KeyValue.pair(
- TestInputUser.of(KLAUS),
- TestInputWord.of(KLAUS, WORD_MÜSCH)),
- KeyValue.pair(
- TestInputUser.of(PETER),
- TestInputWord.of(PETER, WORD_WELT)),
- KeyValue.pair(
- TestInputUser.of(KLAUS),
- TestInputWord.of(KLAUS, WORD_MÜSCH)),
- KeyValue.pair(
- TestInputUser.of(KLAUS),
- TestInputWord.of(KLAUS, WORD_S)),
- KeyValue.pair(
- TestInputUser.of(PETER),
- TestInputWord.of(PETER, WORD_BOÄH)),
- KeyValue.pair(
- TestInputUser.of(PETER),
- TestInputWord.of(PETER, WORD_WELT)),
- KeyValue.pair(
- TestInputUser.of(PETER),
- TestInputWord.of(PETER, WORD_BOÄH)),
- KeyValue.pair(
- TestInputUser.of(KLAUS),
- TestInputWord.of(KLAUS, WORD_S)),
- KeyValue.pair(
- TestInputUser.of(PETER),
- TestInputWord.of(PETER, WORD_BOÄH)),
- KeyValue.pair(
- TestInputUser.of(KLAUS),
- TestInputWord.of(KLAUS, WORD_S)),
- };
-
- static Stream<KeyValue<TestInputUser, TestInputWord>> getInputMessages()
- {
- return Stream.of(TestData.INPUT_MESSAGES);
- }
-
- static void assertExpectedMessages(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
- {
- expectedMessages().forEach(
- (word, counter) ->
- assertThat(receivedMessages.get(word))
- .containsExactlyElementsOf(counter));
- }
-
- static void assertExpectedNumberOfMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
- {
- assertThat(countMessagesForWord(PETER_HALLO, receivedMessages));
- assertThat(countMessagesForWord(PETER_WELT, receivedMessages));
- assertThat(countMessagesForWord(PETER_BOÄH, receivedMessages));
- assertThat(countMessagesForWord(KLAUS_MÜSCH, receivedMessages));
- assertThat(countMessagesForWord(KLAUS_S, receivedMessages));
- }
-
- private static int countMessagesForWord(TestOutputWord word, MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForUsers)
- {
- return messagesForUsers.get(word) == null
- ? 0
- : messagesForUsers.get(word).size();
- }
-
- static void assertExpectedState(ReadOnlyKeyValueStore<Word, Long> store)
- {
- assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, store.get(wordOf(PETER_HALLO)));
- assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, store.get(wordOf(PETER_WELT)));
- assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, store.get(wordOf(PETER_BOÄH)));
- assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, store.get(wordOf(KLAUS_MÜSCH)));
- assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S)));
- }
-
- private static Word wordOf(TestOutputWord testOutputWord)
- {
- Word word = new Word();
-
- word.setUser(testOutputWord.getUser());
- word.setWord(testOutputWord.getWord());
-
- return word;
- }
-
- static void assertExpectedLastMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
- {
- assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages));
- assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, getLastMessageFor(PETER_WELT, receivedMessages));
- assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, getLastMessageFor(PETER_BOÄH, receivedMessages));
- assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, getLastMessageFor(KLAUS_MÜSCH, receivedMessages));
- assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages));
- }
-
- private static void assertWordCountEqualsWordCountFromLastMessage(
- TestOutputWord word,
- Long counter)
- {
- TestOutputWordCounter testOutputWordCounter = TestOutputWordCounter.of(
- word.getUser(),
- word.getWord(),
- counter);
- assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter);
- }
-
- private static void assertWordCountEqualsWordCountFromLastMessage(
- TestOutputWord word,
- TestOutputWordCounter counter)
- {
- assertThat(counter).isEqualTo(getLastMessageFor(word));
- }
-
- private static TestOutputWordCounter getLastMessageFor(TestOutputWord word)
- {
- return getLastMessageFor(word, expectedMessages());
- }
-
- private static TestOutputWordCounter getLastMessageFor(
- TestOutputWord user,
- MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForWord)
- {
- return messagesForWord
- .get(user)
- .stream()
- .reduce(null, (left, right) -> right);
- }
-
- private static final KeyValue<TestOutputWord, TestOutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
- {
- KeyValue.pair(
- PETER_HALLO,
- TestOutputWordCounter.of(PETER, WORD_HALLO,1)),
- KeyValue.pair(
- KLAUS_MÜSCH,
- TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,1)),
- KeyValue.pair(
- PETER_WELT,
- TestOutputWordCounter.of(PETER, WORD_WELT,1)),
- KeyValue.pair(
- KLAUS_MÜSCH,
- TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,2)),
- KeyValue.pair(
- KLAUS_S,
- TestOutputWordCounter.of(KLAUS, WORD_S,1)),
- KeyValue.pair(
- PETER_BOÄH,
- TestOutputWordCounter.of(PETER, WORD_BOÄH,1)),
- KeyValue.pair(
- PETER_WELT,
- TestOutputWordCounter.of(PETER, WORD_WELT,2)),
- KeyValue.pair(
- PETER_BOÄH,
- TestOutputWordCounter.of(PETER, WORD_BOÄH,2)),
- KeyValue.pair(
- KLAUS_S,
- TestOutputWordCounter.of(KLAUS, WORD_S,2)),
- KeyValue.pair(
- PETER_BOÄH,
- TestOutputWordCounter.of(PETER, WORD_BOÄH,3)),
- KeyValue.pair(
- KLAUS_S,
- TestOutputWordCounter.of(KLAUS, WORD_S,3)),
- };
-
- static MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages()
- {
- MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages = new LinkedMultiValueMap<>();
- Stream
- .of(EXPECTED_MESSAGES)
- .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
- return expectedMessages;
- }
-}
--- /dev/null
+package de.juplo.kafka.wordcount.counter;
+
+import de.juplo.kafka.wordcount.splitter.TestInputUser;
+import de.juplo.kafka.wordcount.splitter.TestInputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import java.time.Duration;
+
+import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN;
+import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT;
+import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
+import static org.awaitility.Awaitility.await;
+
+
+@SpringBootTest(
+ properties = {
+ "spring.main.allow-bean-definition-overriding=true",
+ "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
+ "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
+ "spring.kafka.producer.properties.spring.json.add.type.headers=false",
+ "spring.kafka.consumer.auto-offset-reset=earliest",
+ "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
+ "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
+ "spring.kafka.consumer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.top10.TestOutputWord,counter:de.juplo.kafka.wordcount.top10.TestOutputWordCounter",
+ "logging.level.root=WARN",
+ "logging.level.de.juplo=DEBUG",
+ "juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}",
+ "juplo.wordcount.counter.commit-interval=100",
+ "juplo.wordcount.counter.cache-max-bytes=0",
+ "juplo.wordcount.counter.input-topic=" + TOPIC_IN,
+ "juplo.wordcount.counter.output-topic=" + TOPIC_OUT })
+@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT })
+@Slf4j
+public class CounterApplicationIT
+{
+ public static final String TOPIC_IN = "in";
+ public static final String TOPIC_OUT = "out";
+
+ @Autowired
+ Consumer consumer;
+ @Autowired
+ CounterStreamProcessor streamProcessor;
+
+
+ @BeforeAll
+ public static void testSendMessage(
+ @Autowired KafkaTemplate<TestInputUser, TestInputWord> kafkaTemplate)
+ {
+ TestData
+ .getInputMessages()
+ .forEach(kv ->
+ {
+ try
+ {
+ SendResult<TestInputUser, TestInputWord> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
+ log.info(
+ "Sent: {}={}, partition={}, offset={}",
+ result.getProducerRecord().key(),
+ result.getProducerRecord().value(),
+ result.getRecordMetadata().partition(),
+ result.getRecordMetadata().offset());
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @DisplayName("Await the expected number of messages")
+ @Test
+ public void testAwaitExpectedNumberOfMessagesForUsers()
+ {
+ await("Expected number of messages")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> consumer.enforceAssertion(
+ receivedMessages -> TestData.assertExpectedNumberOfMessagesForWord(receivedMessages)));
+ }
+
+ @DisplayName("Await the expected output messages")
+ @Test
+ void testSendMessage()
+ {
+ await("Expected messages")
+ .atMost(Duration.ofSeconds(10))
+ .untilAsserted(() -> consumer.enforceAssertion(
+ receivedMessages -> TestData.assertExpectedMessages(receivedMessages)));
+ }
+
+ @DisplayName("Await the expected final output messages")
+ @Test
+ public void testAwaitExpectedLastMessagesForUsers()
+ {
+ await("Expected final output messages")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> consumer.enforceAssertion(
+ receivedMessages -> TestData.assertExpectedLastMessagesForWord(receivedMessages)));
+ }
+
+ @DisplayName("Await the expected state in the state-store")
+ @Test
+ public void testAwaitExpectedState()
+ {
+ await("Expected state")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
+ }
+
+
+ static class Consumer
+ {
+ private final MultiValueMap<TestOutputWord, TestOutputWordCounter> received = new LinkedMultiValueMap<>();
+
+ @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
+ public synchronized void receive(
+ @Header(KafkaHeaders.RECEIVED_KEY) TestOutputWord word,
+ @Payload TestOutputWordCounter counter)
+ {
+ log.debug("Received message: {} -> {}", word, counter);
+ received.add(word, counter);
+ }
+
+ synchronized void enforceAssertion(
+ java.util.function.Consumer<MultiValueMap<TestOutputWord, TestOutputWordCounter>> assertion)
+ {
+ assertion.accept(received);
+ }
+ }
+
+ @TestConfiguration
+ static class Configuration
+ {
+ @Bean
+ Consumer consumer()
+ {
+ return new Consumer();
+ }
+
+ @Bean
+ KeyValueBytesStoreSupplier storeSupplier()
+ {
+ return Stores.inMemoryKeyValueStore(STORE_NAME);
+ }
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.counter;
+
+import de.juplo.kafka.wordcount.splitter.TestInputUser;
+import de.juplo.kafka.wordcount.splitter.TestInputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import java.util.Map;
+
+import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig;
+import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
+
+
+@Slf4j
+public class CounterStreamProcessorTopologyTest
+{
+ public static final String IN = "TEST-IN";
+ public static final String OUT = "TEST-OUT";
+
+
+ static TopologyTestDriver testDriver;
+ static MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages = new LinkedMultiValueMap<>();
+
+
+ @BeforeAll
+ public static void setUpTestDriver()
+ {
+ Topology topology = CounterStreamProcessor.buildTopology(
+ IN,
+ OUT,
+ Stores.inMemoryKeyValueStore(STORE_NAME));
+
+ testDriver = new TopologyTestDriver(topology, serializationConfig());
+
+ TestInputTopic<TestInputUser, TestInputWord> in =
+ testDriver.createInputTopic(IN, serializer(), serializer());
+ TestOutputTopic<TestOutputWord, TestOutputWordCounter> out =
+ testDriver.createOutputTopic(OUT, keyDeserializer(), valueDeserializer());
+
+ TestData
+ .getInputMessages()
+ .forEach(kv -> in.pipeInput(kv.key, kv.value));
+
+ receivedMessages = new LinkedMultiValueMap<>();
+ out
+ .readRecordsToList()
+ .forEach(record -> receivedMessages.add(record.key(), record.value()));
+ }
+
+
+ @DisplayName("Assert the expected output messages")
+ @Test
+ public void testExpectedMessages()
+ {
+ TestData.assertExpectedMessages(receivedMessages);
+ }
+
+ @DisplayName("Assert the expected number of messages")
+ @Test
+ public void testExpectedNumberOfMessagesForWord()
+ {
+ TestData.assertExpectedNumberOfMessagesForWord(receivedMessages);
+ }
+
+ @DisplayName("Await the expected final output messages")
+ @Test
+ public void testExpectedLastMessagesForWord()
+ {
+ TestData.assertExpectedLastMessagesForWord(receivedMessages);
+ }
+
+ @DisplayName("Assert the expected state in the state-store")
+ @Test
+ public void testExpectedState()
+ {
+ KeyValueStore<Word, Long> store = testDriver.getKeyValueStore(STORE_NAME);
+ TestData.assertExpectedState(store);
+ }
+
+ @AfterAll
+ public static void tearDown()
+ {
+ testDriver.close();
+ }
+
+
+ private static JsonSerializer serializer()
+ {
+ return new JsonSerializer().noTypeInfo();
+ }
+
+ private static JsonDeserializer<TestOutputWord> keyDeserializer()
+ {
+ return deserializer(true);
+ }
+
+ private static JsonDeserializer<TestOutputWordCounter> valueDeserializer()
+ {
+ return deserializer(false);
+ }
+
+ private static <T> JsonDeserializer<T> deserializer(boolean isKey)
+ {
+ JsonDeserializer<T> deserializer = new JsonDeserializer<>();
+ deserializer.configure(
+ Map.of(JsonDeserializer.TYPE_MAPPINGS, typeMappingsConfig()),
+ isKey);
+ return deserializer;
+ }
+
+ private static String typeMappingsConfig()
+ {
+ return CounterStreamProcessor.typeMappingsConfig(TestOutputWord.class, TestOutputWordCounter.class);
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.counter;
+
+import de.juplo.kafka.wordcount.splitter.TestInputUser;
+import de.juplo.kafka.wordcount.splitter.TestInputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+class TestData
+{
+ static final String PETER = "peter";
+ static final String KLAUS = "klaus";
+
+ static final String WORD_HALLO = "Hallo";
+ static final String WORD_MÜSCH = "Müsch";
+ static final String WORD_WELT = "Welt";
+ static final String WORD_S = "s";
+ static final String WORD_BOÄH = "Boäh";
+
+ static final TestOutputWord PETER_HALLO = TestOutputWord.of(PETER, WORD_HALLO);
+ static final TestOutputWord PETER_WELT = TestOutputWord.of(PETER, WORD_WELT);
+ static final TestOutputWord PETER_BOÄH = TestOutputWord.of(PETER, WORD_BOÄH);
+ static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(KLAUS, WORD_MÜSCH);
+ static final TestOutputWord KLAUS_S = TestOutputWord.of(KLAUS, WORD_S);
+
+ private static final KeyValue<TestInputUser, TestInputWord>[] INPUT_MESSAGES = new KeyValue[]
+ {
+ KeyValue.pair(
+ TestInputUser.of(PETER),
+ TestInputWord.of(PETER, WORD_HALLO)),
+ KeyValue.pair(
+ TestInputUser.of(KLAUS),
+ TestInputWord.of(KLAUS, WORD_MÜSCH)),
+ KeyValue.pair(
+ TestInputUser.of(PETER),
+ TestInputWord.of(PETER, WORD_WELT)),
+ KeyValue.pair(
+ TestInputUser.of(KLAUS),
+ TestInputWord.of(KLAUS, WORD_MÜSCH)),
+ KeyValue.pair(
+ TestInputUser.of(KLAUS),
+ TestInputWord.of(KLAUS, WORD_S)),
+ KeyValue.pair(
+ TestInputUser.of(PETER),
+ TestInputWord.of(PETER, WORD_BOÄH)),
+ KeyValue.pair(
+ TestInputUser.of(PETER),
+ TestInputWord.of(PETER, WORD_WELT)),
+ KeyValue.pair(
+ TestInputUser.of(PETER),
+ TestInputWord.of(PETER, WORD_BOÄH)),
+ KeyValue.pair(
+ TestInputUser.of(KLAUS),
+ TestInputWord.of(KLAUS, WORD_S)),
+ KeyValue.pair(
+ TestInputUser.of(PETER),
+ TestInputWord.of(PETER, WORD_BOÄH)),
+ KeyValue.pair(
+ TestInputUser.of(KLAUS),
+ TestInputWord.of(KLAUS, WORD_S)),
+ };
+
+ static Stream<KeyValue<TestInputUser, TestInputWord>> getInputMessages()
+ {
+ return Stream.of(TestData.INPUT_MESSAGES);
+ }
+
+ static void assertExpectedMessages(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
+ {
+ expectedMessages().forEach(
+ (word, counter) ->
+ assertThat(receivedMessages.get(word))
+ .containsExactlyElementsOf(counter));
+ }
+
+ static void assertExpectedNumberOfMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
+ {
+ assertThat(countMessagesForWord(PETER_HALLO, receivedMessages));
+ assertThat(countMessagesForWord(PETER_WELT, receivedMessages));
+ assertThat(countMessagesForWord(PETER_BOÄH, receivedMessages));
+ assertThat(countMessagesForWord(KLAUS_MÜSCH, receivedMessages));
+ assertThat(countMessagesForWord(KLAUS_S, receivedMessages));
+ }
+
+ private static int countMessagesForWord(TestOutputWord word, MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForUsers)
+ {
+ return messagesForUsers.get(word) == null
+ ? 0
+ : messagesForUsers.get(word).size();
+ }
+
+ static void assertExpectedState(ReadOnlyKeyValueStore<Word, Long> store)
+ {
+ assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, store.get(wordOf(PETER_HALLO)));
+ assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, store.get(wordOf(PETER_WELT)));
+ assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, store.get(wordOf(PETER_BOÄH)));
+ assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, store.get(wordOf(KLAUS_MÜSCH)));
+ assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S)));
+ }
+
+ private static Word wordOf(TestOutputWord testOutputWord)
+ {
+ Word word = new Word();
+
+ word.setUser(testOutputWord.getUser());
+ word.setWord(testOutputWord.getWord());
+
+ return word;
+ }
+
+ static void assertExpectedLastMessagesForWord(MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages)
+ {
+ assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages));
+ assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, getLastMessageFor(PETER_WELT, receivedMessages));
+ assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, getLastMessageFor(PETER_BOÄH, receivedMessages));
+ assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, getLastMessageFor(KLAUS_MÜSCH, receivedMessages));
+ assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages));
+ }
+
+ private static void assertWordCountEqualsWordCountFromLastMessage(
+ TestOutputWord word,
+ Long counter)
+ {
+ TestOutputWordCounter testOutputWordCounter = TestOutputWordCounter.of(
+ word.getUser(),
+ word.getWord(),
+ counter);
+ assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter);
+ }
+
+ private static void assertWordCountEqualsWordCountFromLastMessage(
+ TestOutputWord word,
+ TestOutputWordCounter counter)
+ {
+ assertThat(counter).isEqualTo(getLastMessageFor(word));
+ }
+
+ private static TestOutputWordCounter getLastMessageFor(TestOutputWord word)
+ {
+ return getLastMessageFor(word, expectedMessages());
+ }
+
+ private static TestOutputWordCounter getLastMessageFor(
+ TestOutputWord user,
+ MultiValueMap<TestOutputWord, TestOutputWordCounter> messagesForWord)
+ {
+ return messagesForWord
+ .get(user)
+ .stream()
+ .reduce(null, (left, right) -> right);
+ }
+
+ private static final KeyValue<TestOutputWord, TestOutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
+ {
+ KeyValue.pair(
+ PETER_HALLO,
+ TestOutputWordCounter.of(PETER, WORD_HALLO,1)),
+ KeyValue.pair(
+ KLAUS_MÜSCH,
+ TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,1)),
+ KeyValue.pair(
+ PETER_WELT,
+ TestOutputWordCounter.of(PETER, WORD_WELT,1)),
+ KeyValue.pair(
+ KLAUS_MÜSCH,
+ TestOutputWordCounter.of(KLAUS, WORD_MÜSCH,2)),
+ KeyValue.pair(
+ KLAUS_S,
+ TestOutputWordCounter.of(KLAUS, WORD_S,1)),
+ KeyValue.pair(
+ PETER_BOÄH,
+ TestOutputWordCounter.of(PETER, WORD_BOÄH,1)),
+ KeyValue.pair(
+ PETER_WELT,
+ TestOutputWordCounter.of(PETER, WORD_WELT,2)),
+ KeyValue.pair(
+ PETER_BOÄH,
+ TestOutputWordCounter.of(PETER, WORD_BOÄH,2)),
+ KeyValue.pair(
+ KLAUS_S,
+ TestOutputWordCounter.of(KLAUS, WORD_S,2)),
+ KeyValue.pair(
+ PETER_BOÄH,
+ TestOutputWordCounter.of(PETER, WORD_BOÄH,3)),
+ KeyValue.pair(
+ KLAUS_S,
+ TestOutputWordCounter.of(KLAUS, WORD_S,3)),
+ };
+
+ static MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages()
+ {
+ MultiValueMap<TestOutputWord, TestOutputWordCounter> expectedMessages = new LinkedMultiValueMap<>();
+ Stream
+ .of(EXPECTED_MESSAGES)
+ .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
+ return expectedMessages;
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.splitter;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestInputUser
+{
+ String user;
+}
--- /dev/null
+package de.juplo.kafka.wordcount.splitter;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestInputWord
+{
+ String user;
+ String word;
+}
+++ /dev/null
-package de.juplo.kafka.wordcount.splitter;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor(staticName = "of")
-public class TestInputUser
-{
- String user;
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.splitter;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor(staticName = "of")
-public class TestInputWord
-{
- String user;
- String word;
-}
--- /dev/null
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestOutputWord
+{
+ String user;
+ String word;
+}
--- /dev/null
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestOutputWordCounter
+{
+ String user;
+ String word;
+ long counter;
+}
+++ /dev/null
-package de.juplo.kafka.wordcount.top10;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor(staticName = "of")
-public class TestOutputWord
-{
- String user;
- String word;
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.top10;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor(staticName = "of")
-public class TestOutputWordCounter
-{
- String user;
- String word;
- long counter;
-}