--- /dev/null
+package de.juplo.kafka.wordcount.top10;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@NoArgsConstructor
+@AllArgsConstructor(
+ staticName = "of",
+ access = AccessLevel.PACKAGE)
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Entry
+{
+ private String word;
+ private Long counter;
+}
--- /dev/null
+package de.juplo.kafka.wordcount.top10;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.*;
+
+
+@NoArgsConstructor
+@AllArgsConstructor(
+ staticName = "of",
+ access = AccessLevel.PACKAGE)
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Key
+{
+ private String user;
+ private String word;
+}
--- /dev/null
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.*;
+
+import java.util.*;
+
+
+@AllArgsConstructor(access = AccessLevel.PRIVATE)
+@NoArgsConstructor
+@Data
+public class Ranking
+{
+ public final static int MAX_ENTRIES = 10;
+
+
+ private Entry[] entries = new Entry[0];
+
+ public Ranking add(Entry newEntry)
+ {
+ if (entries.length == 0)
+ {
+ entries = new Entry[1];
+ entries[0] = newEntry;
+ return this;
+ }
+
+ List<Entry> list = new LinkedList<>(Arrays.asList(entries));
+ int oldPosition = -1;
+ for (int i = 0; i < list.size(); i++)
+ {
+ Entry entry = list.get(i);
+
+ if (entry.getCounter() < newEntry.getCounter())
+ {
+ if (oldPosition > -1)
+ {
+ if (list.get(oldPosition).getCounter() > newEntry.getCounter())
+ {
+ throw new IllegalArgumentException("The ranking already contains an entry with a higher counting for " + newEntry);
+ }
+ else
+ {
+ // Entry for word already exists with the same counting! Nothing changed...
+ return this;
+ }
+ }
+
+ list.add(i, newEntry);
+ for (int j = i+1; j < list.size(); j++)
+ {
+ entry = list.get(j);
+ if(entry.getWord().equals(newEntry.getWord()))
+ {
+ list.remove(j);
+ break;
+ }
+ }
+ if (list.size() > MAX_ENTRIES)
+ {
+ list = list.subList(0, MAX_ENTRIES);
+ }
+ entries = list.toArray(num -> new Entry[num]);
+ return this;
+ }
+
+ if (entry.getWord().equals(newEntry.getWord()))
+ oldPosition = i;
+ }
+
+ if (oldPosition > -1 && list.get(oldPosition).getCounter() > newEntry.getCounter())
+ {
+ throw new IllegalArgumentException("The ranking already contains an entry with a higher counting for " + newEntry);
+ }
+
+ if (list.size() < MAX_ENTRIES)
+ {
+ list.add(newEntry);
+ entries = list.toArray(num -> new Entry[num]);
+ }
+
+ return this;
+ }
+
+ public Ranking validate() throws IllegalArgumentException
+ {
+ if (this.entries.length > MAX_ENTRIES)
+ throw new IllegalArgumentException("Invalid Ranking: a valid ranking cannot have more entries than " + MAX_ENTRIES );
+
+ Set<String> seenWords = new HashSet<>();
+ long lowesCounting = Long.MAX_VALUE;
+
+ for (int i=0; i<this.entries.length; i++)
+ {
+ Entry entry = this.entries[i];
+
+ if (seenWords.contains(entry.getWord()))
+ throw new IllegalArgumentException("Invalid Ranking: Multiple occurrences of word -> " + entry.getWord());
+ if (entry.getCounter() > lowesCounting)
+ throw new IllegalArgumentException("Invalid Ranking: Entries are not sorted correctly");
+
+ seenWords.add(entry.getWord());
+ lowesCounting = entry.getCounter();
+ }
+
+ return this;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ return true;
+ if (o == null)
+ return false;
+ if (!(o instanceof Ranking))
+ return false;
+
+ Ranking other = (Ranking)o;
+
+ if (other.entries.length != entries.length)
+ return false;
+
+ if (entries.length == 0)
+ return true;
+
+ int i = 0;
+ Set<String> myWordsWithCurrentCount = new HashSet<>();
+ Set<String> otherWordsWithCurrentCount = new HashSet<>();
+ Entry myEntry = entries[i];
+ long currentCount = myEntry.getCounter();
+ myWordsWithCurrentCount.add(myEntry.getWord());
+ while (true)
+ {
+ Entry otherEntry = other.entries[i];
+ if (otherEntry.getCounter() != currentCount)
+ return false;
+ otherWordsWithCurrentCount.add(otherEntry.getWord());
+ if (++i >= entries.length)
+ return myWordsWithCurrentCount.equals(otherWordsWithCurrentCount);
+ myEntry = entries[i];
+ if (myEntry.getCounter() != currentCount)
+ {
+ if (!myWordsWithCurrentCount.equals(otherWordsWithCurrentCount))
+ return false;
+ currentCount = myEntry.getCounter();
+ myWordsWithCurrentCount.clear();
+ otherWordsWithCurrentCount.clear();
+ }
+ myWordsWithCurrentCount.add(myEntry.getWord());
+ }
+ }
+
+ public static Ranking of(Entry... entries)
+ {
+ Ranking ranking = new Ranking(entries);
+ ranking.validate();
+ return ranking;
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.top10;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+
+@SpringBootApplication
+public class Top10Application
+{
+ public static void main(String[] args)
+ {
+ SpringApplication.run(Top10Application.class, args);
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerde;
+
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+
+import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME;
+import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+
+
+@Configuration
+@EnableConfigurationProperties(Top10ApplicationProperties.class)
+@Slf4j
+public class Top10ApplicationConfiguration
+{
+ @Bean
+ public Properties streamProcessorProperties(Top10ApplicationProperties properties)
+ {
+ Properties props = new Properties();
+
+ props.putAll(serializationConfig());
+
+ props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
+ props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+
+ if (properties.getCommitInterval() != null)
+ props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval());
+ if (properties.getCacheMaxBytes() != null)
+ props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, properties.getCacheMaxBytes());
+
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ return props;
+ }
+
+ static Properties serializationConfig()
+ {
+ Properties props = new Properties();
+
+ props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
+ props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
+ props.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName());
+ props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName());
+ props.put(
+ JsonDeserializer.TYPE_MAPPINGS,
+ "word:" + Key.class.getName() + "," +
+ "counter:" + Entry.class.getName() + "," +
+ "user:" + User.class.getName() + "," +
+ "ranking:" + Ranking.class.getName());
+
+ return props;
+ }
+
+ @Bean(initMethod = "start", destroyMethod = "stop")
+ public Top10StreamProcessor streamProcessor(
+ Top10ApplicationProperties applicationProperties,
+ Properties streamProcessorProperties,
+ KeyValueBytesStoreSupplier storeSupplier,
+ ConfigurableApplicationContext context)
+ {
+ Top10StreamProcessor streamProcessor = new Top10StreamProcessor(
+ applicationProperties.getInputTopic(),
+ applicationProperties.getOutputTopic(),
+ streamProcessorProperties,
+ storeSupplier);
+
+ streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
+ {
+ log.error("Unexpected error!", e);
+ CompletableFuture.runAsync(() ->
+ {
+ log.info("Stopping application...");
+ SpringApplication.exit(context, () -> 1);
+ });
+ return SHUTDOWN_CLIENT;
+ });
+
+ return streamProcessor;
+ }
+
+ @Bean
+ public KeyValueBytesStoreSupplier storeSupplier()
+ {
+ return Stores.persistentKeyValueStore(STORE_NAME);
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.top10;
+
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+
+@ConfigurationProperties("juplo.wordcount.top10")
+@Getter
+@Setter
+@ToString
+public class Top10ApplicationProperties
+{
+ private String bootstrapServer = "localhost:9092";
+ private String applicationId = "top10";
+ private String inputTopic = "countings";
+ private String outputTopic = "top10";
+ private Integer commitInterval;
+ private Integer cacheMaxBytes;
+}
--- /dev/null
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+
+import java.util.Properties;
+
+
+@Slf4j
+public class Top10StreamProcessor
+{
+ public static final String STORE_NAME= "top10";
+
+ public final KafkaStreams streams;
+
+
+ public Top10StreamProcessor(
+ String inputTopic,
+ String outputTopic,
+ Properties props,
+ KeyValueBytesStoreSupplier storeSupplier)
+ {
+ Topology topology = Top10StreamProcessor.buildTopology(
+ inputTopic,
+ outputTopic,
+ storeSupplier);
+
+ streams = new KafkaStreams(topology, props);
+ }
+
+ static Topology buildTopology(
+ String inputTopic,
+ String outputTopic,
+ KeyValueBytesStoreSupplier storeSupplier)
+ {
+ StreamsBuilder builder = new StreamsBuilder();
+
+ builder
+ .<Key, Entry>stream(inputTopic)
+ .map((key, entry) -> new KeyValue<>(User.of(key.getUser()), entry))
+ .groupByKey()
+ .aggregate(
+ () -> new Ranking(),
+ (user, entry, ranking) -> ranking.add(entry),
+ Materialized.as(storeSupplier))
+ .toStream()
+ .to(outputTopic);
+
+ Topology topology = builder.build();
+ log.info("\n\n{}", topology.describe());
+
+ return topology;
+ }
+
+ ReadOnlyKeyValueStore<User, Ranking> getStore()
+ {
+ return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
+ }
+
+ public void start()
+ {
+ log.info("Starting Stream-Processor");
+ streams.start();
+ }
+
+ public void stop()
+ {
+ log.info("Stopping Stream-Processor");
+ streams.close();
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@AllArgsConstructor(staticName = "of")
+@NoArgsConstructor
+@Data
+public class User
+{
+ String user;
+}
+++ /dev/null
-package de.juplo.kafka.wordcount.top10;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import lombok.AccessLevel;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@NoArgsConstructor
-@AllArgsConstructor(
- staticName = "of",
- access = AccessLevel.PACKAGE)
-@Data
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class Entry
-{
- private String word;
- private Long counter;
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.top10;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import lombok.*;
-
-
-@NoArgsConstructor
-@AllArgsConstructor(
- staticName = "of",
- access = AccessLevel.PACKAGE)
-@Data
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class Key
-{
- private String user;
- private String word;
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.top10;
-
-import lombok.*;
-
-import java.util.*;
-
-
-@AllArgsConstructor(access = AccessLevel.PRIVATE)
-@NoArgsConstructor
-@Data
-public class Ranking
-{
- public final static int MAX_ENTRIES = 10;
-
-
- private Entry[] entries = new Entry[0];
-
- public Ranking add(Entry newEntry)
- {
- if (entries.length == 0)
- {
- entries = new Entry[1];
- entries[0] = newEntry;
- return this;
- }
-
- List<Entry> list = new LinkedList<>(Arrays.asList(entries));
- int oldPosition = -1;
- for (int i = 0; i < list.size(); i++)
- {
- Entry entry = list.get(i);
-
- if (entry.getCounter() < newEntry.getCounter())
- {
- if (oldPosition > -1)
- {
- if (list.get(oldPosition).getCounter() > newEntry.getCounter())
- {
- throw new IllegalArgumentException("The ranking already contains an entry with a higher counting for " + newEntry);
- }
- else
- {
- // Entry for word already exists with the same counting! Nothing changed...
- return this;
- }
- }
-
- list.add(i, newEntry);
- for (int j = i+1; j < list.size(); j++)
- {
- entry = list.get(j);
- if(entry.getWord().equals(newEntry.getWord()))
- {
- list.remove(j);
- break;
- }
- }
- if (list.size() > MAX_ENTRIES)
- {
- list = list.subList(0, MAX_ENTRIES);
- }
- entries = list.toArray(num -> new Entry[num]);
- return this;
- }
-
- if (entry.getWord().equals(newEntry.getWord()))
- oldPosition = i;
- }
-
- if (oldPosition > -1 && list.get(oldPosition).getCounter() > newEntry.getCounter())
- {
- throw new IllegalArgumentException("The ranking already contains an entry with a higher counting for " + newEntry);
- }
-
- if (list.size() < MAX_ENTRIES)
- {
- list.add(newEntry);
- entries = list.toArray(num -> new Entry[num]);
- }
-
- return this;
- }
-
- public Ranking validate() throws IllegalArgumentException
- {
- if (this.entries.length > MAX_ENTRIES)
- throw new IllegalArgumentException("Invalid Ranking: a valid ranking cannot have more entries than " + MAX_ENTRIES );
-
- Set<String> seenWords = new HashSet<>();
- long lowesCounting = Long.MAX_VALUE;
-
- for (int i=0; i<this.entries.length; i++)
- {
- Entry entry = this.entries[i];
-
- if (seenWords.contains(entry.getWord()))
- throw new IllegalArgumentException("Invalid Ranking: Multiple occurrences of word -> " + entry.getWord());
- if (entry.getCounter() > lowesCounting)
- throw new IllegalArgumentException("Invalid Ranking: Entries are not sorted correctly");
-
- seenWords.add(entry.getWord());
- lowesCounting = entry.getCounter();
- }
-
- return this;
- }
-
- @Override
- public boolean equals(Object o)
- {
- if (this == o)
- return true;
- if (o == null)
- return false;
- if (!(o instanceof Ranking))
- return false;
-
- Ranking other = (Ranking)o;
-
- if (other.entries.length != entries.length)
- return false;
-
- if (entries.length == 0)
- return true;
-
- int i = 0;
- Set<String> myWordsWithCurrentCount = new HashSet<>();
- Set<String> otherWordsWithCurrentCount = new HashSet<>();
- Entry myEntry = entries[i];
- long currentCount = myEntry.getCounter();
- myWordsWithCurrentCount.add(myEntry.getWord());
- while (true)
- {
- Entry otherEntry = other.entries[i];
- if (otherEntry.getCounter() != currentCount)
- return false;
- otherWordsWithCurrentCount.add(otherEntry.getWord());
- if (++i >= entries.length)
- return myWordsWithCurrentCount.equals(otherWordsWithCurrentCount);
- myEntry = entries[i];
- if (myEntry.getCounter() != currentCount)
- {
- if (!myWordsWithCurrentCount.equals(otherWordsWithCurrentCount))
- return false;
- currentCount = myEntry.getCounter();
- myWordsWithCurrentCount.clear();
- otherWordsWithCurrentCount.clear();
- }
- myWordsWithCurrentCount.add(myEntry.getWord());
- }
- }
-
- public static Ranking of(Entry... entries)
- {
- Ranking ranking = new Ranking(entries);
- ranking.validate();
- return ranking;
- }
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.top10;
-
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-
-
-@SpringBootApplication
-public class Top10Application
-{
- public static void main(String[] args)
- {
- SpringApplication.run(Top10Application.class, args);
- }
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.top10;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.Stores;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.support.serializer.JsonDeserializer;
-import org.springframework.kafka.support.serializer.JsonSerde;
-
-import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
-
-import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME;
-import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
-
-
-@Configuration
-@EnableConfigurationProperties(Top10ApplicationProperties.class)
-@Slf4j
-public class Top10ApplicationConfiguration
-{
- @Bean
- public Properties streamProcessorProperties(Top10ApplicationProperties properties)
- {
- Properties props = new Properties();
-
- props.putAll(serializationConfig());
-
- props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
- props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
-
- if (properties.getCommitInterval() != null)
- props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval());
- if (properties.getCacheMaxBytes() != null)
- props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, properties.getCacheMaxBytes());
-
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
- return props;
- }
-
- static Properties serializationConfig()
- {
- Properties props = new Properties();
-
- props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
- props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
- props.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName());
- props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName());
- props.put(
- JsonDeserializer.TYPE_MAPPINGS,
- "word:" + Key.class.getName() + "," +
- "counter:" + Entry.class.getName() + "," +
- "user:" + User.class.getName() + "," +
- "ranking:" + Ranking.class.getName());
-
- return props;
- }
-
- @Bean(initMethod = "start", destroyMethod = "stop")
- public Top10StreamProcessor streamProcessor(
- Top10ApplicationProperties applicationProperties,
- Properties streamProcessorProperties,
- KeyValueBytesStoreSupplier storeSupplier,
- ConfigurableApplicationContext context)
- {
- Top10StreamProcessor streamProcessor = new Top10StreamProcessor(
- applicationProperties.getInputTopic(),
- applicationProperties.getOutputTopic(),
- streamProcessorProperties,
- storeSupplier);
-
- streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
- {
- log.error("Unexpected error!", e);
- CompletableFuture.runAsync(() ->
- {
- log.info("Stopping application...");
- SpringApplication.exit(context, () -> 1);
- });
- return SHUTDOWN_CLIENT;
- });
-
- return streamProcessor;
- }
-
- @Bean
- public KeyValueBytesStoreSupplier storeSupplier()
- {
- return Stores.persistentKeyValueStore(STORE_NAME);
- }
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.top10;
-
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-
-
-@ConfigurationProperties("juplo.wordcount.top10")
-@Getter
-@Setter
-@ToString
-public class Top10ApplicationProperties
-{
- private String bootstrapServer = "localhost:9092";
- private String applicationId = "top10";
- private String inputTopic = "countings";
- private String outputTopic = "top10";
- private Integer commitInterval;
- private Integer cacheMaxBytes;
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.top10;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.QueryableStoreTypes;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
-
-import java.util.Properties;
-
-
-@Slf4j
-public class Top10StreamProcessor
-{
- public static final String STORE_NAME= "top10";
-
- public final KafkaStreams streams;
-
-
- public Top10StreamProcessor(
- String inputTopic,
- String outputTopic,
- Properties props,
- KeyValueBytesStoreSupplier storeSupplier)
- {
- Topology topology = Top10StreamProcessor.buildTopology(
- inputTopic,
- outputTopic,
- storeSupplier);
-
- streams = new KafkaStreams(topology, props);
- }
-
- static Topology buildTopology(
- String inputTopic,
- String outputTopic,
- KeyValueBytesStoreSupplier storeSupplier)
- {
- StreamsBuilder builder = new StreamsBuilder();
-
- builder
- .<Key, Entry>stream(inputTopic)
- .map((key, entry) -> new KeyValue<>(User.of(key.getUser()), entry))
- .groupByKey()
- .aggregate(
- () -> new Ranking(),
- (user, entry, ranking) -> ranking.add(entry),
- Materialized.as(storeSupplier))
- .toStream()
- .to(outputTopic);
-
- Topology topology = builder.build();
- log.info("\n\n{}", topology.describe());
-
- return topology;
- }
-
- ReadOnlyKeyValueStore<User, Ranking> getStore()
- {
- return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
- }
-
- public void start()
- {
- log.info("Starting Stream-Processor");
- streams.start();
- }
-
- public void stop()
- {
- log.info("Stopping Stream-Processor");
- streams.close();
- }
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.top10;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@AllArgsConstructor(staticName = "of")
-@NoArgsConstructor
-@Data
-public class User
-{
- String user;
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.counter;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@Data
-@NoArgsConstructor
-@AllArgsConstructor(staticName = "of")
-public class TestCounter
-{
- String user;
- String word;
- long counter;
-
- public static TestCounter of(TestWord word, long counter)
- {
- return new TestCounter(word.getUser(), word.getWord(), counter);
- }
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.counter;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@AllArgsConstructor(staticName = "of")
-@NoArgsConstructor
-@Data
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class TestWord
-{
- private String user;
- private String word;
-}
--- /dev/null
+package de.juplo.kafka.wordcount.counter;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestCounter
+{
+ String user;
+ String word;
+ long counter;
+
+ public static TestCounter of(TestWord word, long counter)
+ {
+ return new TestCounter(word.getUser(), word.getWord(), counter);
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.counter;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@AllArgsConstructor(staticName = "of")
+@NoArgsConstructor
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class TestWord
+{
+ private String user;
+ private String word;
+}
--- /dev/null
+package de.juplo.kafka.wordcount.query;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@AllArgsConstructor(staticName = "of")
+@NoArgsConstructor
+@Data
+public class TestEntry
+{
+ String word;
+ long counter;
+}
--- /dev/null
+package de.juplo.kafka.wordcount.query;
+
+import de.juplo.kafka.wordcount.top10.Entry;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@AllArgsConstructor(access = AccessLevel.PRIVATE)
+@NoArgsConstructor
+@Data
+public class TestRanking
+{
+ private TestEntry[] entries;
+
+ public static TestRanking of(TestEntry... entries)
+ {
+ return new TestRanking(entries);
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.query;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@AllArgsConstructor(staticName = "of")
+@NoArgsConstructor
+@Data
+public class TestUser
+{
+ String user;
+}
+++ /dev/null
-package de.juplo.kafka.wordcount.query;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@AllArgsConstructor(staticName = "of")
-@NoArgsConstructor
-@Data
-public class TestEntry
-{
- String word;
- long counter;
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.query;
-
-import de.juplo.kafka.wordcount.top10.Entry;
-import lombok.AccessLevel;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@AllArgsConstructor(access = AccessLevel.PRIVATE)
-@NoArgsConstructor
-@Data
-public class TestRanking
-{
- private TestEntry[] entries;
-
- public static TestRanking of(TestEntry... entries)
- {
- return new TestRanking(entries);
- }
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.query;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@AllArgsConstructor(staticName = "of")
-@NoArgsConstructor
-@Data
-public class TestUser
-{
- String user;
-}
--- /dev/null
+package de.juplo.kafka.wordcount.top10;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
+
+
+public class RankingTest
+{
+ @DisplayName("A newly created instance is empty")
+ @Test
+ public void testNewRankingIsEmpty()
+ {
+ Ranking ranking = new Ranking();
+ assertThat(ranking.getEntries()).isEmpty();
+ }
+
+ @DisplayName("An instance that was build from an empty ranking is empty")
+ @Test
+ public void testRankingOfYieldsExpectedResultForEmptyList()
+ {
+ Ranking ranking = new Ranking();
+ assertThat(ranking.getEntries()).isEmpty();
+ }
+
+ @DisplayName("An instance that was build from a valid ranking contains the expected entries")
+ @ParameterizedTest
+ @MethodSource("validRankingsProvider")
+ public void testRankingOfYieldsExpectedResultsForValidRankings(List<Entry> entryList)
+ {
+ Ranking ranking = Ranking.of(toArray(entryList));
+ assertThat(ranking.getEntries()).containsExactlyElementsOf(entryList);
+ }
+
+ @DisplayName("The builder fails for invalid rankings")
+ @ParameterizedTest
+ @MethodSource("invalidRankingsProvider")
+ public void testRankingOfThrowsExceptionForInvalidRankings(List<Entry> entryList)
+ {
+ assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(() -> Ranking.of(toArray(entryList)));
+ }
+
+ @DisplayName("Adding a new word with highest ranking, pushes all other words down")
+ @ParameterizedTest
+ @MethodSource("validRankingsProvider")
+ public void testAddingNewWordWithHighestRanking(List<Entry> entryList)
+ {
+ Ranking ranking = Ranking.of(toArray(entryList));
+ Entry newEntry = Entry.of("NEW!", rankingForPosition(-1));
+ ranking.add(newEntry);
+ assertThat(ranking.getEntries()[0]).isEqualTo(newEntry);
+ for (int i = 0; i < entryList.size() && i < Ranking.MAX_ENTRIES - 1; i++)
+ {
+ assertThat(ranking.getEntries()[i + 1]).isEqualTo(entryList.get(i));
+ }
+ }
+
+ @DisplayName("Adding a new word with an existent ranking, pushes all words with lower ranking down")
+ @ParameterizedTest
+ @MethodSource("validRankingsProvider")
+ public void testAddingNewWordWithExistingRanking(List<Entry> entryList)
+ {
+ for (int position = 0; position < entryList.size(); position++ )
+ {
+ Ranking ranking = Ranking.of(toArray(entryList));
+ Entry newEntry = Entry.of("NEW!", rankingForPosition(position));
+ ranking.add(newEntry);
+ for (int i = 0; i < entryList.size() && i < Ranking.MAX_ENTRIES - 1; i++)
+ {
+ if (i < position)
+ {
+ assertThat(ranking.getEntries()[i]).isEqualTo(entryList.get(i));
+ }
+ if (i == position)
+ {
+ assertThat(ranking.getEntries()[i]).isEqualTo(entryList.get(i));
+ assertThat(ranking.getEntries()[i + 1]).isEqualTo(newEntry);
+ }
+ if (i > position)
+ {
+ assertThat(ranking.getEntries()[i + 1]).isEqualTo(entryList.get(i));
+ }
+ }
+ }
+ }
+
+ @DisplayName("Adding a highest ranking for an existing word shifts it to the first place")
+ @ParameterizedTest
+ @ValueSource(ints = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 })
+ public void testAddingExistingWordWithHighestRanking(int position)
+ {
+ Ranking ranking = Ranking.of(toArray(VALID_RANKINGS[0]));
+ String word = wordForPosition(position);
+ Entry highestEntry = Entry.of(word, 100l);
+ ranking.add(highestEntry);
+ List<Entry> expectedEntries = Stream
+ .concat(
+ Stream.of(highestEntry),
+ VALID_RANKINGS[0]
+ .stream()
+ .filter(entry -> !entry.getWord().equals(word)))
+ .toList();
+ assertThat(ranking.getEntries()).containsExactlyElementsOf(expectedEntries);
+ }
+
+ @DisplayName("Adding an existing word with unchanged ranking changes nothing")
+ @ParameterizedTest
+ @ValueSource(ints = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 })
+ public void testAddingExistingWordWithUnchangedRanking(int position)
+ {
+ Ranking ranking = Ranking.of(toArray(VALID_RANKINGS[0]));
+ Entry unchangedEntry = Entry.of(
+ wordForPosition(position),
+ rankingForPosition(position));
+ ranking.add(unchangedEntry);
+ assertThat(ranking.getEntries()).containsExactlyElementsOf(VALID_RANKINGS[0]);
+ }
+
+ @DisplayName("Adding an existing word with a lower ranking fails")
+ @ParameterizedTest
+ @MethodSource("validRankingsProvider")
+ public void testAddingExistingWordWithLowerRankingFails(List<Entry> entryList)
+ {
+ Ranking ranking = Ranking.of(toArray(entryList));
+ entryList.forEach(entry ->
+ assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(() -> ranking.add(Entry.of(entry.getWord(), entry.getCounter() - 1))));
+ }
+
+ @DisplayName("Identical rankings are considered equal")
+ @ParameterizedTest
+ @MethodSource("validRankingsProvider")
+ public void testIdenticalRankingsAreConsideredEaqual(List<Entry> entryList)
+ {
+ assertThat(Ranking.of(toArray(entryList))).isEqualTo(Ranking.of(toArray(entryList)));
+ }
+
+ @DisplayName("Two empty rankings are considered equal")
+ @Test
+ public void testTwoEmptyRankingsAreConsideredEaqual()
+ {
+ assertThat(Ranking.of()).isEqualTo(Ranking.of());
+ }
+
+ @DisplayName("A changed ranking is not considered equal to its unchanged counter-part")
+ @ParameterizedTest
+ @MethodSource("validRankingsProvider")
+ public void testChangedRankingsDoNotEaqualUnchangedOne(List<Entry> entryList)
+ {
+ Ranking changed = Ranking.of(toArray(entryList));
+ changed.add(Entry.of("devilish", 666l));
+ assertThat(changed).isNotEqualTo(Ranking.of(toArray(entryList)));
+ }
+
+ @DisplayName("Rankigs are considered equal, if only the order of words with the same count differ")
+ @Test
+ public void testRankingWithDifferentOrderForSameCountAreEqual()
+ {
+ assertThat(
+ Ranking.of(
+ Entry.of("a1",10l),
+ Entry.of("a2",10l),
+ Entry.of("b1", 9l),
+ Entry.of("b2",9l),
+ Entry.of("c1", 8l),
+ Entry.of("c2", 8l)))
+ .isEqualTo(Ranking.of(
+ Entry.of("a2",10l),
+ Entry.of("a1",10l),
+ Entry.of("b2", 9l),
+ Entry.of("b1",9l),
+ Entry.of("c2", 8l),
+ Entry.of("c1", 8l)));
+ }
+
+
+ Entry[] toArray(List<Entry> entryList)
+ {
+ return entryList.toArray(size -> new Entry[size]);
+ }
+
+ static String wordForPosition(int position)
+ {
+ return Integer.toString(position+1);
+ }
+
+ static long rankingForPosition(int position)
+ {
+ return (long)Ranking.MAX_ENTRIES * 2 - position;
+ }
+
+ static Stream<List<Entry>> validRankingsProvider()
+ {
+ return Stream.of(VALID_RANKINGS);
+ }
+
+ static Stream<List<Entry>> invalidRankingsProvider()
+ {
+ return Stream.of(INVALID_RANKINGS);
+ }
+
+ static String[] WORDS = new String[Ranking.MAX_ENTRIES];
+ static List<Entry>[] VALID_RANKINGS = new List[Ranking.MAX_ENTRIES];
+
+ static
+ {
+ for (int i = 0; i < Ranking.MAX_ENTRIES; i++)
+ {
+ List<Entry> ranking = new LinkedList<>();
+ String word = null;
+ for (int position = 0; position <= i; position++)
+ {
+ word = wordForPosition(position);
+ Entry entry = Entry.of(word, rankingForPosition(position));
+ ranking.add(entry);
+ }
+ WORDS[i] = word;
+ VALID_RANKINGS[Ranking.MAX_ENTRIES - (i + 1)] = ranking;
+ }
+ }
+
+ static List<Entry>[] INVALID_RANKINGS = new List[] {
+ List.of(
+ Entry.of("Platz eins", 1l),
+ Entry.of("Platz zwei", 2l)),
+ List.of(
+ Entry.of("Platz eins", 1111111111l),
+ Entry.of("Platz zwei", 222222222l),
+ Entry.of("Platz eins", 1l)),
+ List.of(
+ Entry.of("Platz eins", 11l),
+ Entry.of("Platz eins", 1l)),
+ List.of(
+ Entry.of("Platz eins", 1111111111l),
+ Entry.of("Platz zwei", 222222222l),
+ Entry.of("Platz eins", 11111111l),
+ Entry.of("Platz zwei", 2222222l),
+ Entry.of("Platz fünf", 555555l)),
+ List.of(
+ Entry.of("Platz eins", 1111111111l),
+ Entry.of("Platz zwei", 222222222l),
+ Entry.of("Platz drei", 33333333l),
+ Entry.of("Platz vier", 4444444l),
+ Entry.of("Platz eins", 111111l),
+ Entry.of("Platz sechs", 66666l)),
+ List.of(
+ Entry.of("Platz eins", 1111111111l),
+ Entry.of("Platz zwei", 222222222l),
+ Entry.of("Platz drei", 33333333l),
+ Entry.of("Platz vier", 4444444l),
+ Entry.of("Platz fünf", 555555l),
+ Entry.of("Platz sechs", 66666l),
+ Entry.of("Platz eins", 1l)),
+ List.of(
+ Entry.of("Platz eins", 1111111111l),
+ Entry.of("Platz zwei", 222222222l),
+ Entry.of("Platz drei", 33333333l),
+ Entry.of("Platz vier", 4444444l),
+ Entry.of("Platz fünf", 555555l),
+ Entry.of("Platz sechs", 66666l),
+ Entry.of("Platz sieben", 7777l),
+ Entry.of("Platz acht", 888l),
+ Entry.of("Platz neun", 99l),
+ Entry.of("Platz 10", 6l),
+ Entry.of("Platz 11", 3l))};
+}
--- /dev/null
+package de.juplo.kafka.wordcount.top10;
+
+import de.juplo.kafka.wordcount.counter.TestCounter;
+import de.juplo.kafka.wordcount.counter.TestWord;
+import de.juplo.kafka.wordcount.query.TestRanking;
+import de.juplo.kafka.wordcount.query.TestUser;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Primary;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import java.time.Duration;
+
+import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME;
+import static org.awaitility.Awaitility.await;
+
+
+@SpringBootTest(
+ properties = {
+ "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
+ "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
+ "spring.kafka.producer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.counter.TestWord,counter:de.juplo.kafka.wordcount.counter.TestCounter",
+ "spring.kafka.consumer.auto-offset-reset=earliest",
+ "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
+ "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
+ "spring.kafka.consumer.properties.spring.json.type.mapping=user:de.juplo.kafka.wordcount.query.TestUser,ranking:de.juplo.kafka.wordcount.query.TestRanking",
+ "logging.level.root=WARN",
+ "logging.level.de.juplo=DEBUG",
+ "logging.level.org.apache.kafka.clients=INFO",
+ "logging.level.org.apache.kafka.streams=INFO",
+ "juplo.wordcount.top10.bootstrap-server=${spring.embedded.kafka.brokers}",
+ "juplo.wordcount.top10.commit-interval=100",
+ "juplo.wordcount.top10.cacheMaxBytes=0",
+ "juplo.wordcount.top10.input-topic=" + Top10ApplicationIT.TOPIC_IN,
+ "juplo.wordcount.top10.output-topic=" + Top10ApplicationIT.TOPIC_OUT })
+@EmbeddedKafka(topics = { Top10ApplicationIT.TOPIC_IN, Top10ApplicationIT.TOPIC_OUT })
+@Slf4j
+public class Top10ApplicationIT
+{
+ public static final String TOPIC_IN = "in";
+ public static final String TOPIC_OUT = "out";
+
+ @Autowired
+ Consumer consumer;
+ @Autowired
+ Top10StreamProcessor streamProcessor;
+
+
+ @BeforeAll
+ public static void testSendMessage(
+ @Autowired KafkaTemplate<TestWord, TestCounter> kafkaTemplate)
+ {
+ TestData
+ .getInputMessages()
+ .forEach(kv ->
+ {
+ try
+ {
+ SendResult<TestWord, TestCounter> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
+ log.info(
+ "Sent: {}={}, partition={}, offset={}",
+ result.getProducerRecord().key(),
+ result.getProducerRecord().value(),
+ result.getRecordMetadata().partition(),
+ result.getRecordMetadata().offset());
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @DisplayName("Await the expected state in the state-store")
+ @Test
+ public void testAwaitExpectedState()
+ {
+ await("Expected state")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
+ }
+
+ @DisplayName("Await the expected output messages")
+ @Test
+ @Disabled
+ public void testAwaitExpectedMessages()
+ {
+ await("Expected messages")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> consumer.enforceAssertion(
+ receivedMessages -> TestData.assertExpectedMessages(receivedMessages)));
+ }
+
+ @DisplayName("Await the expected number of messages")
+ @Test
+ public void testAwaitExpectedNumberOfMessagesForUsers()
+ {
+ await("Expected number of messages")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> consumer.enforceAssertion(
+ receivedMessages -> TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages)));
+ }
+
+ @DisplayName("Await the expected final output messages")
+ @Test
+ public void testAwaitExpectedLastMessagesForUsers()
+ {
+ await("Expected final output messages")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> consumer.enforceAssertion(
+ receivedMessages -> TestData.assertExpectedLastMessagesForUsers(receivedMessages)));
+ }
+
+
+ static class Consumer
+ {
+ private final MultiValueMap<TestUser, TestRanking> received = new LinkedMultiValueMap<>();
+
+ @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
+ public synchronized void receive(
+ @Header(KafkaHeaders.RECEIVED_KEY) TestUser user,
+ @Payload TestRanking ranking)
+ {
+ log.debug("Received message: {} -> {}", user, ranking);
+ received.add(user, ranking);
+ }
+
+ synchronized void enforceAssertion(
+ java.util.function.Consumer<MultiValueMap<TestUser, TestRanking>> assertion)
+ {
+ assertion.accept(received);
+ }
+ }
+
+ @TestConfiguration
+ static class Configuration
+ {
+ @Bean
+ Consumer consumer()
+ {
+ return new Consumer();
+ }
+
+ @Primary
+ @Bean
+ KeyValueBytesStoreSupplier inMemoryStoreSupplier()
+ {
+ return Stores.inMemoryKeyValueStore(STORE_NAME);
+ }
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.top10;
+
+import de.juplo.kafka.wordcount.counter.TestCounter;
+import de.juplo.kafka.wordcount.counter.TestWord;
+import de.juplo.kafka.wordcount.query.TestRanking;
+import de.juplo.kafka.wordcount.query.TestUser;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import java.util.Map;
+
+import static de.juplo.kafka.wordcount.top10.Top10ApplicationConfiguration.serializationConfig;
+
+
+@Slf4j
+public class Top10StreamProcessorTopologyTest
+{
+ public static final String IN = "TEST-IN";
+ public static final String OUT = "TEST-OUT";
+ public static final String STORE_NAME = "TOPOLOGY-TEST";
+
+
+ TopologyTestDriver testDriver;
+ TestInputTopic<TestWord, TestCounter> in;
+ TestOutputTopic<TestUser, TestRanking> out;
+
+
+ @BeforeEach
+ public void setUp()
+ {
+ Topology topology = Top10StreamProcessor.buildTopology(
+ IN,
+ OUT,
+ Stores.inMemoryKeyValueStore(STORE_NAME));
+
+ testDriver = new TopologyTestDriver(topology, serializationConfig());
+
+ in = testDriver.createInputTopic(
+ IN,
+ jsonSerializer(TestWord.class, true),
+ jsonSerializer(TestCounter.class,false));
+
+ out = testDriver.createOutputTopic(
+ OUT,
+ new JsonDeserializer()
+ .copyWithType(TestUser.class)
+ .ignoreTypeHeaders(),
+ new JsonDeserializer()
+ .copyWithType(TestRanking.class)
+ .ignoreTypeHeaders());
+
+ }
+
+
+ @Test
+ public void test()
+ {
+ TestData
+ .getInputMessages()
+ .forEach(kv -> in.pipeInput(kv.key, kv.value));
+
+ MultiValueMap<TestUser, TestRanking> receivedMessages = new LinkedMultiValueMap<>();
+ out
+ .readRecordsToList()
+ .forEach(record -> receivedMessages.add(record.key(), record.value()));
+
+ TestData.assertExpectedMessages(receivedMessages);
+
+ TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages);
+ TestData.assertExpectedLastMessagesForUsers(receivedMessages);
+
+ KeyValueStore<User, Ranking> store = testDriver.getKeyValueStore(STORE_NAME);
+ TestData.assertExpectedState(store);
+ }
+
+ @AfterEach
+ public void tearDown()
+ {
+ testDriver.close();
+ }
+
+ private <T> JsonSerializer<T> jsonSerializer(Class<T> type, boolean isKey)
+ {
+ JsonSerializer<T> jsonSerializer = new JsonSerializer<>();
+ jsonSerializer.configure(
+ Map.of(
+ JsonSerializer.TYPE_MAPPINGS,
+ "word:" + TestWord.class.getName() + "," +
+ "counter:" + TestCounter.class.getName()),
+ isKey);
+ return jsonSerializer;
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.top10;
+
+import de.juplo.kafka.wordcount.counter.TestCounter;
+import de.juplo.kafka.wordcount.counter.TestWord;
+import de.juplo.kafka.wordcount.query.TestEntry;
+import de.juplo.kafka.wordcount.query.TestRanking;
+import de.juplo.kafka.wordcount.query.TestUser;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import java.util.Arrays;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+class TestData
+{
+ static final TestUser PETER = TestUser.of("peter");
+ static final TestUser KLAUS = TestUser.of("klaus");
+
+ static final Stream<KeyValue<TestWord, TestCounter>> getInputMessages()
+ {
+ return Stream.of(INPUT_MESSAGES);
+ }
+
+ private static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
+ {
+ new KeyValue<>(
+ TestWord.of(PETER.getUser(),"Hallo"),
+ TestCounter.of(PETER.getUser(),"Hallo",1)),
+ new KeyValue<>(
+ TestWord.of(KLAUS.getUser(),"Müsch"),
+ TestCounter.of(KLAUS.getUser(),"Müsch",1)),
+ new KeyValue<>(
+ TestWord.of(PETER.getUser(),"Welt"),
+ TestCounter.of(PETER.getUser(),"Welt",1)),
+ new KeyValue<>(
+ TestWord.of(KLAUS.getUser(),"Müsch"),
+ TestCounter.of(KLAUS.getUser(),"Müsch",2)),
+ new KeyValue<>(
+ TestWord.of(KLAUS.getUser(),"s"),
+ TestCounter.of(KLAUS.getUser(),"s",1)),
+ new KeyValue<>(
+ TestWord.of(PETER.getUser(),"Boäh"),
+ TestCounter.of(PETER.getUser(),"Boäh",1)),
+ new KeyValue<>(
+ TestWord.of(PETER.getUser(),"Welt"),
+ TestCounter.of(PETER.getUser(),"Welt",2)),
+ new KeyValue<>(
+ TestWord.of(PETER.getUser(),"Boäh"),
+ TestCounter.of(PETER.getUser(),"Boäh",2)),
+ new KeyValue<>(
+ TestWord.of(KLAUS.getUser(),"s"),
+ TestCounter.of(KLAUS.getUser(),"s",2)),
+ new KeyValue<>(
+ TestWord.of(PETER.getUser(),"Boäh"),
+ TestCounter.of(PETER.getUser(),"Boäh",3)),
+ new KeyValue<>(
+ TestWord.of(KLAUS.getUser(),"s"),
+ TestCounter.of(KLAUS.getUser(),"s",3)),
+ };
+
+ static void assertExpectedMessages(MultiValueMap<TestUser, TestRanking> receivedMessages)
+ {
+ expectedMessages().forEach(
+ (user, rankings) ->
+ assertThat(receivedMessages.get(user))
+ .containsExactlyElementsOf(rankings));
+ }
+
+ static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
+ {
+ assertRankingEqualsRankingFromLastMessage(PETER, store.get(userOf(PETER)));
+ assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(userOf(KLAUS)));
+ }
+
+ private static User userOf(TestUser user)
+ {
+ return User.of(user.getUser());
+ }
+
+ static void assertExpectedNumberOfMessagesForUsers(MultiValueMap<TestUser, TestRanking> receivedMessages)
+ {
+ assertThat(countMessagesForUser(PETER, receivedMessages));
+ assertThat(countMessagesForUser(KLAUS, receivedMessages));
+ }
+
+ private static int countMessagesForUser(TestUser user, MultiValueMap<TestUser, TestRanking> messagesForUsers)
+ {
+ return messagesForUsers.get(user) == null
+ ? 0
+ : messagesForUsers.get(user).size();
+ }
+
+
+ static void assertExpectedLastMessagesForUsers(MultiValueMap<TestUser, TestRanking> receivedMessages)
+ {
+ assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages));
+ assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages));
+ }
+
+ private static void assertRankingEqualsRankingFromLastMessage(TestUser user, Ranking ranking)
+ {
+ TestRanking testRanking = TestRanking.of(testEntriesOf(ranking.getEntries()));
+ assertRankingEqualsRankingFromLastMessage(user, testRanking);
+ }
+
+ private static TestEntry[] testEntriesOf(Entry... entries)
+ {
+ return Arrays
+ .stream(entries)
+ .map(entry -> TestEntry.of(
+ entry.getWord(),
+ entry.getCounter() == null
+ ? -1l
+ : entry.getCounter()))
+ .toArray(size -> new TestEntry[size]);
+ }
+
+ private static void assertRankingEqualsRankingFromLastMessage(TestUser user, TestRanking ranking)
+ {
+ assertThat(ranking).isEqualTo(getLastMessageFor(user));
+ }
+
+ private static TestRanking getLastMessageFor(TestUser user)
+ {
+ return getLastMessageFor(user, expectedMessages());
+ }
+
+ private static TestRanking getLastMessageFor(TestUser user, MultiValueMap<TestUser, TestRanking> messagesForUsers)
+ {
+ return messagesForUsers
+ .get(user)
+ .stream()
+ .reduce(null, (left, right) -> right);
+ }
+
+ private static KeyValue<TestUser, TestRanking>[] EXPECTED_MESSAGES = new KeyValue[]
+ {
+ KeyValue.pair( // 0
+ PETER,
+ TestRanking.of(
+ TestEntry.of("Hallo", 1l))),
+ KeyValue.pair( // 1
+ KLAUS,
+ TestRanking.of(
+ TestEntry.of("Müsch", 1l))),
+ KeyValue.pair( // 2
+ PETER,
+ TestRanking.of(
+ TestEntry.of("Hallo", 1l),
+ TestEntry.of("Welt", 1l))),
+ KeyValue.pair( // 3
+ KLAUS,
+ TestRanking.of(
+ TestEntry.of("Müsch", 2l))),
+ KeyValue.pair( // 4
+ KLAUS,
+ TestRanking.of(
+ TestEntry.of("Müsch", 2l),
+ TestEntry.of("s", 1l))),
+ KeyValue.pair( // 5
+ PETER,
+ TestRanking.of(
+ TestEntry.of("Hallo", 1l),
+ TestEntry.of("Welt", 1l),
+ TestEntry.of("Boäh", 1l))),
+ KeyValue.pair( // 6
+ PETER,
+ TestRanking.of(
+ TestEntry.of("Welt", 2l),
+ TestEntry.of("Hallo", 1l),
+ TestEntry.of("Boäh", 1l))),
+ KeyValue.pair( // 7
+ PETER,
+ TestRanking.of(
+ TestEntry.of("Welt", 2l),
+ TestEntry.of("Boäh", 2l),
+ TestEntry.of("Hallo", 1l))),
+ KeyValue.pair( // 8
+ KLAUS,
+ TestRanking.of(
+ TestEntry.of("Müsch", 2l),
+ TestEntry.of("s", 2l))),
+ KeyValue.pair( // 9
+ PETER,
+ TestRanking.of(
+ TestEntry.of("Boäh", 3l),
+ TestEntry.of("Welt", 2l),
+ TestEntry.of("Hallo", 1l))),
+ KeyValue.pair( // 10
+ KLAUS,
+ TestRanking.of(
+ TestEntry.of("s", 3l),
+ TestEntry.of("Müsch", 2l))),
+ };
+
+ private static MultiValueMap<TestUser, TestRanking> expectedMessages()
+ {
+ MultiValueMap<TestUser, TestRanking> expectedMessages = new LinkedMultiValueMap<>();
+ Stream
+ .of(EXPECTED_MESSAGES)
+ .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
+ return expectedMessages;
+ }
+}
+++ /dev/null
-package de.juplo.kafka.wordcount.top10;
-
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
-import org.junit.jupiter.params.provider.ValueSource;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.stream.Stream;
-
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
-
-
-public class RankingTest
-{
- @DisplayName("A newly created instance is empty")
- @Test
- public void testNewRankingIsEmpty()
- {
- Ranking ranking = new Ranking();
- assertThat(ranking.getEntries()).isEmpty();
- }
-
- @DisplayName("An instance that was build from an empty ranking is empty")
- @Test
- public void testRankingOfYieldsExpectedResultForEmptyList()
- {
- Ranking ranking = new Ranking();
- assertThat(ranking.getEntries()).isEmpty();
- }
-
- @DisplayName("An instance that was build from a valid ranking contains the expected entries")
- @ParameterizedTest
- @MethodSource("validRankingsProvider")
- public void testRankingOfYieldsExpectedResultsForValidRankings(List<Entry> entryList)
- {
- Ranking ranking = Ranking.of(toArray(entryList));
- assertThat(ranking.getEntries()).containsExactlyElementsOf(entryList);
- }
-
- @DisplayName("The builder fails for invalid rankings")
- @ParameterizedTest
- @MethodSource("invalidRankingsProvider")
- public void testRankingOfThrowsExceptionForInvalidRankings(List<Entry> entryList)
- {
- assertThatExceptionOfType(IllegalArgumentException.class)
- .isThrownBy(() -> Ranking.of(toArray(entryList)));
- }
-
- @DisplayName("Adding a new word with highest ranking, pushes all other words down")
- @ParameterizedTest
- @MethodSource("validRankingsProvider")
- public void testAddingNewWordWithHighestRanking(List<Entry> entryList)
- {
- Ranking ranking = Ranking.of(toArray(entryList));
- Entry newEntry = Entry.of("NEW!", rankingForPosition(-1));
- ranking.add(newEntry);
- assertThat(ranking.getEntries()[0]).isEqualTo(newEntry);
- for (int i = 0; i < entryList.size() && i < Ranking.MAX_ENTRIES - 1; i++)
- {
- assertThat(ranking.getEntries()[i + 1]).isEqualTo(entryList.get(i));
- }
- }
-
- @DisplayName("Adding a new word with an existent ranking, pushes all words with lower ranking down")
- @ParameterizedTest
- @MethodSource("validRankingsProvider")
- public void testAddingNewWordWithExistingRanking(List<Entry> entryList)
- {
- for (int position = 0; position < entryList.size(); position++ )
- {
- Ranking ranking = Ranking.of(toArray(entryList));
- Entry newEntry = Entry.of("NEW!", rankingForPosition(position));
- ranking.add(newEntry);
- for (int i = 0; i < entryList.size() && i < Ranking.MAX_ENTRIES - 1; i++)
- {
- if (i < position)
- {
- assertThat(ranking.getEntries()[i]).isEqualTo(entryList.get(i));
- }
- if (i == position)
- {
- assertThat(ranking.getEntries()[i]).isEqualTo(entryList.get(i));
- assertThat(ranking.getEntries()[i + 1]).isEqualTo(newEntry);
- }
- if (i > position)
- {
- assertThat(ranking.getEntries()[i + 1]).isEqualTo(entryList.get(i));
- }
- }
- }
- }
-
- @DisplayName("Adding a highest ranking for an existing word shifts it to the first place")
- @ParameterizedTest
- @ValueSource(ints = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 })
- public void testAddingExistingWordWithHighestRanking(int position)
- {
- Ranking ranking = Ranking.of(toArray(VALID_RANKINGS[0]));
- String word = wordForPosition(position);
- Entry highestEntry = Entry.of(word, 100l);
- ranking.add(highestEntry);
- List<Entry> expectedEntries = Stream
- .concat(
- Stream.of(highestEntry),
- VALID_RANKINGS[0]
- .stream()
- .filter(entry -> !entry.getWord().equals(word)))
- .toList();
- assertThat(ranking.getEntries()).containsExactlyElementsOf(expectedEntries);
- }
-
- @DisplayName("Adding an existing word with unchanged ranking changes nothing")
- @ParameterizedTest
- @ValueSource(ints = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 })
- public void testAddingExistingWordWithUnchangedRanking(int position)
- {
- Ranking ranking = Ranking.of(toArray(VALID_RANKINGS[0]));
- Entry unchangedEntry = Entry.of(
- wordForPosition(position),
- rankingForPosition(position));
- ranking.add(unchangedEntry);
- assertThat(ranking.getEntries()).containsExactlyElementsOf(VALID_RANKINGS[0]);
- }
-
- @DisplayName("Adding an existing word with a lower ranking fails")
- @ParameterizedTest
- @MethodSource("validRankingsProvider")
- public void testAddingExistingWordWithLowerRankingFails(List<Entry> entryList)
- {
- Ranking ranking = Ranking.of(toArray(entryList));
- entryList.forEach(entry ->
- assertThatExceptionOfType(IllegalArgumentException.class)
- .isThrownBy(() -> ranking.add(Entry.of(entry.getWord(), entry.getCounter() - 1))));
- }
-
- @DisplayName("Identical rankings are considered equal")
- @ParameterizedTest
- @MethodSource("validRankingsProvider")
- public void testIdenticalRankingsAreConsideredEaqual(List<Entry> entryList)
- {
- assertThat(Ranking.of(toArray(entryList))).isEqualTo(Ranking.of(toArray(entryList)));
- }
-
- @DisplayName("Two empty rankings are considered equal")
- @Test
- public void testTwoEmptyRankingsAreConsideredEaqual()
- {
- assertThat(Ranking.of()).isEqualTo(Ranking.of());
- }
-
- @DisplayName("A changed ranking is not considered equal to its unchanged counter-part")
- @ParameterizedTest
- @MethodSource("validRankingsProvider")
- public void testChangedRankingsDoNotEaqualUnchangedOne(List<Entry> entryList)
- {
- Ranking changed = Ranking.of(toArray(entryList));
- changed.add(Entry.of("devilish", 666l));
- assertThat(changed).isNotEqualTo(Ranking.of(toArray(entryList)));
- }
-
- @DisplayName("Rankigs are considered equal, if only the order of words with the same count differ")
- @Test
- public void testRankingWithDifferentOrderForSameCountAreEqual()
- {
- assertThat(
- Ranking.of(
- Entry.of("a1",10l),
- Entry.of("a2",10l),
- Entry.of("b1", 9l),
- Entry.of("b2",9l),
- Entry.of("c1", 8l),
- Entry.of("c2", 8l)))
- .isEqualTo(Ranking.of(
- Entry.of("a2",10l),
- Entry.of("a1",10l),
- Entry.of("b2", 9l),
- Entry.of("b1",9l),
- Entry.of("c2", 8l),
- Entry.of("c1", 8l)));
- }
-
-
- Entry[] toArray(List<Entry> entryList)
- {
- return entryList.toArray(size -> new Entry[size]);
- }
-
- static String wordForPosition(int position)
- {
- return Integer.toString(position+1);
- }
-
- static long rankingForPosition(int position)
- {
- return (long)Ranking.MAX_ENTRIES * 2 - position;
- }
-
- static Stream<List<Entry>> validRankingsProvider()
- {
- return Stream.of(VALID_RANKINGS);
- }
-
- static Stream<List<Entry>> invalidRankingsProvider()
- {
- return Stream.of(INVALID_RANKINGS);
- }
-
- static String[] WORDS = new String[Ranking.MAX_ENTRIES];
- static List<Entry>[] VALID_RANKINGS = new List[Ranking.MAX_ENTRIES];
-
- static
- {
- for (int i = 0; i < Ranking.MAX_ENTRIES; i++)
- {
- List<Entry> ranking = new LinkedList<>();
- String word = null;
- for (int position = 0; position <= i; position++)
- {
- word = wordForPosition(position);
- Entry entry = Entry.of(word, rankingForPosition(position));
- ranking.add(entry);
- }
- WORDS[i] = word;
- VALID_RANKINGS[Ranking.MAX_ENTRIES - (i + 1)] = ranking;
- }
- }
-
- static List<Entry>[] INVALID_RANKINGS = new List[] {
- List.of(
- Entry.of("Platz eins", 1l),
- Entry.of("Platz zwei", 2l)),
- List.of(
- Entry.of("Platz eins", 1111111111l),
- Entry.of("Platz zwei", 222222222l),
- Entry.of("Platz eins", 1l)),
- List.of(
- Entry.of("Platz eins", 11l),
- Entry.of("Platz eins", 1l)),
- List.of(
- Entry.of("Platz eins", 1111111111l),
- Entry.of("Platz zwei", 222222222l),
- Entry.of("Platz eins", 11111111l),
- Entry.of("Platz zwei", 2222222l),
- Entry.of("Platz fünf", 555555l)),
- List.of(
- Entry.of("Platz eins", 1111111111l),
- Entry.of("Platz zwei", 222222222l),
- Entry.of("Platz drei", 33333333l),
- Entry.of("Platz vier", 4444444l),
- Entry.of("Platz eins", 111111l),
- Entry.of("Platz sechs", 66666l)),
- List.of(
- Entry.of("Platz eins", 1111111111l),
- Entry.of("Platz zwei", 222222222l),
- Entry.of("Platz drei", 33333333l),
- Entry.of("Platz vier", 4444444l),
- Entry.of("Platz fünf", 555555l),
- Entry.of("Platz sechs", 66666l),
- Entry.of("Platz eins", 1l)),
- List.of(
- Entry.of("Platz eins", 1111111111l),
- Entry.of("Platz zwei", 222222222l),
- Entry.of("Platz drei", 33333333l),
- Entry.of("Platz vier", 4444444l),
- Entry.of("Platz fünf", 555555l),
- Entry.of("Platz sechs", 66666l),
- Entry.of("Platz sieben", 7777l),
- Entry.of("Platz acht", 888l),
- Entry.of("Platz neun", 99l),
- Entry.of("Platz 10", 6l),
- Entry.of("Platz 11", 3l))};
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.top10;
-
-import de.juplo.kafka.wordcount.counter.TestCounter;
-import de.juplo.kafka.wordcount.counter.TestWord;
-import de.juplo.kafka.wordcount.query.TestEntry;
-import de.juplo.kafka.wordcount.query.TestRanking;
-import de.juplo.kafka.wordcount.query.TestUser;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
-import org.springframework.util.LinkedMultiValueMap;
-import org.springframework.util.MultiValueMap;
-
-import java.util.Arrays;
-import java.util.stream.Stream;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-
-class TestData
-{
- static final TestUser PETER = TestUser.of("peter");
- static final TestUser KLAUS = TestUser.of("klaus");
-
- static final Stream<KeyValue<TestWord, TestCounter>> getInputMessages()
- {
- return Stream.of(INPUT_MESSAGES);
- }
-
- private static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
- {
- new KeyValue<>(
- TestWord.of(PETER.getUser(),"Hallo"),
- TestCounter.of(PETER.getUser(),"Hallo",1)),
- new KeyValue<>(
- TestWord.of(KLAUS.getUser(),"Müsch"),
- TestCounter.of(KLAUS.getUser(),"Müsch",1)),
- new KeyValue<>(
- TestWord.of(PETER.getUser(),"Welt"),
- TestCounter.of(PETER.getUser(),"Welt",1)),
- new KeyValue<>(
- TestWord.of(KLAUS.getUser(),"Müsch"),
- TestCounter.of(KLAUS.getUser(),"Müsch",2)),
- new KeyValue<>(
- TestWord.of(KLAUS.getUser(),"s"),
- TestCounter.of(KLAUS.getUser(),"s",1)),
- new KeyValue<>(
- TestWord.of(PETER.getUser(),"Boäh"),
- TestCounter.of(PETER.getUser(),"Boäh",1)),
- new KeyValue<>(
- TestWord.of(PETER.getUser(),"Welt"),
- TestCounter.of(PETER.getUser(),"Welt",2)),
- new KeyValue<>(
- TestWord.of(PETER.getUser(),"Boäh"),
- TestCounter.of(PETER.getUser(),"Boäh",2)),
- new KeyValue<>(
- TestWord.of(KLAUS.getUser(),"s"),
- TestCounter.of(KLAUS.getUser(),"s",2)),
- new KeyValue<>(
- TestWord.of(PETER.getUser(),"Boäh"),
- TestCounter.of(PETER.getUser(),"Boäh",3)),
- new KeyValue<>(
- TestWord.of(KLAUS.getUser(),"s"),
- TestCounter.of(KLAUS.getUser(),"s",3)),
- };
-
- static void assertExpectedMessages(MultiValueMap<TestUser, TestRanking> receivedMessages)
- {
- expectedMessages().forEach(
- (user, rankings) ->
- assertThat(receivedMessages.get(user))
- .containsExactlyElementsOf(rankings));
- }
-
- static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
- {
- assertRankingEqualsRankingFromLastMessage(PETER, store.get(userOf(PETER)));
- assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(userOf(KLAUS)));
- }
-
- private static User userOf(TestUser user)
- {
- return User.of(user.getUser());
- }
-
- static void assertExpectedNumberOfMessagesForUsers(MultiValueMap<TestUser, TestRanking> receivedMessages)
- {
- assertThat(countMessagesForUser(PETER, receivedMessages));
- assertThat(countMessagesForUser(KLAUS, receivedMessages));
- }
-
- private static int countMessagesForUser(TestUser user, MultiValueMap<TestUser, TestRanking> messagesForUsers)
- {
- return messagesForUsers.get(user) == null
- ? 0
- : messagesForUsers.get(user).size();
- }
-
-
- static void assertExpectedLastMessagesForUsers(MultiValueMap<TestUser, TestRanking> receivedMessages)
- {
- assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages));
- assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages));
- }
-
- private static void assertRankingEqualsRankingFromLastMessage(TestUser user, Ranking ranking)
- {
- TestRanking testRanking = TestRanking.of(testEntriesOf(ranking.getEntries()));
- assertRankingEqualsRankingFromLastMessage(user, testRanking);
- }
-
- private static TestEntry[] testEntriesOf(Entry... entries)
- {
- return Arrays
- .stream(entries)
- .map(entry -> TestEntry.of(
- entry.getWord(),
- entry.getCounter() == null
- ? -1l
- : entry.getCounter()))
- .toArray(size -> new TestEntry[size]);
- }
-
- private static void assertRankingEqualsRankingFromLastMessage(TestUser user, TestRanking ranking)
- {
- assertThat(ranking).isEqualTo(getLastMessageFor(user));
- }
-
- private static TestRanking getLastMessageFor(TestUser user)
- {
- return getLastMessageFor(user, expectedMessages());
- }
-
- private static TestRanking getLastMessageFor(TestUser user, MultiValueMap<TestUser, TestRanking> messagesForUsers)
- {
- return messagesForUsers
- .get(user)
- .stream()
- .reduce(null, (left, right) -> right);
- }
-
- private static KeyValue<TestUser, TestRanking>[] EXPECTED_MESSAGES = new KeyValue[]
- {
- KeyValue.pair( // 0
- PETER,
- TestRanking.of(
- TestEntry.of("Hallo", 1l))),
- KeyValue.pair( // 1
- KLAUS,
- TestRanking.of(
- TestEntry.of("Müsch", 1l))),
- KeyValue.pair( // 2
- PETER,
- TestRanking.of(
- TestEntry.of("Hallo", 1l),
- TestEntry.of("Welt", 1l))),
- KeyValue.pair( // 3
- KLAUS,
- TestRanking.of(
- TestEntry.of("Müsch", 2l))),
- KeyValue.pair( // 4
- KLAUS,
- TestRanking.of(
- TestEntry.of("Müsch", 2l),
- TestEntry.of("s", 1l))),
- KeyValue.pair( // 5
- PETER,
- TestRanking.of(
- TestEntry.of("Hallo", 1l),
- TestEntry.of("Welt", 1l),
- TestEntry.of("Boäh", 1l))),
- KeyValue.pair( // 6
- PETER,
- TestRanking.of(
- TestEntry.of("Welt", 2l),
- TestEntry.of("Hallo", 1l),
- TestEntry.of("Boäh", 1l))),
- KeyValue.pair( // 7
- PETER,
- TestRanking.of(
- TestEntry.of("Welt", 2l),
- TestEntry.of("Boäh", 2l),
- TestEntry.of("Hallo", 1l))),
- KeyValue.pair( // 8
- KLAUS,
- TestRanking.of(
- TestEntry.of("Müsch", 2l),
- TestEntry.of("s", 2l))),
- KeyValue.pair( // 9
- PETER,
- TestRanking.of(
- TestEntry.of("Boäh", 3l),
- TestEntry.of("Welt", 2l),
- TestEntry.of("Hallo", 1l))),
- KeyValue.pair( // 10
- KLAUS,
- TestRanking.of(
- TestEntry.of("s", 3l),
- TestEntry.of("Müsch", 2l))),
- };
-
- private static MultiValueMap<TestUser, TestRanking> expectedMessages()
- {
- MultiValueMap<TestUser, TestRanking> expectedMessages = new LinkedMultiValueMap<>();
- Stream
- .of(EXPECTED_MESSAGES)
- .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
- return expectedMessages;
- }
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.top10;
-
-import de.juplo.kafka.wordcount.counter.TestCounter;
-import de.juplo.kafka.wordcount.counter.TestWord;
-import de.juplo.kafka.wordcount.query.TestRanking;
-import de.juplo.kafka.wordcount.query.TestUser;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.Stores;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Primary;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.support.KafkaHeaders;
-import org.springframework.kafka.support.SendResult;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.messaging.handler.annotation.Header;
-import org.springframework.messaging.handler.annotation.Payload;
-import org.springframework.util.LinkedMultiValueMap;
-import org.springframework.util.MultiValueMap;
-
-import java.time.Duration;
-
-import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME;
-import static org.awaitility.Awaitility.await;
-
-
-@SpringBootTest(
- properties = {
- "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
- "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
- "spring.kafka.producer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.counter.TestWord,counter:de.juplo.kafka.wordcount.counter.TestCounter",
- "spring.kafka.consumer.auto-offset-reset=earliest",
- "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
- "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
- "spring.kafka.consumer.properties.spring.json.type.mapping=user:de.juplo.kafka.wordcount.query.TestUser,ranking:de.juplo.kafka.wordcount.query.TestRanking",
- "logging.level.root=WARN",
- "logging.level.de.juplo=DEBUG",
- "logging.level.org.apache.kafka.clients=INFO",
- "logging.level.org.apache.kafka.streams=INFO",
- "juplo.wordcount.top10.bootstrap-server=${spring.embedded.kafka.brokers}",
- "juplo.wordcount.top10.commit-interval=100",
- "juplo.wordcount.top10.cacheMaxBytes=0",
- "juplo.wordcount.top10.input-topic=" + Top10ApplicationIT.TOPIC_IN,
- "juplo.wordcount.top10.output-topic=" + Top10ApplicationIT.TOPIC_OUT })
-@EmbeddedKafka(topics = { Top10ApplicationIT.TOPIC_IN, Top10ApplicationIT.TOPIC_OUT })
-@Slf4j
-public class Top10ApplicationIT
-{
- public static final String TOPIC_IN = "in";
- public static final String TOPIC_OUT = "out";
-
- @Autowired
- Consumer consumer;
- @Autowired
- Top10StreamProcessor streamProcessor;
-
-
- @BeforeAll
- public static void testSendMessage(
- @Autowired KafkaTemplate<TestWord, TestCounter> kafkaTemplate)
- {
- TestData
- .getInputMessages()
- .forEach(kv ->
- {
- try
- {
- SendResult<TestWord, TestCounter> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
- log.info(
- "Sent: {}={}, partition={}, offset={}",
- result.getProducerRecord().key(),
- result.getProducerRecord().value(),
- result.getRecordMetadata().partition(),
- result.getRecordMetadata().offset());
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- });
- }
-
- @DisplayName("Await the expected state in the state-store")
- @Test
- public void testAwaitExpectedState()
- {
- await("Expected state")
- .atMost(Duration.ofSeconds(5))
- .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
- }
-
- @DisplayName("Await the expected output messages")
- @Test
- @Disabled
- public void testAwaitExpectedMessages()
- {
- await("Expected messages")
- .atMost(Duration.ofSeconds(5))
- .untilAsserted(() -> consumer.enforceAssertion(
- receivedMessages -> TestData.assertExpectedMessages(receivedMessages)));
- }
-
- @DisplayName("Await the expected number of messages")
- @Test
- public void testAwaitExpectedNumberOfMessagesForUsers()
- {
- await("Expected number of messages")
- .atMost(Duration.ofSeconds(5))
- .untilAsserted(() -> consumer.enforceAssertion(
- receivedMessages -> TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages)));
- }
-
- @DisplayName("Await the expected final output messages")
- @Test
- public void testAwaitExpectedLastMessagesForUsers()
- {
- await("Expected final output messages")
- .atMost(Duration.ofSeconds(5))
- .untilAsserted(() -> consumer.enforceAssertion(
- receivedMessages -> TestData.assertExpectedLastMessagesForUsers(receivedMessages)));
- }
-
-
- static class Consumer
- {
- private final MultiValueMap<TestUser, TestRanking> received = new LinkedMultiValueMap<>();
-
- @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
- public synchronized void receive(
- @Header(KafkaHeaders.RECEIVED_KEY) TestUser user,
- @Payload TestRanking ranking)
- {
- log.debug("Received message: {} -> {}", user, ranking);
- received.add(user, ranking);
- }
-
- synchronized void enforceAssertion(
- java.util.function.Consumer<MultiValueMap<TestUser, TestRanking>> assertion)
- {
- assertion.accept(received);
- }
- }
-
- @TestConfiguration
- static class Configuration
- {
- @Bean
- Consumer consumer()
- {
- return new Consumer();
- }
-
- @Primary
- @Bean
- KeyValueBytesStoreSupplier inMemoryStoreSupplier()
- {
- return Stores.inMemoryKeyValueStore(STORE_NAME);
- }
- }
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.top10;
-
-import de.juplo.kafka.wordcount.counter.TestCounter;
-import de.juplo.kafka.wordcount.counter.TestWord;
-import de.juplo.kafka.wordcount.query.TestRanking;
-import de.juplo.kafka.wordcount.query.TestUser;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.streams.TestInputTopic;
-import org.apache.kafka.streams.TestOutputTopic;
-import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.Stores;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.springframework.kafka.support.serializer.JsonDeserializer;
-import org.springframework.kafka.support.serializer.JsonSerializer;
-import org.springframework.util.LinkedMultiValueMap;
-import org.springframework.util.MultiValueMap;
-
-import java.util.Map;
-
-import static de.juplo.kafka.wordcount.top10.Top10ApplicationConfiguration.serializationConfig;
-
-
-@Slf4j
-public class Top10StreamProcessorTopologyTest
-{
- public static final String IN = "TEST-IN";
- public static final String OUT = "TEST-OUT";
- public static final String STORE_NAME = "TOPOLOGY-TEST";
-
-
- TopologyTestDriver testDriver;
- TestInputTopic<TestWord, TestCounter> in;
- TestOutputTopic<TestUser, TestRanking> out;
-
-
- @BeforeEach
- public void setUp()
- {
- Topology topology = Top10StreamProcessor.buildTopology(
- IN,
- OUT,
- Stores.inMemoryKeyValueStore(STORE_NAME));
-
- testDriver = new TopologyTestDriver(topology, serializationConfig());
-
- in = testDriver.createInputTopic(
- IN,
- jsonSerializer(TestWord.class, true),
- jsonSerializer(TestCounter.class,false));
-
- out = testDriver.createOutputTopic(
- OUT,
- new JsonDeserializer()
- .copyWithType(TestUser.class)
- .ignoreTypeHeaders(),
- new JsonDeserializer()
- .copyWithType(TestRanking.class)
- .ignoreTypeHeaders());
-
- }
-
-
- @Test
- public void test()
- {
- TestData
- .getInputMessages()
- .forEach(kv -> in.pipeInput(kv.key, kv.value));
-
- MultiValueMap<TestUser, TestRanking> receivedMessages = new LinkedMultiValueMap<>();
- out
- .readRecordsToList()
- .forEach(record -> receivedMessages.add(record.key(), record.value()));
-
- TestData.assertExpectedMessages(receivedMessages);
-
- TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages);
- TestData.assertExpectedLastMessagesForUsers(receivedMessages);
-
- KeyValueStore<User, Ranking> store = testDriver.getKeyValueStore(STORE_NAME);
- TestData.assertExpectedState(store);
- }
-
- @AfterEach
- public void tearDown()
- {
- testDriver.close();
- }
-
- private <T> JsonSerializer<T> jsonSerializer(Class<T> type, boolean isKey)
- {
- JsonSerializer<T> jsonSerializer = new JsonSerializer<>();
- jsonSerializer.configure(
- Map.of(
- JsonSerializer.TYPE_MAPPINGS,
- "word:" + TestWord.class.getName() + "," +
- "counter:" + TestCounter.class.getName()),
- isKey);
- return jsonSerializer;
- }
-}