-FROM openjdk:11-jre-slim
+FROM eclipse-temurin:17-jre
COPY target/*.jar /opt/app.jar
-EXPOSE 8080
+EXPOSE 8084
ENTRYPOINT ["java", "-jar", "/opt/app.jar"]
CMD []
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
- <version>2.5.4</version>
+ <version>3.2.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
<artifactId>top10</artifactId>
- <version>1.0.0</version>
+ <version>1.2.0</version>
<name>Wordcount-Top-10</name>
<description>Top-10 stream-processor of the multi-user wordcount-example</description>
<properties>
<docker-maven-plugin.version>0.33.0</docker-maven-plugin.version>
- <java.version>11</java.version>
- <kafka.version>2.8.0</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka</artifactId>
+ </dependency>
<dependency>
	<!-- FIXME: Lombok is published under the groupId org.projectlombok, not
	     org.springframework.boot — the previous coordinates cannot resolve. -->
	<groupId>org.projectlombok</groupId>
	<artifactId>lombok</artifactId>
	<optional>true</optional>
</dependency>
+
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
+ <plugin>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ </plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
package de.juplo.kafka.wordcount.top10;
-import lombok.Value;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
-@Value(staticConstructor = "of")
+@NoArgsConstructor
+@AllArgsConstructor(
+ staticName = "of",
+ access = AccessLevel.PACKAGE)
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
public class Entry
{
- private final String word;
- private final Long count;
+ private String word;
+ private Long counter;
}
package de.juplo.kafka.wordcount.top10;
-import lombok.Getter;
-import lombok.Setter;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.*;
-@Getter
-@Setter
+@NoArgsConstructor
+@AllArgsConstructor(
+ staticName = "of",
+ access = AccessLevel.PACKAGE)
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
public class Key
{
- private String username;
+ private String user;
private String word;
}
package de.juplo.kafka.wordcount.top10;
-import lombok.Getter;
-import lombok.Setter;
+import lombok.*;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.*;
-@Getter
-@Setter
+@AllArgsConstructor(access = AccessLevel.PRIVATE)
+@NoArgsConstructor
+@Data
public class Ranking
{
+ public final static int MAX_ENTRIES = 10;
+
+
private Entry[] entries = new Entry[0];
- public void add(Entry newEntry)
+ public Ranking add(Entry newEntry)
{
if (entries.length == 0)
{
entries = new Entry[1];
entries[0] = newEntry;
- return;
+ return this;
}
List<Entry> list = new LinkedList<>(Arrays.asList(entries));
+ int oldPosition = -1;
for (int i = 0; i < list.size(); i++)
{
- Entry entry;
+ Entry entry = list.get(i);
- entry = list.get(i);
- if (entry.getCount() <= newEntry.getCount())
+ if (entry.getCounter() < newEntry.getCounter())
{
+ if (oldPosition > -1)
+ {
+ if (list.get(oldPosition).getCounter() > newEntry.getCounter())
+ {
+ throw new IllegalArgumentException("The ranking already contains an entry with a higher counting for " + newEntry);
+ }
+ else
+ {
+ // Entry for word already exists with the same counting! Nothing changed...
+ return this;
+ }
+ }
+
list.add(i, newEntry);
for (int j = i+1; j < list.size(); j++)
{
break;
}
}
- if (list.size() > 10)
+ if (list.size() > MAX_ENTRIES)
{
- list = list.subList(0,10);
+ list = list.subList(0, MAX_ENTRIES);
}
entries = list.toArray(num -> new Entry[num]);
- return;
+ return this;
+ }
+
+ if (entry.getWord().equals(newEntry.getWord()))
+ oldPosition = i;
+ }
+
+ if (oldPosition > -1 && list.get(oldPosition).getCounter() > newEntry.getCounter())
+ {
+ throw new IllegalArgumentException("The ranking already contains an entry with a higher counting for " + newEntry);
+ }
+
+ if (list.size() < MAX_ENTRIES)
+ {
+ list.add(newEntry);
+ entries = list.toArray(num -> new Entry[num]);
+ }
+
+ return this;
+ }
+
+ /**
+ * Checks the structural invariants of this ranking:
+ * at most {@link #MAX_ENTRIES} entries, no duplicate words, and
+ * entries sorted by non-increasing counting.
+ *
+ * @return {@code this}, to allow fluent use after construction
+ * @throws IllegalArgumentException if any invariant is violated
+ */
+ public Ranking validate() throws IllegalArgumentException
+ {
+ if (this.entries.length > MAX_ENTRIES)
+ throw new IllegalArgumentException("Invalid Ranking: a valid ranking cannot have more entries than " + MAX_ENTRIES );
+
+ Set<String> seenWords = new HashSet<>();
+ long lowestCounting = Long.MAX_VALUE; // typo fixed: was "lowesCounting"
+
+ for (int i=0; i<this.entries.length; i++)
+ {
+ Entry entry = this.entries[i];
+
+ if (seenWords.contains(entry.getWord()))
+ throw new IllegalArgumentException("Invalid Ranking: Multiple occurrences of word -> " + entry.getWord());
+ if (entry.getCounter() > lowestCounting)
+ throw new IllegalArgumentException("Invalid Ranking: Entries are not sorted correctly");
+
+ seenWords.add(entry.getWord());
+ lowestCounting = entry.getCounter();
+ }
+
+ return this;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ return true;
+ if (o == null)
+ return false;
+ if (!(o instanceof Ranking))
+ return false;
+
+ Ranking other = (Ranking)o;
+
+ if (other.entries.length != entries.length)
+ return false;
+
+ if (entries.length == 0)
+ return true;
+
+ int i = 0;
+ Set<String> myWordsWithCurrentCount = new HashSet<>();
+ Set<String> otherWordsWithCurrentCount = new HashSet<>();
+ Entry myEntry = entries[i];
+ long currentCount = myEntry.getCounter();
+ myWordsWithCurrentCount.add(myEntry.getWord());
+ while (true)
+ {
+ Entry otherEntry = other.entries[i];
+ if (otherEntry.getCounter() != currentCount)
+ return false;
+ otherWordsWithCurrentCount.add(otherEntry.getWord());
+ if (++i >= entries.length)
+ return myWordsWithCurrentCount.equals(otherWordsWithCurrentCount);
+ myEntry = entries[i];
+ if (myEntry.getCounter() != currentCount)
+ {
+ if (!myWordsWithCurrentCount.equals(otherWordsWithCurrentCount))
+ return false;
+ currentCount = myEntry.getCounter();
+ myWordsWithCurrentCount.clear();
+ otherWordsWithCurrentCount.clear();
}
+ myWordsWithCurrentCount.add(myEntry.getWord());
}
}
+
+ public static Ranking of(Entry... entries)
+ {
+ Ranking ranking = new Ranking(entries);
+ ranking.validate();
+ return ranking;
+ }
}
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
@SpringBootApplication
-@EnableConfigurationProperties(Top10ApplicationProperties.class)
public class Top10Application
{
public static void main(String[] args)
--- /dev/null
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerde;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+
+
+@Configuration
+@EnableConfigurationProperties(Top10ApplicationProperties.class)
+@Slf4j
+public class Top10ApplicationConfiguration
+{
+ /**
+ * Assembles the Kafka-Streams configuration for the Top-10 processor.
+ * Keys and values are (de)serialized as JSON via Spring's JsonSerde; the
+ * type-mappings translate the logical type-ids written by the upstream
+ * counter application into the local DTO classes.
+ * Commit-interval and cache-size are only set if explicitly configured.
+ */
+ @Bean
+ public Properties streamProcessorProperties(Top10ApplicationProperties properties)
+ {
+ Properties props = new Properties();
+
+ props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
+ props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+ props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
+ props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
+ props.put(JsonDeserializer.TRUSTED_PACKAGES, Top10Application.class.getPackageName())
+ props.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName());
+ props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName());
+ props.put(
+ JsonDeserializer.TYPE_MAPPINGS,
+ "word:" + Key.class.getName() + "," +
+ "counter:" + Entry.class.getName() + "," +
+ "user:" + User.class.getName() + "," +
+ "ranking:" + Ranking.class.getName());
+ // Start from the earliest offsets when no committed offsets exist.
+ // (Removed a duplicated put of the same setting further down.)
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ if (properties.getCommitInterval() != null)
+ props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval());
+ if (properties.getCacheMaxBytes() != null)
+ props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, properties.getCacheMaxBytes());
+
+ return props;
+ }
+
+ @Bean(initMethod = "start", destroyMethod = "stop")
+ public Top10StreamProcessor streamProcessor(
+ Top10ApplicationProperties applicationProperties,
+ Properties streamProcessorProperties,
+ KeyValueBytesStoreSupplier storeSupplier,
+ ConfigurableApplicationContext context)
+ {
+ Top10StreamProcessor streamProcessor = new Top10StreamProcessor(
+ applicationProperties.getInputTopic(),
+ applicationProperties.getOutputTopic(),
+ streamProcessorProperties,
+ storeSupplier);
+
+ streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
+ {
+ log.error("Unexpected error!", e);
+ CompletableFuture.runAsync(() ->
+ {
+ log.info("Stopping application...");
+ SpringApplication.exit(context, () -> 1);
+ });
+ return SHUTDOWN_CLIENT;
+ });
+
+ return streamProcessor;
+ }
+
+ @Bean
+ public KeyValueBytesStoreSupplier storeSupplier()
+ {
+ return Stores.persistentKeyValueStore("top10");
+ }
+}
private String applicationId = "top10";
private String inputTopic = "countings";
private String outputTopic = "top10";
+ private Integer commitInterval;
+ private Integer cacheMaxBytes;
}
package de.juplo.kafka.wordcount.top10;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.springframework.boot.SpringApplication;
-import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.stereotype.Component;
+import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
-import java.util.regex.Pattern;
-
-import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
@Slf4j
-@Component
public class Top10StreamProcessor
{
- final static Pattern PATTERN = Pattern.compile("\\W+");
-
public final KafkaStreams streams;
public Top10StreamProcessor(
- Top10ApplicationProperties properties,
- ObjectMapper mapper,
- ConfigurableApplicationContext context)
+ String inputTopic,
+ String outputTopic,
+ Properties props,
+ KeyValueBytesStoreSupplier storeSupplier)
+ {
+ Topology topology = Top10StreamProcessor.buildTopology(
+ inputTopic,
+ outputTopic,
+ storeSupplier);
+
+ streams = new KafkaStreams(topology, props);
+ }
+
+ static Topology buildTopology(
+ String inputTopic,
+ String outputTopic,
+ KeyValueBytesStoreSupplier storeSupplier)
{
StreamsBuilder builder = new StreamsBuilder();
builder
- .<String, String>stream(properties.getInputTopic())
- .map((keyJson, countStr) ->
- {
- try
- {
- Key key = mapper.readValue(keyJson, Key.class);
- Long count = Long.parseLong(countStr);
- Entry entry = Entry.of(key.getWord(), count);
- String entryJson = mapper.writeValueAsString(entry);
- return new KeyValue<>(key.getUsername(), entryJson);
- }
- catch (JsonProcessingException e)
- {
- throw new RuntimeException(e);
- }
- })
+ .<Key, Entry>stream(inputTopic)
+ .map((key, entry) -> new KeyValue<>(User.of(key.getUser()), entry))
.groupByKey()
.aggregate(
- () -> "{\"entries\" : []}",
- (username, entryJson, rankingJson) ->
- {
- try
- {
- Ranking ranking = mapper.readValue(rankingJson, Ranking.class);
- ranking.add(mapper.readValue(entryJson, Entry.class));
- return mapper.writeValueAsString(ranking);
- }
- catch (JsonProcessingException e)
- {
- throw new RuntimeException(e);
- }
- }
- )
+ () -> new Ranking(),
+ (user, entry, ranking) -> ranking.add(entry),
+ Materialized.as(storeSupplier))
.toStream()
- .to(properties.getOutputTopic());
+ .to(outputTopic);
+
+ Topology topology = builder.build();
+ log.info("\n\n{}", topology.describe());
- Properties props = new Properties();
- props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
- props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
- props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ return topology;
+ }
- streams = new KafkaStreams(builder.build(), props);
- streams.setUncaughtExceptionHandler((Throwable e) ->
- {
- log.error("Unexpected error!", e);
- CompletableFuture.runAsync(() ->
- {
- log.info("Stopping application...");
- SpringApplication.exit(context, () -> 1);
- });
- return SHUTDOWN_CLIENT;
- });
+ ReadOnlyKeyValueStore<User, Ranking> getStore(String name)
+ {
+ return streams.store(StoreQueryParameters.fromNameAndType(name, QueryableStoreTypes.keyValueStore()));
}
- @PostConstruct
public void start()
{
log.info("Starting Stream-Processor");
streams.start();
}
- @PreDestroy
public void stop()
{
log.info("Stopping Stream-Processor");
--- /dev/null
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+/**
+ * Message-key of the output topic: the user a ranking belongs to.
+ * The no-args constructor is required for JSON deserialization;
+ * instances are created via the static factory {@code User.of(..)}.
+ */
+@AllArgsConstructor(staticName = "of")
+@NoArgsConstructor
+@Data
+public class User
+{
+ String user;
+}
-
+server.port=8084
+management.endpoints.web.exposure.include=*
--- /dev/null
+package de.juplo.kafka.wordcount.counter;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+/**
+ * Test-fixture mirroring the value-type produced by the upstream
+ * counter application (user, word and the counted occurrences).
+ * Note: two {@code of(..)} factories exist — the Lombok-generated
+ * three-arg one and the convenience overload taking a TestWord.
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestCounter
+{
+ String user;
+ String word;
+ long counter;
+
+ // Convenience factory: derive user/word from a TestWord key.
+ public static TestCounter of(TestWord word, long counter)
+ {
+ return new TestCounter(word.getUser(), word.getWord(), counter);
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.counter;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@AllArgsConstructor(staticName = "of")
+@NoArgsConstructor
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class TestWord
+{
+ private String user;
+ private String word;
+}
--- /dev/null
+package de.juplo.kafka.wordcount.query;
+
+import de.juplo.kafka.wordcount.top10.Entry;
+import lombok.Data;
+
+
+/**
+ * Test-fixture mirroring the shape a downstream query service would use
+ * to deserialize the published rankings (entries only, no invariants).
+ */
+@Data
+public class RankingData
+{
+ private Entry[] entries;
+}
--- /dev/null
+package de.juplo.kafka.wordcount.top10;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
+
+
+public class RankingTest
+{
+ @DisplayName("A newly created instance is empty")
+ @Test
+ public void testNewRankingIsEmpty()
+ {
+ Ranking ranking = new Ranking();
+ assertThat(ranking.getEntries()).isEmpty();
+ }
+
+ @DisplayName("An instance that was build from an empty ranking is empty")
+ @Test
+ public void testRankingOfYieldsExpectedResultForEmptyList()
+ {
+ // Fixed: exercise the static factory, as the test name promises —
+ // previously this duplicated testNewRankingIsEmpty() via the default
+ // constructor and left Ranking.of() untested for the empty case.
+ Ranking ranking = Ranking.of();
+ assertThat(ranking.getEntries()).isEmpty();
+ }
+
+ @DisplayName("An instance that was build from a valid ranking contains the expected entries")
+ @ParameterizedTest
+ @MethodSource("validRankingsProvider")
+ public void testRankingOfYieldsExpectedResultsForValidRankings(List<Entry> entryList)
+ {
+ Ranking ranking = Ranking.of(toArray(entryList));
+ assertThat(ranking.getEntries()).containsExactlyElementsOf(entryList);
+ }
+
+ @DisplayName("The builder fails for invalid rankings")
+ @ParameterizedTest
+ @MethodSource("invalidRankingsProvider")
+ public void testRankingOfThrowsExceptionForInvalidRankings(List<Entry> entryList)
+ {
+ assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(() -> Ranking.of(toArray(entryList)));
+ }
+
+ @DisplayName("Adding a new word with highest ranking, pushes all other words down")
+ @ParameterizedTest
+ @MethodSource("validRankingsProvider")
+ public void testAddingNewWordWithHighestRanking(List<Entry> entryList)
+ {
+ Ranking ranking = Ranking.of(toArray(entryList));
+ Entry newEntry = Entry.of("NEW!", rankingForPosition(-1));
+ ranking.add(newEntry);
+ assertThat(ranking.getEntries()[0]).isEqualTo(newEntry);
+ for (int i = 0; i < entryList.size() && i < Ranking.MAX_ENTRIES - 1; i++)
+ {
+ assertThat(ranking.getEntries()[i + 1]).isEqualTo(entryList.get(i));
+ }
+ }
+
+ @DisplayName("Adding a new word with an existent ranking, pushes all words with lower ranking down")
+ @ParameterizedTest
+ @MethodSource("validRankingsProvider")
+ public void testAddingNewWordWithExistingRanking(List<Entry> entryList)
+ {
+ for (int position = 0; position < entryList.size(); position++ )
+ {
+ Ranking ranking = Ranking.of(toArray(entryList));
+ Entry newEntry = Entry.of("NEW!", rankingForPosition(position));
+ ranking.add(newEntry);
+ for (int i = 0; i < entryList.size() && i < Ranking.MAX_ENTRIES - 1; i++)
+ {
+ if (i < position)
+ {
+ assertThat(ranking.getEntries()[i]).isEqualTo(entryList.get(i));
+ }
+ if (i == position)
+ {
+ assertThat(ranking.getEntries()[i]).isEqualTo(entryList.get(i));
+ assertThat(ranking.getEntries()[i + 1]).isEqualTo(newEntry);
+ }
+ if (i > position)
+ {
+ assertThat(ranking.getEntries()[i + 1]).isEqualTo(entryList.get(i));
+ }
+ }
+ }
+ }
+
+ @DisplayName("Adding a highest ranking for an existing word shifts it to the first place")
+ @ParameterizedTest
+ @ValueSource(ints = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 })
+ public void testAddingExistingWordWithHighestRanking(int position)
+ {
+ Ranking ranking = Ranking.of(toArray(VALID_RANKINGS[0]));
+ String word = wordForPosition(position);
+ Entry highestEntry = Entry.of(word, 100l);
+ ranking.add(highestEntry);
+ List<Entry> expectedEntries = Stream
+ .concat(
+ Stream.of(highestEntry),
+ VALID_RANKINGS[0]
+ .stream()
+ .filter(entry -> !entry.getWord().equals(word)))
+ .toList();
+ assertThat(ranking.getEntries()).containsExactlyElementsOf(expectedEntries);
+ }
+
+ @DisplayName("Adding an existing word with unchanged ranking changes nothing")
+ @ParameterizedTest
+ @ValueSource(ints = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 })
+ public void testAddingExistingWordWithUnchangedRanking(int position)
+ {
+ Ranking ranking = Ranking.of(toArray(VALID_RANKINGS[0]));
+ Entry unchangedEntry = Entry.of(
+ wordForPosition(position),
+ rankingForPosition(position));
+ ranking.add(unchangedEntry);
+ assertThat(ranking.getEntries()).containsExactlyElementsOf(VALID_RANKINGS[0]);
+ }
+
+ @DisplayName("Adding an existing word with a lower ranking fails")
+ @ParameterizedTest
+ @MethodSource("validRankingsProvider")
+ public void testAddingExistingWordWithLowerRankingFails(List<Entry> entryList)
+ {
+ Ranking ranking = Ranking.of(toArray(entryList));
+ entryList.forEach(entry ->
+ assertThatExceptionOfType(IllegalArgumentException.class)
+ .isThrownBy(() -> ranking.add(Entry.of(entry.getWord(), entry.getCounter() - 1))));
+ }
+
+ @DisplayName("Identical rankings are considered equal")
+ @ParameterizedTest
+ @MethodSource("validRankingsProvider")
+ public void testIdenticalRankingsAreConsideredEaqual(List<Entry> entryList)
+ {
+ assertThat(Ranking.of(toArray(entryList))).isEqualTo(Ranking.of(toArray(entryList)));
+ }
+
+ @DisplayName("Two empty rankings are considered equal")
+ @Test
+ public void testTwoEmptyRankingsAreConsideredEaqual()
+ {
+ assertThat(Ranking.of()).isEqualTo(Ranking.of());
+ }
+
+ @DisplayName("A changed ranking is not considered equal to its unchanged counter-part")
+ @ParameterizedTest
+ @MethodSource("validRankingsProvider")
+ public void testChangedRankingsDoNotEaqualUnchangedOne(List<Entry> entryList)
+ {
+ Ranking changed = Ranking.of(toArray(entryList));
+ changed.add(Entry.of("devilish", 666l));
+ assertThat(changed).isNotEqualTo(Ranking.of(toArray(entryList)));
+ }
+
+ @DisplayName("Rankigs are considered equal, if only the order of words with the same count differ")
+ @Test
+ public void testRankingWithDifferentOrderForSameCountAreEqual()
+ {
+ assertThat(
+ Ranking.of(
+ Entry.of("a1",10l),
+ Entry.of("a2",10l),
+ Entry.of("b1", 9l),
+ Entry.of("b2",9l),
+ Entry.of("c1", 8l),
+ Entry.of("c2", 8l)))
+ .isEqualTo(Ranking.of(
+ Entry.of("a2",10l),
+ Entry.of("a1",10l),
+ Entry.of("b2", 9l),
+ Entry.of("b1",9l),
+ Entry.of("c2", 8l),
+ Entry.of("c1", 8l)));
+ }
+
+
+ Entry[] toArray(List<Entry> entryList)
+ {
+ return entryList.toArray(size -> new Entry[size]);
+ }
+
+ static String wordForPosition(int position)
+ {
+ return Integer.toString(position+1);
+ }
+
+ static long rankingForPosition(int position)
+ {
+ return (long)Ranking.MAX_ENTRIES * 2 - position;
+ }
+
+ static Stream<List<Entry>> validRankingsProvider()
+ {
+ return Stream.of(VALID_RANKINGS);
+ }
+
+ static Stream<List<Entry>> invalidRankingsProvider()
+ {
+ return Stream.of(INVALID_RANKINGS);
+ }
+
+ static String[] WORDS = new String[Ranking.MAX_ENTRIES];
+ static List<Entry>[] VALID_RANKINGS = new List[Ranking.MAX_ENTRIES];
+
+ static
+ {
+ for (int i = 0; i < Ranking.MAX_ENTRIES; i++)
+ {
+ List<Entry> ranking = new LinkedList<>();
+ String word = null;
+ for (int position = 0; position <= i; position++)
+ {
+ word = wordForPosition(position);
+ Entry entry = Entry.of(word, rankingForPosition(position));
+ ranking.add(entry);
+ }
+ WORDS[i] = word;
+ VALID_RANKINGS[Ranking.MAX_ENTRIES - (i + 1)] = ranking;
+ }
+ }
+
+ static List<Entry>[] INVALID_RANKINGS = new List[] {
+ List.of(
+ Entry.of("Platz eins", 1l),
+ Entry.of("Platz zwei", 2l)),
+ List.of(
+ Entry.of("Platz eins", 1111111111l),
+ Entry.of("Platz zwei", 222222222l),
+ Entry.of("Platz eins", 1l)),
+ List.of(
+ Entry.of("Platz eins", 11l),
+ Entry.of("Platz eins", 1l)),
+ List.of(
+ Entry.of("Platz eins", 1111111111l),
+ Entry.of("Platz zwei", 222222222l),
+ Entry.of("Platz eins", 11111111l),
+ Entry.of("Platz zwei", 2222222l),
+ Entry.of("Platz fünf", 555555l)),
+ List.of(
+ Entry.of("Platz eins", 1111111111l),
+ Entry.of("Platz zwei", 222222222l),
+ Entry.of("Platz drei", 33333333l),
+ Entry.of("Platz vier", 4444444l),
+ Entry.of("Platz eins", 111111l),
+ Entry.of("Platz sechs", 66666l)),
+ List.of(
+ Entry.of("Platz eins", 1111111111l),
+ Entry.of("Platz zwei", 222222222l),
+ Entry.of("Platz drei", 33333333l),
+ Entry.of("Platz vier", 4444444l),
+ Entry.of("Platz fünf", 555555l),
+ Entry.of("Platz sechs", 66666l),
+ Entry.of("Platz eins", 1l)),
+ List.of(
+ Entry.of("Platz eins", 1111111111l),
+ Entry.of("Platz zwei", 222222222l),
+ Entry.of("Platz drei", 33333333l),
+ Entry.of("Platz vier", 4444444l),
+ Entry.of("Platz fünf", 555555l),
+ Entry.of("Platz sechs", 66666l),
+ Entry.of("Platz sieben", 7777l),
+ Entry.of("Platz acht", 888l),
+ Entry.of("Platz neun", 99l),
+ Entry.of("Platz 10", 6l),
+ Entry.of("Platz 11", 3l))};
+}
--- /dev/null
+package de.juplo.kafka.wordcount.top10;
+
+import de.juplo.kafka.wordcount.counter.TestCounter;
+import de.juplo.kafka.wordcount.counter.TestWord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+class TestData
+{
+ static final KeyValue<TestWord, TestCounter>[] INPUT_MESSAGES = new KeyValue[]
+ {
+ new KeyValue<>(
+ TestWord.of("peter","Hallo"),
+ TestCounter.of("peter","Hallo",1)),
+ new KeyValue<>(
+ TestWord.of("klaus","Müsch"),
+ TestCounter.of("klaus","Müsch",1)),
+ new KeyValue<>(
+ TestWord.of("peter","Welt"),
+ TestCounter.of("peter","Welt",1)),
+ new KeyValue<>(
+ TestWord.of("klaus","Müsch"),
+ TestCounter.of("klaus","Müsch",2)),
+ new KeyValue<>(
+ TestWord.of("klaus","s"),
+ TestCounter.of("klaus","s",1)),
+ new KeyValue<>(
+ TestWord.of("peter","Boäh"),
+ TestCounter.of("peter","Boäh",1)),
+ new KeyValue<>(
+ TestWord.of("peter","Welt"),
+ TestCounter.of("peter","Welt",2)),
+ new KeyValue<>(
+ TestWord.of("peter","Boäh"),
+ TestCounter.of("peter","Boäh",2)),
+ new KeyValue<>(
+ TestWord.of("klaus","s"),
+ TestCounter.of("klaus","s",2)),
+ new KeyValue<>(
+ TestWord.of("peter","Boäh"),
+ TestCounter.of("peter","Boäh",3)),
+ new KeyValue<>(
+ TestWord.of("klaus","s"),
+ TestCounter.of("klaus","s",3)),
+ };
+
+ static void assertExpectedMessages(MultiValueMap<User, Ranking> receivedMessages)
+ {
+ expectedMessages().forEach(
+ (user, rankings) ->
+ assertThat(receivedMessages.get(user))
+ .containsExactlyElementsOf(rankings));
+ }
+
+ static void assertExpectedState(ReadOnlyKeyValueStore<User, Ranking> store)
+ {
+ assertThat(store.get(EXPECTED_MESSAGES[9].key)).isEqualTo(EXPECTED_MESSAGES[9].value);
+ assertThat(store.get(EXPECTED_MESSAGES[10].key)).isEqualTo(EXPECTED_MESSAGES[10].value);
+ }
+
+ static KeyValue<User, Ranking>[] EXPECTED_MESSAGES = new KeyValue[]
+ {
+ KeyValue.pair( // 0
+ User.of("peter"),
+ Ranking.of(
+ Entry.of("Hallo", 1l))),
+ KeyValue.pair( // 1
+ User.of("klaus"),
+ Ranking.of(
+ Entry.of("Müsch", 1l))),
+ KeyValue.pair( // 2
+ User.of("peter"),
+ Ranking.of(
+ Entry.of("Hallo", 1l),
+ Entry.of("Welt", 1l))),
+ KeyValue.pair( // 3
+ User.of("klaus"),
+ Ranking.of(
+ Entry.of("Müsch", 2l))),
+ KeyValue.pair( // 4
+ User.of("klaus"),
+ Ranking.of(
+ Entry.of("Müsch", 2l),
+ Entry.of("s", 1l))),
+ KeyValue.pair( // 5
+ User.of("peter"),
+ Ranking.of(
+ Entry.of("Hallo", 1l),
+ Entry.of("Welt", 1l),
+ Entry.of("Boäh", 1l))),
+ KeyValue.pair( // 6
+ User.of("peter"),
+ Ranking.of(
+ Entry.of("Welt", 2l),
+ Entry.of("Hallo", 1l),
+ Entry.of("Boäh", 1l))),
+ KeyValue.pair( // 7
+ User.of("peter"),
+ Ranking.of(
+ Entry.of("Welt", 2l),
+ Entry.of("Boäh", 2l),
+ Entry.of("Hallo", 1l))),
+ KeyValue.pair( // 8
+ User.of("klaus"),
+ Ranking.of(
+ Entry.of("Müsch", 2l),
+ Entry.of("s", 2l))),
+ KeyValue.pair( // 9
+ User.of("peter"),
+ Ranking.of(
+ Entry.of("Boäh", 3l),
+ Entry.of("Welt", 2l),
+ Entry.of("Hallo", 1l))),
+ KeyValue.pair( // 10
+ User.of("klaus"),
+ Ranking.of(
+ Entry.of("s", 3l),
+ Entry.of("Müsch", 2l))),
+ };
+
+ static MultiValueMap<User, Ranking> expectedMessages()
+ {
+ MultiValueMap<User, Ranking> expectedMessages = new LinkedMultiValueMap<>();
+ Stream
+ .of(EXPECTED_MESSAGES)
+ .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value));
+ return expectedMessages;
+ }
+
+ static Map<String, Object> convertToMap(Properties properties)
+ {
+ return properties
+ .entrySet()
+ .stream()
+ .collect(
+ Collectors.toMap(
+ entry -> (String)entry.getKey(),
+ entry -> entry.getValue()
+ ));
+ }
+
+ static String parseHeader(Headers headers, String key)
+ {
+ Header header = headers.lastHeader(key);
+ if (header == null)
+ {
+ return key + "=null";
+ }
+ else
+ {
+ return key + "=" + new String(header.value());
+ }
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.top10;
+
+import de.juplo.kafka.wordcount.counter.TestWord;
+import de.juplo.kafka.wordcount.counter.TestCounter;
+import de.juplo.kafka.wordcount.query.RankingData;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.*;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Primary;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import java.time.Duration;
+import java.util.stream.Stream;
+
+import static org.awaitility.Awaitility.await;
+
+
+@SpringBootTest(
+ properties = {
+ "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
+ "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
+ "spring.kafka.producer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.counter.TestWord,counter:de.juplo.kafka.wordcount.counter.TestCounter",
+ "spring.kafka.consumer.auto-offset-reset=earliest",
+ "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
+ "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
+ "spring.kafka.consumer.properties.spring.json.use.type.headers=false",
+ "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.top10.User",
+ "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.query.RankingData",
+ "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.top10 ",
+ "logging.level.root=WARN",
+ "logging.level.de.juplo=DEBUG",
+ "juplo.wordcount.top10.bootstrap-server=${spring.embedded.kafka.brokers}",
+ "juplo.wordcount.top10.commit-interval=0",
+ "juplo.wordcount.top10.cacheMaxBytes=0",
+ "juplo.wordcount.top10.input-topic=" + Top10ApplicationIT.TOPIC_IN,
+ "juplo.wordcount.top10.output-topic=" + Top10ApplicationIT.TOPIC_OUT })
+@EmbeddedKafka(topics = { Top10ApplicationIT.TOPIC_IN, Top10ApplicationIT.TOPIC_OUT })
+@Slf4j
+public class Top10ApplicationIT
+{
+ public final static String TOPIC_IN = "in";
+ public final static String TOPIC_OUT = "out";
+ public final static String STORE_NAME = "TEST-STORE";
+
+
+ @Autowired
+ Consumer consumer;
+ @Autowired
+ Top10StreamProcessor top10StreamProcessor;
+
+
+ @BeforeAll
+ public static void testSendMessage(
+ @Autowired KafkaTemplate<TestWord, TestCounter> kafkaTemplate)
+ {
+ Stream
+ .of(TestData.INPUT_MESSAGES)
+ .forEach(kv ->
+ {
+ try
+ {
+ SendResult<TestWord, TestCounter> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
+ log.info(
+ "Sent: {}={}, partition={}, offset={}",
+ result.getProducerRecord().key(),
+ result.getProducerRecord().value(),
+ result.getRecordMetadata().partition(),
+ result.getRecordMetadata().offset());
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @DisplayName("Await the expected state in the state-store")
+ @Test
+ public void testAwaitExpectedState()
+ {
+    await("Expected state")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> TestData.assertExpectedState(top10StreamProcessor.getStore(STORE_NAME)));
+ }
+
+ @DisplayName("Await the expected output messages")
+ @Test
+ @Disabled
+ public void testAwaitExpectedMessages()
+ {
+    await("Expected messages")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> TestData.assertExpectedMessages(consumer.getReceivedMessages()));
+ }
+
+
+ static class Consumer
+ {
+ private final MultiValueMap<User, Ranking> received = new LinkedMultiValueMap<>();
+
+ @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
+ public synchronized void receive(
+ @Header(KafkaHeaders.RECEIVED_KEY) User user,
+ @Payload RankingData ranking)
+ {
+ log.debug("Received message: {} -> {}", user, ranking);
+ received.add(user, Ranking.of(ranking.getEntries()));
+ }
+
+ synchronized MultiValueMap<User, Ranking> getReceivedMessages()
+ {
+ return received;
+ }
+ }
+
+ @TestConfiguration
+ static class Configuration
+ {
+ @Bean
+ Consumer consumer()
+ {
+ return new Consumer();
+ }
+
+ @Primary
+ @Bean
+ KeyValueBytesStoreSupplier inMemoryStoreSupplier()
+ {
+ return Stores.inMemoryKeyValueStore(STORE_NAME);
+ }
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TestOutputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
+import org.springframework.kafka.support.serializer.JsonSerde;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.stream.Stream;
+
+import static de.juplo.kafka.wordcount.top10.TestData.convertToMap;
+import static de.juplo.kafka.wordcount.top10.TestData.parseHeader;
+import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
+import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
+
+
+@Slf4j
+public class Top10StreamProcessorTopologyTest
+{
+ public final static String IN = "TEST-IN";
+ public final static String OUT = "TEST-OUT";
+ public final static String STORE_NAME = "TOPOLOGY-TEST";
+
+
+ TopologyTestDriver testDriver;
+ TestInputTopic<Key, Entry> in;
+ TestOutputTopic<User, Ranking> out;
+
+
+ @BeforeEach
+ public void setUp()
+ {
+ Topology topology = Top10StreamProcessor.buildTopology(
+ IN,
+ OUT,
+ Stores.inMemoryKeyValueStore(STORE_NAME));
+
+    Top10ApplicationConfiguration applicationConfiguration =
+        new Top10ApplicationConfiguration();
+    Properties streamProcessorProperties =
+        applicationConfiguration.streamProcessorProperties(new Top10ApplicationProperties());
+ Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
+
+ JsonSerde<?> keySerde = new JsonSerde<>();
+ keySerde.configure(propertyMap, true);
+ JsonSerde<?> valueSerde = new JsonSerde<>();
+ valueSerde.configure(propertyMap, false);
+
+ testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
+
+ in = testDriver.createInputTopic(
+ IN,
+ (JsonSerializer<Key>)keySerde.serializer(),
+ (JsonSerializer<Entry>)valueSerde.serializer());
+
+ out = testDriver.createOutputTopic(
+ OUT,
+ (JsonDeserializer<User>)keySerde.deserializer(),
+ (JsonDeserializer<Ranking>)valueSerde.deserializer());
+
+ }
+
+
+ @Test
+ public void test()
+ {
+ Stream
+ .of(TestData.INPUT_MESSAGES)
+ .forEach(kv -> in.pipeInput(
+ Key.of(kv.key.getUser(), kv.key.getWord()),
+ Entry.of(kv.value.getWord(), kv.value.getCounter())));
+
+ MultiValueMap<User, Ranking> receivedMessages = new LinkedMultiValueMap<>();
+ out
+ .readRecordsToList()
+ .forEach(record ->
+ {
+ log.debug(
+ "OUT: {} -> {}, {}, {}",
+ record.key(),
+ record.value(),
+ parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
+ parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
+ receivedMessages.add(record.key(), record.value());
+ });
+
+ TestData.assertExpectedMessages(receivedMessages);
+
+ KeyValueStore<User, Ranking> store = testDriver.getKeyValueStore(STORE_NAME);
+ TestData.assertExpectedState(store);
+ }
+
+ @AfterEach
+ public void tearDown()
+ {
+ testDriver.close();
+ }
+}