From 59be0dff127b0208092be233bd44761a23e7cf39 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 5 May 2024 11:16:37 +0200 Subject: [PATCH 01/16] top10: 1.0.2 - Updated Spring Boot to `3.2.5` --- Dockerfile | 2 +- pom.xml | 4 ++-- .../de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Dockerfile b/Dockerfile index 899fd4b..16a12e3 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM openjdk:11-jre-slim +FROM eclipse-temurin:17-jre COPY target/*.jar /opt/app.jar EXPOSE 8084 ENTRYPOINT ["java", "-jar", "/opt/app.jar"] diff --git a/pom.xml b/pom.xml index 818687f..b46a871 100644 --- a/pom.xml +++ b/pom.xml @@ -5,12 +5,12 @@ org.springframework.boot spring-boot-starter-parent - 2.5.4 + 3.2.5 de.juplo.kafka.wordcount top10 - 1.0.1 + 1.0.2 Wordcount-Top-10 Top-10 stream-processor of the multi-user wordcount-example diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java index 862913a..2ae3ec5 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java @@ -2,6 +2,8 @@ package de.juplo.kafka.wordcount.top10; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; @@ -13,8 +15,6 @@ import org.springframework.boot.SpringApplication; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.stereotype.Component; -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; import java.util.Properties; import java.util.concurrent.CompletableFuture; import java.util.regex.Pattern; -- 2.20.1 From e92f87e3cb4ae59bb0cadd178222f4424038a922 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 14 May 2024 23:02:55 +0200 Subject: [PATCH 02/16] top10: 1.0.3 - Separated config in `Top10ApplicationConfiguration` -- COPY --- pom.xml | 2 +- .../top10/Top10ApplicationConfiguration.java | 111 ++++++++++++++++++ 2 files changed, 112 insertions(+), 1 deletion(-) create mode 100644 src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java diff --git a/pom.xml b/pom.xml index b46a871..cb7ec14 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount top10 - 1.0.2 + 1.0.3 Wordcount-Top-10 Top-10 stream-processor of the multi-user wordcount-example diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java new file mode 100644 index 0000000..cdf268f --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java @@ -0,0 +1,111 @@ +package de.juplo.kafka.wordcount.top10; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.springframework.boot.SpringApplication; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.stereotype.Component; + +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.regex.Pattern; + +import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; + + +@Slf4j +@Component +public class Top10ApplicationConfiguration +{ + final static Pattern PATTERN = Pattern.compile("\\W+"); + + public final KafkaStreams streams; + + + public Top10ApplicationConfiguration( + Top10ApplicationProperties properties, + ObjectMapper mapper, + ConfigurableApplicationContext context) + { + StreamsBuilder builder = new StreamsBuilder(); + + builder + .stream(properties.getInputTopic()) + .map((keyJson, countStr) -> + { + try + { + Key key = mapper.readValue(keyJson, Key.class); + Long count = Long.parseLong(countStr); + Entry entry = Entry.of(key.getWord(), count); + String entryJson = mapper.writeValueAsString(entry); + return new KeyValue<>(key.getUsername(), entryJson); + } + catch (JsonProcessingException e) + { + throw new RuntimeException(e); + } + }) + .groupByKey() + .aggregate( + () -> "{\"entries\" : []}", + (username, entryJson, rankingJson) -> + { + try + { + Ranking ranking = mapper.readValue(rankingJson, Ranking.class); + ranking.add(mapper.readValue(entryJson, Entry.class)); + return mapper.writeValueAsString(ranking); + } + catch (JsonProcessingException e) + { + throw new RuntimeException(e); + } + } + ) + .toStream() + .to(properties.getOutputTopic()); + + Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + streams = new KafkaStreams(builder.build(), props); + streams.setUncaughtExceptionHandler((Throwable e) -> + { + log.error("Unexpected error!", e); + CompletableFuture.runAsync(() -> + { + log.info("Stopping application..."); + SpringApplication.exit(context, () -> 1); + }); + return SHUTDOWN_CLIENT; + }); + } + + @PostConstruct + public void start() + { + log.info("Starting Stream-Processor"); + streams.start(); + } + + @PreDestroy + public void stop() + { + log.info("Stopping Stream-Processor"); + streams.close(); + } +} -- 2.20.1 From e3fc3fc028270a7faef487af51128771459aed20 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 14 May 2024 23:04:50 +0200 Subject: [PATCH 03/16] top10: 1.0.3 - Separated config in `Top10ApplicationConfiguration` -- ALIGN --- .../wordcount/top10/Top10Application.java | 2 - .../top10/Top10ApplicationConfiguration.java | 99 +++++-------------- .../wordcount/top10/Top10StreamProcessor.java | 43 +------- 3 files changed, 32 insertions(+), 112 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java index 27dca95..5c14ae7 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java @@ -2,11 +2,9 @@ package de.juplo.kafka.wordcount.top10; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.context.properties.EnableConfigurationProperties; @SpringBootApplication -@EnableConfigurationProperties(Top10ApplicationProperties.class) public class Top10Application { public static void main(String[] args) diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java index cdf268f..3ea85b8 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java @@ -1,89 +1,56 @@ package de.juplo.kafka.wordcount.top10; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.springframework.boot.SpringApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.stereotype.Component; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; import java.util.Properties; import java.util.concurrent.CompletableFuture; -import java.util.regex.Pattern; import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; +@Configuration +@EnableConfigurationProperties(Top10ApplicationProperties.class) @Slf4j -@Component public class Top10ApplicationConfiguration { - final static Pattern PATTERN = Pattern.compile("\\W+"); - - public final KafkaStreams streams; - - - public Top10ApplicationConfiguration( - Top10ApplicationProperties properties, - ObjectMapper mapper, - ConfigurableApplicationContext context) + @Bean + public Properties streamProcessorProperties(Top10ApplicationProperties properties) { - StreamsBuilder builder = new StreamsBuilder(); - - builder - .stream(properties.getInputTopic()) - .map((keyJson, countStr) -> - { - try - { - Key key = mapper.readValue(keyJson, Key.class); - Long count = Long.parseLong(countStr); - Entry entry = Entry.of(key.getWord(), count); - String entryJson = mapper.writeValueAsString(entry); - return new KeyValue<>(key.getUsername(), entryJson); - } - catch (JsonProcessingException e) - { - throw new RuntimeException(e); - } - }) - .groupByKey() - .aggregate( - () -> "{\"entries\" : []}", - (username, entryJson, rankingJson) -> - { - try - { - Ranking ranking = mapper.readValue(rankingJson, Ranking.class); - ranking.add(mapper.readValue(entryJson, Entry.class)); - return mapper.writeValueAsString(ranking); - } - catch (JsonProcessingException e) - { - throw new RuntimeException(e); - } - } - ) - .toStream() - .to(properties.getOutputTopic()); - Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - streams = new KafkaStreams(builder.build(), props); - streams.setUncaughtExceptionHandler((Throwable e) -> + return props; + } + + @Bean(initMethod = "start", destroyMethod = "stop") + public Top10StreamProcessor streamProcessor( + Top10ApplicationProperties applicationProperties, + ObjectMapper objectMapper, + Properties streamProcessorProperties, + ConfigurableApplicationContext context) + { + Top10StreamProcessor streamProcessor = new Top10StreamProcessor( + applicationProperties.getInputTopic(), + applicationProperties.getOutputTopic(), + objectMapper, + streamProcessorProperties); + + streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> { log.error("Unexpected error!", e); CompletableFuture.runAsync(() -> @@ -93,19 +60,7 @@ public class Top10ApplicationConfiguration }); return SHUTDOWN_CLIENT; }); - } - @PostConstruct - public void start() - { - log.info("Starting Stream-Processor"); - streams.start(); - } - - @PreDestroy - public void stop() - { - log.info("Stopping Stream-Processor"); - streams.close(); + return streamProcessor; } } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java index 2ae3ec5..f0a7d19 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java @@ -2,44 +2,30 @@ package de.juplo.kafka.wordcount.top10; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.StreamsConfig; -import org.springframework.boot.SpringApplication; -import org.springframework.context.ConfigurableApplicationContext; -import org.springframework.stereotype.Component; import java.util.Properties; -import java.util.concurrent.CompletableFuture; -import java.util.regex.Pattern; - -import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT; @Slf4j -@Component public class Top10StreamProcessor { - final static Pattern PATTERN = Pattern.compile("\\W+"); - public final KafkaStreams streams; public Top10StreamProcessor( - Top10ApplicationProperties properties, + String inputTopic, + String outputTopic, ObjectMapper mapper, - ConfigurableApplicationContext context) + Properties props) { StreamsBuilder builder = new StreamsBuilder(); builder - .stream(properties.getInputTopic()) + .stream(inputTopic) .map((keyJson, countStr) -> { try @@ -73,36 +59,17 @@ public class Top10StreamProcessor } ) .toStream() - .to(properties.getOutputTopic()); - - Properties props = new Properties(); - props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + .to(outputTopic); streams = new KafkaStreams(builder.build(), props); - streams.setUncaughtExceptionHandler((Throwable e) -> - { - log.error("Unexpected error!", e); - CompletableFuture.runAsync(() -> - { - log.info("Stopping application..."); - SpringApplication.exit(context, () -> 1); - }); - return SHUTDOWN_CLIENT; - }); } - @PostConstruct public void start() { log.info("Starting Stream-Processor"); streams.start(); } - @PreDestroy public void stop() { log.info("Stopping Stream-Processor"); -- 2.20.1 From 7e0f4ca6927d9b62909b0789b4b14a46f8dbd0f5 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 5 May 2024 14:55:17 +0200 Subject: [PATCH 04/16] top10: 1.1.0 - Simplified topology, using JsonSerde - Defined `JsonSerde` as default for keys and values. - Removed the configuration of specific serdes from all steps of the processor-topology. - Added type-mappings for serialization/deserialization. --- pom.xml | 6 +- .../de/juplo/kafka/wordcount/top10/Entry.java | 17 ++++-- .../de/juplo/kafka/wordcount/top10/Key.java | 14 +++-- .../juplo/kafka/wordcount/top10/Ranking.java | 23 +++++--- .../top10/Top10ApplicationConfiguration.java | 23 +++++--- .../wordcount/top10/Top10StreamProcessor.java | 55 +++++++------------ 6 files changed, 78 insertions(+), 60 deletions(-) diff --git a/pom.xml b/pom.xml index cb7ec14..02cf701 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount top10 - 1.0.3 + 1.1.0 Wordcount-Top-10 Top-10 stream-processor of the multi-user wordcount-example @@ -31,6 +31,10 @@ org.apache.kafka kafka-streams + + org.springframework.kafka + spring-kafka + org.springframework.boot diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java b/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java index 67f45f2..b25fc07 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java @@ -1,11 +1,20 @@ package de.juplo.kafka.wordcount.top10; -import lombok.Value; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; -@Value(staticConstructor = "of") +@NoArgsConstructor +@AllArgsConstructor( + staticName = "of", + access = AccessLevel.PACKAGE) +@Data +@JsonIgnoreProperties(ignoreUnknown = true) public class Entry { - private final String word; - private final Long count; + private String word; + private Long counter; } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Key.java b/src/main/java/de/juplo/kafka/wordcount/top10/Key.java index d09dbcc..ffac8ea 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Key.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Key.java @@ -1,13 +1,17 @@ package de.juplo.kafka.wordcount.top10; -import lombok.Getter; -import lombok.Setter; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.*; -@Getter -@Setter +@NoArgsConstructor +@AllArgsConstructor( + staticName = "of", + access = AccessLevel.PACKAGE) +@Data +@JsonIgnoreProperties(ignoreUnknown = true) public class Key { - private String username; + private String user; private String word; } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java b/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java index b748fe5..80e8742 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java @@ -1,26 +1,26 @@ package de.juplo.kafka.wordcount.top10; -import lombok.Getter; -import lombok.Setter; +import lombok.*; import java.util.Arrays; import java.util.LinkedList; import java.util.List; -@Getter -@Setter +@AllArgsConstructor(access = AccessLevel.PRIVATE) +@NoArgsConstructor +@Data public class Ranking { private Entry[] entries = new Entry[0]; - public void add(Entry newEntry) + public Ranking add(Entry newEntry) { if (entries.length == 0) { entries = new Entry[1]; entries[0] = newEntry; - return; + return this; } List list = new LinkedList<>(Arrays.asList(entries)); @@ -29,7 +29,7 @@ public class Ranking Entry entry; entry = list.get(i); - if (entry.getCount() <= newEntry.getCount()) + if (entry.getCounter() <= newEntry.getCounter()) { list.add(i, newEntry); for (int j = i+1; j < list.size(); j++) @@ -46,8 +46,15 @@ public class Ranking list = list.subList(0,10); } entries = list.toArray(num -> new Entry[num]); - return; + return this; } } + + return this; + } + + public static Ranking of(Entry... entries) + { + return new Ranking(entries); } } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java index 3ea85b8..7749917 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java @@ -1,16 +1,16 @@ package de.juplo.kafka.wordcount.top10; -import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.springframework.boot.SpringApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerde; +import org.springframework.kafka.support.serializer.JsonSerializer; import java.util.Properties; import java.util.concurrent.CompletableFuture; @@ -30,8 +30,19 @@ public class Top10ApplicationConfiguration props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId()); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); + props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); + props.put(JsonDeserializer.TRUSTED_PACKAGES, Top10Application.class.getPackageName()); + props.put(JsonDeserializer.KEY_DEFAULT_TYPE, String.class.getName()); + props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName()); + props.put( + JsonDeserializer.TYPE_MAPPINGS, + "word:" + Key.class.getName() + "," + + "counter:" + Entry.class.getName()); + props.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, Boolean.FALSE); + props.put( + JsonSerializer.TYPE_MAPPINGS, + "ranking:" + Ranking.class.getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return props; @@ -40,14 +51,12 @@ public class Top10ApplicationConfiguration @Bean(initMethod = "start", destroyMethod = "stop") public Top10StreamProcessor streamProcessor( Top10ApplicationProperties applicationProperties, - ObjectMapper objectMapper, Properties streamProcessorProperties, ConfigurableApplicationContext context) { Top10StreamProcessor streamProcessor = new Top10StreamProcessor( applicationProperties.getInputTopic(), applicationProperties.getOutputTopic(), - objectMapper, streamProcessorProperties); streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java index f0a7d19..a3900bf 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java @@ -1,11 +1,10 @@ package de.juplo.kafka.wordcount.top10; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.Topology; import java.util.Properties; @@ -19,49 +18,35 @@ public class Top10StreamProcessor public Top10StreamProcessor( String inputTopic, String outputTopic, - ObjectMapper mapper, Properties props) + { + Topology topology = Top10StreamProcessor.buildTopology( + inputTopic, + outputTopic); + + streams = new KafkaStreams(topology, props); + } + + static Topology buildTopology( + String inputTopic, + String outputTopic) { StreamsBuilder builder = new StreamsBuilder(); builder - .stream(inputTopic) - .map((keyJson, countStr) -> - { - try - { - Key key = mapper.readValue(keyJson, Key.class); - Long count = Long.parseLong(countStr); - Entry entry = Entry.of(key.getWord(), count); - String entryJson = mapper.writeValueAsString(entry); - return new KeyValue<>(key.getUsername(), entryJson); - } - catch (JsonProcessingException e) - { - throw new RuntimeException(e); - } - }) + .stream(inputTopic) + .map((key, entry) -> new KeyValue<>(key.getUser(), entry)) .groupByKey() .aggregate( - () -> "{\"entries\" : []}", - (username, entryJson, rankingJson) -> - { - try - { - Ranking ranking = mapper.readValue(rankingJson, Ranking.class); - ranking.add(mapper.readValue(entryJson, Entry.class)); - return mapper.writeValueAsString(ranking); - } - catch (JsonProcessingException e) - { - throw new RuntimeException(e); - } - } - ) + () -> new Ranking(), + (user, entry, ranking) -> ranking.add(entry)) .toStream() .to(outputTopic); - streams = new KafkaStreams(builder.build(), props); + Topology topology = builder.build(); + log.info("\n\n{}", topology.describe()); + + return topology; } public void start() -- 2.20.1 From cd68d7ad7f6c63e929c2dc8121a57c0a6830f5e7 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 25 May 2024 16:56:31 +0200 Subject: [PATCH 05/16] top10: 1.1.1 - Upgraded Java & Kafka --- pom.xml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index 02cf701..6749738 100644 --- a/pom.xml +++ b/pom.xml @@ -10,13 +10,11 @@ de.juplo.kafka.wordcount top10 - 1.1.0 + 1.1.1 Wordcount-Top-10 Top-10 stream-processor of the multi-user wordcount-example 0.33.0 - 11 - 2.8.0 -- 2.20.1 From 21b1d0cffa0bfe47d7d78cef4e4dc23588d69a1b Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 25 May 2024 16:59:42 +0200 Subject: [PATCH 06/16] top10: 1.1.2 - (RED) Added test, that asserts the expectated processing --- pom.xml | 21 ++- .../kafka/wordcount/counter/TestCounter.java | 21 +++ .../kafka/wordcount/counter/TestWord.java | 17 ++ .../juplo/kafka/wordcount/top10/TestData.java | 159 ++++++++++++++++++ .../Top10StreamProcessorTopologyTest.java | 83 +++++++++ 5 files changed, 300 insertions(+), 1 deletion(-) create mode 100644 src/test/java/de/juplo/kafka/wordcount/counter/TestCounter.java create mode 100644 src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java create mode 100644 src/test/java/de/juplo/kafka/wordcount/top10/TestData.java create mode 100644 src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java diff --git a/pom.xml b/pom.xml index 6749738..86b4290 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount top10 - 1.1.1 + 1.1.2 Wordcount-Top-10 Top-10 stream-processor of the multi-user wordcount-example @@ -50,15 +50,34 @@ lombok true + org.springframework.boot spring-boot-starter-test test + + org.springframework.kafka + spring-kafka-test + test + + + org.awaitility + awaitility + test + + + org.assertj + assertj-core + test + + + maven-failsafe-plugin + org.springframework.boot spring-boot-maven-plugin diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestCounter.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestCounter.java new file mode 100644 index 0000000..d98ae64 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestCounter.java @@ -0,0 +1,21 @@ +package de.juplo.kafka.wordcount.counter; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@Data +@NoArgsConstructor +@AllArgsConstructor(staticName = "of") +public class TestCounter +{ + String user; + String word; + long counter; + + public static TestCounter of(TestWord word, long counter) + { + return new TestCounter(word.getUser(), word.getWord(), counter); + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java new file mode 100644 index 0000000..8008e12 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestWord.java @@ -0,0 +1,17 @@ +package de.juplo.kafka.wordcount.counter; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@AllArgsConstructor(staticName = "of") +@NoArgsConstructor +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class TestWord +{ + private String user; + private String word; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java new file mode 100644 index 0000000..73a405e --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java @@ -0,0 +1,159 @@ +package de.juplo.kafka.wordcount.top10; + +import de.juplo.kafka.wordcount.counter.TestCounter; +import de.juplo.kafka.wordcount.counter.TestWord; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.streams.KeyValue; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + + +class TestData +{ + static final KeyValue[] INPUT_MESSAGES = new KeyValue[] + { + new KeyValue<>( + TestWord.of("peter","Hallo"), + TestCounter.of("peter","Hallo",1)), + new KeyValue<>( + TestWord.of("klaus","Müsch"), + TestCounter.of("klaus","Müsch",1)), + new KeyValue<>( + TestWord.of("peter","Welt"), + TestCounter.of("peter","Welt",1)), + new KeyValue<>( + TestWord.of("klaus","Müsch"), + TestCounter.of("klaus","Müsch",2)), + new KeyValue<>( + TestWord.of("klaus","s"), + TestCounter.of("klaus","s",1)), + new KeyValue<>( + TestWord.of("peter","Boäh"), + TestCounter.of("peter","Boäh",1)), + new KeyValue<>( + TestWord.of("peter","Welt"), + TestCounter.of("peter","Welt",2)), + new KeyValue<>( + TestWord.of("peter","Boäh"), + TestCounter.of("peter","Boäh",2)), + new KeyValue<>( + TestWord.of("klaus","s"), + TestCounter.of("klaus","s",2)), + new KeyValue<>( + TestWord.of("peter","Boäh"), + TestCounter.of("peter","Boäh",3)), + new KeyValue<>( + TestWord.of("klaus","s"), + TestCounter.of("klaus","s",3)), + }; + + static void assertExpectedMessages(MultiValueMap receivedMessages) + { + expectedMessages().forEach( + (user, rankings) -> + assertThat(receivedMessages.get(user)) + .containsExactlyElementsOf(rankings)); + } + + static KeyValue[] EXPECTED_MESSAGES = new KeyValue[] + { + KeyValue.pair( // 0 + "peter", + Ranking.of( + Entry.of("Hallo", 1l))), + KeyValue.pair( // 1 + "klaus", + Ranking.of( + Entry.of("Müsch", 1l))), + KeyValue.pair( // 2 + "peter", + Ranking.of( + Entry.of("Hallo", 1l), + Entry.of("Welt", 1l))), + KeyValue.pair( // 3 + "klaus", + Ranking.of( + Entry.of("Müsch", 2l))), + KeyValue.pair( // 4 + "klaus", + Ranking.of( + Entry.of("Müsch", 2l), + Entry.of("s", 1l))), + KeyValue.pair( // 5 + "peter", + Ranking.of( + Entry.of("Hallo", 1l), + Entry.of("Welt", 1l), + Entry.of("Boäh", 1l))), + KeyValue.pair( // 6 + "peter", + Ranking.of( + Entry.of("Welt", 2l), + Entry.of("Hallo", 1l), + Entry.of("Boäh", 1l))), + KeyValue.pair( // 7 + "peter", + Ranking.of( + Entry.of("Welt", 2l), + Entry.of("Boäh", 2l), + Entry.of("Hallo", 1l))), + KeyValue.pair( // 8 + "klaus", + Ranking.of( + Entry.of("Müsch", 2l), + Entry.of("s", 2l))), + KeyValue.pair( // 9 + "peter", + Ranking.of( + Entry.of("Boäh", 3l), + Entry.of("Welt", 2l), + Entry.of("Hallo", 1l))), + KeyValue.pair( // 10 + "klaus", + Ranking.of( + Entry.of("s", 3l), + Entry.of("Müsch", 2l))), + }; + + static MultiValueMap expectedMessages() + { + MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); + Stream + .of(EXPECTED_MESSAGES) + .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value)); + return expectedMessages; + } + + static Map convertToMap(Properties properties) + { + return properties + .entrySet() + .stream() + .collect( + Collectors.toMap( + entry -> (String)entry.getKey(), + entry -> entry.getValue() + )); + } + + static String parseHeader(Headers headers, String key) + { + Header header = headers.lastHeader(key); + if (header == null) + { + return key + "=null"; + } + else + { + return key + "=" + new String(header.value()); + } + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java new file mode 100644 index 0000000..8ecf9fa --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java @@ -0,0 +1,83 @@ +package de.juplo.kafka.wordcount.top10; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.TestOutputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.junit.jupiter.api.Test; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.kafka.support.serializer.JsonSerde; +import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import java.util.Map; +import java.util.Properties; +import java.util.stream.Stream; + +import static de.juplo.kafka.wordcount.top10.TestData.convertToMap; +import static de.juplo.kafka.wordcount.top10.TestData.parseHeader; +import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME; +import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME; + + +@Slf4j +public class Top10StreamProcessorTopologyTest +{ + public final static String IN = "TEST-IN"; + public final static String OUT = "TEST-OUT"; + + @Test + public void test() + { + Topology topology = Top10StreamProcessor.buildTopology(IN, OUT); + + Top10ApplicationConfiguration applicationConfiguriation = + new Top10ApplicationConfiguration(); + Properties streamProcessorProperties = + applicationConfiguriation.streamProcessorProperties(new Top10ApplicationProperties()); + Map propertyMap = convertToMap(streamProcessorProperties); + + JsonSerde keySerde = new JsonSerde<>(); + keySerde.configure(propertyMap, true); + JsonSerde valueSerde = new JsonSerde<>(); + valueSerde.configure(propertyMap, false); + + TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProcessorProperties); + + TestInputTopic in = testDriver.createInputTopic( + IN, + (JsonSerializer)keySerde.serializer(), + (JsonSerializer)valueSerde.serializer()); + + TestOutputTopic out = testDriver.createOutputTopic( + OUT, + (JsonDeserializer)keySerde.deserializer(), + (JsonDeserializer)valueSerde.deserializer()); + + Stream + .of(TestData.INPUT_MESSAGES) + .forEach(kv -> in.pipeInput( + Key.of(kv.key.getUser(), kv.key.getWord()), + Entry.of(kv.value.getWord(), kv.value.getCounter()))); + + MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); + out + .readRecordsToList() + .forEach(record -> + { + log.debug( + "OUT: {} -> {}, {}, {}", + record.key(), + record.value(), + parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME), + parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME)); + receivedMessages.add(record.key(), record.value()); + }); + + TestData.assertExpectedMessages(receivedMessages); + + testDriver.close(); + } +} -- 2.20.1 From 15b467e749673a3c372899d9fddc5f62517e2d91 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 27 May 2024 22:56:48 +0200 Subject: [PATCH 07/16] top10: 1.1.2 - (RED) `ToplogyTestDriver.close` must always be called * If the `TopologyTestDriver` is _not_ closed, it leaves behind the created RocksDB. * Hence, the test will fail, if it changes state and expects a clean slate in the beginning. * Therefore, the call to `close()` should happen in `@AfterEach`! --- .../Top10StreamProcessorTopologyTest.java | 28 +++++++++++++++---- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java index 8ecf9fa..86314e5 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java @@ -5,6 +5,8 @@ import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.TestOutputTopic; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyTestDriver; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.kafka.support.serializer.JsonSerde; @@ -28,8 +30,14 @@ public class Top10StreamProcessorTopologyTest public final static String IN = "TEST-IN"; public final static String OUT = "TEST-OUT"; - @Test - public void test() + + TopologyTestDriver testDriver; + TestInputTopic in; + TestOutputTopic out; + + + @BeforeEach + public void setUp() { Topology topology = Top10StreamProcessor.buildTopology(IN, OUT); @@ -44,18 +52,24 @@ public class Top10StreamProcessorTopologyTest JsonSerde valueSerde = new JsonSerde<>(); valueSerde.configure(propertyMap, false); - TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProcessorProperties); + testDriver = new TopologyTestDriver(topology, streamProcessorProperties); - TestInputTopic in = testDriver.createInputTopic( + in = testDriver.createInputTopic( IN, (JsonSerializer)keySerde.serializer(), (JsonSerializer)valueSerde.serializer()); - TestOutputTopic out = testDriver.createOutputTopic( + out = testDriver.createOutputTopic( OUT, (JsonDeserializer)keySerde.deserializer(), (JsonDeserializer)valueSerde.deserializer()); + } + + + @Test + public void test() + { Stream .of(TestData.INPUT_MESSAGES) .forEach(kv -> in.pipeInput( @@ -77,7 +91,11 @@ public class Top10StreamProcessorTopologyTest }); TestData.assertExpectedMessages(receivedMessages); + } + @AfterEach + public void tearDown() + { testDriver.close(); } } -- 2.20.1 From 1f33cdc90381b0afa12f643838ec301d680ebf9f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 25 May 2024 10:17:57 +0200 Subject: [PATCH 08/16] top10: 1.1.2 - (RED) Explicitly formulated expectations for `Ranking` --- .../juplo/kafka/wordcount/top10/Ranking.java | 7 +- .../kafka/wordcount/top10/RankingTest.java | 230 ++++++++++++++++++ 2 files changed, 235 insertions(+), 2 deletions(-) create mode 100644 src/test/java/de/juplo/kafka/wordcount/top10/RankingTest.java diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java b/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java index 80e8742..0635384 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java @@ -12,6 +12,9 @@ import java.util.List; @Data public class Ranking { + public final static int MAX_ENTRIES = 10; + + private Entry[] entries = new Entry[0]; public Ranking add(Entry newEntry) @@ -41,9 +44,9 @@ public class Ranking break; } } - if (list.size() > 10) + if (list.size() > MAX_ENTRIES) { - list = list.subList(0,10); + list = list.subList(0, MAX_ENTRIES); } entries = list.toArray(num -> new Entry[num]); return this; diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/RankingTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/RankingTest.java new file mode 100644 index 0000000..e92d87c --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/top10/RankingTest.java @@ -0,0 +1,230 @@ +package de.juplo.kafka.wordcount.top10; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.LinkedList; +import java.util.List; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType; + + +public class RankingTest +{ + @DisplayName("A newly created instance is empty") + @Test + public void testNewRankingIsEmpty() + { + Ranking ranking = new Ranking(); + assertThat(ranking.getEntries()).isEmpty(); + } + + @DisplayName("An instance that was build from an empty ranking is empty") + @Test + public void testRankingOfYieldsExpectedResultForEmptyList() + { + Ranking ranking = new Ranking(); + assertThat(ranking.getEntries()).isEmpty(); + } + + @DisplayName("An instance that was build from a valid ranking contains the expected entries") + @ParameterizedTest + @MethodSource("validRankingsProvider") + public void testRankingOfYieldsExpectedResultsForValidRankings(List entryList) + { + Ranking ranking = Ranking.of(toArray(entryList)); + assertThat(ranking.getEntries()).containsExactlyElementsOf(entryList); + } + + @DisplayName("The builder fails for invalid rankings") + @ParameterizedTest + @MethodSource("invalidRankingsProvider") + public void testRankingOfThrowsExceptionForInvalidRankings(List entryList) + { + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> Ranking.of(toArray(entryList))); + } + + @DisplayName("Adding a new word with highest ranking, pushes all other words down") + @ParameterizedTest + @MethodSource("validRankingsProvider") + public void testAddingNewWordWithHighestRanking(List entryList) + { + Ranking ranking = Ranking.of(toArray(entryList)); + Entry newEntry = Entry.of("NEW!", rankingForPosition(-1)); + ranking.add(newEntry); + assertThat(ranking.getEntries()[0]).isEqualTo(newEntry); + for (int i = 0; i < entryList.size() && i < Ranking.MAX_ENTRIES - 1; i++) + { + assertThat(ranking.getEntries()[i + 1]).isEqualTo(entryList.get(i)); + } + } + + @DisplayName("Adding a new word with an existent ranking, pushes all words with lower ranking down") + @ParameterizedTest + @MethodSource("validRankingsProvider") + public void testAddingNewWordWithExistingRanking(List entryList) + { + for (int position = 0; position < entryList.size(); position++ ) + { + Ranking ranking = Ranking.of(toArray(entryList)); + Entry newEntry = Entry.of("NEW!", rankingForPosition(position)); + ranking.add(newEntry); + for (int i = 0; i < entryList.size() && i < Ranking.MAX_ENTRIES - 1; i++) + { + if (i < position) + { + assertThat(ranking.getEntries()[i]).isEqualTo(entryList.get(i)); + } + if (i == position) + { + assertThat(ranking.getEntries()[i]).isEqualTo(entryList.get(i)); + assertThat(ranking.getEntries()[i + 1]).isEqualTo(newEntry); + } + if (i > position) + { + assertThat(ranking.getEntries()[i + 1]).isEqualTo(entryList.get(i)); + } + } + } + } + + @DisplayName("Adding a highest ranking for an existing word shifts it to the first place") + @ParameterizedTest + @ValueSource(ints = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }) + public void testAddingExistingWordWithHighestRanking(int position) + { + Ranking ranking = Ranking.of(toArray(VALID_RANKINGS[0])); + String word = wordForPosition(position); + Entry highestEntry = Entry.of(word, 100l); + ranking.add(highestEntry); + List expectedEntries = Stream + .concat( + Stream.of(highestEntry), + VALID_RANKINGS[0] + .stream() + .filter(entry -> !entry.getWord().equals(word))) + .toList(); + assertThat(ranking.getEntries()).containsExactlyElementsOf(expectedEntries); + } + + @DisplayName("Adding an existing word with unchanged ranking changes nothing") + @ParameterizedTest + @ValueSource(ints = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }) + public void testAddingExistingWordWithUnchangedRanking(int position) + { + Ranking ranking = Ranking.of(toArray(VALID_RANKINGS[0])); + Entry unchangedEntry = Entry.of( + wordForPosition(position), + rankingForPosition(position)); + ranking.add(unchangedEntry); + assertThat(ranking.getEntries()).containsExactlyElementsOf(VALID_RANKINGS[0]); + } + + @DisplayName("Adding an existing word with a lower ranking fails") + @ParameterizedTest + @MethodSource("validRankingsProvider") + public void testAddingExistingWordWithLowerRankingFails(List entryList) + { + Ranking ranking = Ranking.of(toArray(entryList)); + entryList.forEach(entry -> + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> ranking.add(Entry.of(entry.getWord(), entry.getCounter() - 1)))); + } + + + Entry[] toArray(List entryList) + { + return entryList.toArray(size -> new Entry[size]); + } + + static String wordForPosition(int position) + { + return Integer.toString(position+1); + } + + static long rankingForPosition(int position) + { + return (long)Ranking.MAX_ENTRIES * 2 - position; + } + + static Stream> validRankingsProvider() + { + return Stream.of(VALID_RANKINGS); + } + + static Stream> invalidRankingsProvider() + { + return Stream.of(INVALID_RANKINGS); + } + + static String[] WORDS = new String[Ranking.MAX_ENTRIES]; + static List[] VALID_RANKINGS = new List[Ranking.MAX_ENTRIES]; + + static + { + for (int i = 0; i < Ranking.MAX_ENTRIES; i++) + { + List ranking = new LinkedList<>(); + String word = null; + for (int position = 0; position <= i; position++) + { + word = wordForPosition(position); + Entry entry = Entry.of(word, rankingForPosition(position)); + ranking.add(entry); + } + WORDS[i] = word; + VALID_RANKINGS[Ranking.MAX_ENTRIES - (i + 1)] = ranking; + } + } + + static List[] INVALID_RANKINGS = new List[] { + List.of( + Entry.of("Platz eins", 1l), + Entry.of("Platz zwei", 2l)), + List.of( + Entry.of("Platz eins", 1111111111l), + Entry.of("Platz zwei", 222222222l), + Entry.of("Platz eins", 1l)), + List.of( + Entry.of("Platz eins", 11l), + Entry.of("Platz eins", 1l)), + List.of( + Entry.of("Platz eins", 1111111111l), + Entry.of("Platz zwei", 222222222l), + Entry.of("Platz eins", 11111111l), + Entry.of("Platz zwei", 2222222l), + Entry.of("Platz fünf", 555555l)), + List.of( + Entry.of("Platz eins", 1111111111l), + Entry.of("Platz zwei", 222222222l), + Entry.of("Platz drei", 33333333l), + Entry.of("Platz vier", 4444444l), + Entry.of("Platz eins", 111111l), + Entry.of("Platz sechs", 66666l)), + List.of( + Entry.of("Platz eins", 1111111111l), + Entry.of("Platz zwei", 222222222l), + Entry.of("Platz drei", 33333333l), + Entry.of("Platz vier", 4444444l), + Entry.of("Platz fünf", 555555l), + Entry.of("Platz sechs", 66666l), + Entry.of("Platz eins", 1l)), + List.of( + Entry.of("Platz eins", 1111111111l), + Entry.of("Platz zwei", 222222222l), + Entry.of("Platz drei", 33333333l), + Entry.of("Platz vier", 4444444l), + Entry.of("Platz fünf", 555555l), + Entry.of("Platz sechs", 66666l), + Entry.of("Platz sieben", 7777l), + Entry.of("Platz acht", 888l), + Entry.of("Platz neun", 99l), + Entry.of("Platz 10", 6l), + Entry.of("Platz 11", 3l))}; +} -- 2.20.1 From 25afa5a7e6a4f42d18d65ee982faa146f9d03375 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 25 May 2024 17:10:39 +0200 Subject: [PATCH 09/16] top10: 1.1.2 - (GREEN) Fixed implementation of `Ranking` --- .../juplo/kafka/wordcount/top10/Ranking.java | 65 +++++++++++++++++-- 1 file changed, 58 insertions(+), 7 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java b/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java index 0635384..110ee68 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java @@ -2,9 +2,7 @@ package de.juplo.kafka.wordcount.top10; import lombok.*; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; +import java.util.*; @AllArgsConstructor(access = AccessLevel.PRIVATE) @@ -27,13 +25,26 @@ public class Ranking } List list = new LinkedList<>(Arrays.asList(entries)); + int oldPosition = -1; for (int i = 0; i < list.size(); i++) { - Entry entry; + Entry entry = list.get(i); - entry = list.get(i); - if (entry.getCounter() <= newEntry.getCounter()) + if (entry.getCounter() < newEntry.getCounter()) { + if (oldPosition > -1) + { + if (list.get(oldPosition).getCounter() > newEntry.getCounter()) + { + throw new IllegalArgumentException("The ranking already contains an entry with a higher counting for " + newEntry); + } + else + { + // Entry for word already exists with the same counting! Nothing changed... + return this; + } + } + list.add(i, newEntry); for (int j = i+1; j < list.size(); j++) { @@ -51,6 +62,44 @@ public class Ranking entries = list.toArray(num -> new Entry[num]); return this; } + + if (entry.getWord().equals(newEntry.getWord())) + oldPosition = i; + } + + if (oldPosition > -1 && list.get(oldPosition).getCounter() > newEntry.getCounter()) + { + throw new IllegalArgumentException("The ranking already contains an entry with a higher counting for " + newEntry); + } + + if (list.size() < MAX_ENTRIES) + { + list.add(newEntry); + entries = list.toArray(num -> new Entry[num]); + } + + return this; + } + + public Ranking validate() throws IllegalArgumentException + { + if (this.entries.length > MAX_ENTRIES) + throw new IllegalArgumentException("Invalid Ranking: a valid ranking cannot have more entries than " + MAX_ENTRIES ); + + Set seenWords = new HashSet<>(); + long lowesCounting = Long.MAX_VALUE; + + for (int i=0; i " + entry.getWord()); + if (entry.getCounter() > lowesCounting) + throw new IllegalArgumentException("Invalid Ranking: Entries are not sorted correctly"); + + seenWords.add(entry.getWord()); + lowesCounting = entry.getCounter(); } return this; @@ -58,6 +107,8 @@ public class Ranking public static Ranking of(Entry... entries) { - return new Ranking(entries); + Ranking ranking = new Ranking(entries); + ranking.validate(); + return ranking; } } -- 2.20.1 From 0ba8ce7bc6200db8ad2abe3aa611e6a66dd7ea44 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 28 May 2024 20:29:18 +0200 Subject: [PATCH 10/16] top10: 1.1.3 - (RED) Formulated expectations for `Ranking.equals()` --- pom.xml | 2 +- .../kafka/wordcount/top10/RankingTest.java | 46 +++++++++++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 86b4290..a8abbc8 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount top10 - 1.1.2 + 1.1.3 Wordcount-Top-10 Top-10 stream-processor of the multi-user wordcount-example diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/RankingTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/RankingTest.java index e92d87c..26749e9 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/RankingTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/RankingTest.java @@ -137,6 +137,52 @@ public class RankingTest .isThrownBy(() -> ranking.add(Entry.of(entry.getWord(), entry.getCounter() - 1)))); } + @DisplayName("Identical rankings are considered equal") + @ParameterizedTest + @MethodSource("validRankingsProvider") + public void testIdenticalRankingsAreConsideredEaqual(List entryList) + { + assertThat(Ranking.of(toArray(entryList))).isEqualTo(Ranking.of(toArray(entryList))); + } + + @DisplayName("Two empty rankings are considered equal") + @Test + public void testTwoEmptyRankingsAreConsideredEaqual() + { + assertThat(Ranking.of()).isEqualTo(Ranking.of()); + } + + @DisplayName("A changed ranking is not considered equal to its unchanged counter-part") + @ParameterizedTest + @MethodSource("validRankingsProvider") + public void testChangedRankingsDoNotEaqualUnchangedOne(List entryList) + { + Ranking changed = Ranking.of(toArray(entryList)); + changed.add(Entry.of("devilish", 666l)); + assertThat(changed).isNotEqualTo(Ranking.of(toArray(entryList))); + } + + @DisplayName("Rankigs are considered equal, if only the order of words with the same count differ") + @Test + public void testRankingWithDifferentOrderForSameCountAreEqual() + { + assertThat( + Ranking.of( + Entry.of("a1",10l), + Entry.of("a2",10l), + Entry.of("b1", 9l), + Entry.of("b2",9l), + Entry.of("c1", 8l), + Entry.of("c2", 8l))) + .isEqualTo(Ranking.of( + Entry.of("a2",10l), + Entry.of("a1",10l), + Entry.of("b2", 9l), + Entry.of("b1",9l), + Entry.of("c2", 8l), + Entry.of("c1", 8l))); + } + Entry[] toArray(List entryList) { -- 2.20.1 From 129310339f9c310fa98b65639e26af2736807afd Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 28 May 2024 21:03:11 +0200 Subject: [PATCH 11/16] top10: 1.1.3 - (GREEN) Implemented `Ranking.equals()` accordingly --- .../juplo/kafka/wordcount/top10/Ranking.java | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java b/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java index 110ee68..4f56c18 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java @@ -105,6 +105,51 @@ public class Ranking return this; } + @Override + public boolean equals(Object o) + { + if (this == o) + return true; + if (o == null) + return false; + if (!(o instanceof Ranking)) + return false; + + Ranking other = (Ranking)o; + + if (other.entries.length != entries.length) + return false; + + if (entries.length == 0) + return true; + + int i = 0; + Set myWordsWithCurrentCount = new HashSet<>(); + Set otherWordsWithCurrentCount = new HashSet<>(); + Entry myEntry = entries[i]; + long currentCount = myEntry.getCounter(); + myWordsWithCurrentCount.add(myEntry.getWord()); + while (true) + { + Entry otherEntry = other.entries[i]; + if (otherEntry.getCounter() != currentCount) + return false; + otherWordsWithCurrentCount.add(otherEntry.getWord()); + if (++i >= entries.length) + return myWordsWithCurrentCount.equals(otherWordsWithCurrentCount); + myEntry = entries[i]; + if (myEntry.getCounter() != currentCount) + { + if (!myWordsWithCurrentCount.equals(otherWordsWithCurrentCount)) + return false; + currentCount = myEntry.getCounter(); + myWordsWithCurrentCount.clear(); + otherWordsWithCurrentCount.clear(); + } + myWordsWithCurrentCount.add(myEntry.getWord()); + } + } + public static Ranking of(Entry... entries) { Ranking ranking = new Ranking(entries); -- 2.20.1 From f350c2cafd9f0b290a021443cc7f5818974438e9 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 28 May 2024 22:59:38 +0200 Subject: [PATCH 12/16] top10: 1.2.0 - Switched the message-key from `String` to `User` --- pom.xml | 2 +- .../top10/Top10ApplicationConfiguration.java | 3 +- .../wordcount/top10/Top10StreamProcessor.java | 2 +- .../de/juplo/kafka/wordcount/top10/User.java | 14 +++++++++ .../juplo/kafka/wordcount/top10/TestData.java | 30 +++++++++---------- .../Top10StreamProcessorTopologyTest.java | 6 ++-- 6 files changed, 36 insertions(+), 21 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/wordcount/top10/User.java diff --git a/pom.xml b/pom.xml index a8abbc8..fd71ccd 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount top10 - 1.1.3 + 1.2.0 Wordcount-Top-10 Top-10 stream-processor of the multi-user wordcount-example diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java index 7749917..6f18339 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java @@ -33,7 +33,7 @@ public class Top10ApplicationConfiguration props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); props.put(JsonDeserializer.TRUSTED_PACKAGES, Top10Application.class.getPackageName()); - props.put(JsonDeserializer.KEY_DEFAULT_TYPE, String.class.getName()); + props.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName()); props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName()); props.put( JsonDeserializer.TYPE_MAPPINGS, @@ -42,6 +42,7 @@ public class Top10ApplicationConfiguration props.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, Boolean.FALSE); props.put( JsonSerializer.TYPE_MAPPINGS, + "user:" + User.class.getName() + "," + "ranking:" + Ranking.class.getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java index a3900bf..d3846d8 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java @@ -35,7 +35,7 @@ public class Top10StreamProcessor builder .stream(inputTopic) - .map((key, entry) -> new KeyValue<>(key.getUser(), entry)) + .map((key, entry) -> new KeyValue<>(User.of(key.getUser()), entry)) .groupByKey() .aggregate( () -> new Ranking(), diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/User.java b/src/main/java/de/juplo/kafka/wordcount/top10/User.java new file mode 100644 index 0000000..53c258d --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/top10/User.java @@ -0,0 +1,14 @@ +package de.juplo.kafka.wordcount.top10; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@AllArgsConstructor(staticName = "of") +@NoArgsConstructor +@Data +public class User +{ + String user; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java index 73a405e..3bb6b54 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java @@ -55,7 +55,7 @@ class TestData TestCounter.of("klaus","s",3)), }; - static void assertExpectedMessages(MultiValueMap receivedMessages) + static void assertExpectedMessages(MultiValueMap receivedMessages) { expectedMessages().forEach( (user, rankings) -> @@ -63,69 +63,69 @@ class TestData .containsExactlyElementsOf(rankings)); } - static KeyValue[] EXPECTED_MESSAGES = new KeyValue[] + static KeyValue[] EXPECTED_MESSAGES = new KeyValue[] { KeyValue.pair( // 0 - "peter", + User.of("peter"), Ranking.of( Entry.of("Hallo", 1l))), KeyValue.pair( // 1 - "klaus", + User.of("klaus"), Ranking.of( Entry.of("Müsch", 1l))), KeyValue.pair( // 2 - "peter", + User.of("peter"), Ranking.of( Entry.of("Hallo", 1l), Entry.of("Welt", 1l))), KeyValue.pair( // 3 - "klaus", + User.of("klaus"), Ranking.of( Entry.of("Müsch", 2l))), KeyValue.pair( // 4 - "klaus", + User.of("klaus"), Ranking.of( Entry.of("Müsch", 2l), Entry.of("s", 1l))), KeyValue.pair( // 5 - "peter", + User.of("peter"), Ranking.of( Entry.of("Hallo", 1l), Entry.of("Welt", 1l), Entry.of("Boäh", 1l))), KeyValue.pair( // 6 - "peter", + User.of("peter"), Ranking.of( Entry.of("Welt", 2l), Entry.of("Hallo", 1l), Entry.of("Boäh", 1l))), KeyValue.pair( // 7 - "peter", + User.of("peter"), Ranking.of( Entry.of("Welt", 2l), Entry.of("Boäh", 2l), Entry.of("Hallo", 1l))), KeyValue.pair( // 8 - "klaus", + User.of("klaus"), Ranking.of( Entry.of("Müsch", 2l), Entry.of("s", 2l))), KeyValue.pair( // 9 - "peter", + User.of("peter"), Ranking.of( Entry.of("Boäh", 3l), Entry.of("Welt", 2l), Entry.of("Hallo", 1l))), KeyValue.pair( // 10 - "klaus", + User.of("klaus"), Ranking.of( Entry.of("s", 3l), Entry.of("Müsch", 2l))), }; - static MultiValueMap expectedMessages() + static MultiValueMap expectedMessages() { - MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); + MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); Stream .of(EXPECTED_MESSAGES) .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value)); diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java index 86314e5..01c1cf6 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java @@ -33,7 +33,7 @@ public class Top10StreamProcessorTopologyTest TopologyTestDriver testDriver; TestInputTopic in; - TestOutputTopic out; + TestOutputTopic out; @BeforeEach @@ -61,7 +61,7 @@ public class Top10StreamProcessorTopologyTest out = testDriver.createOutputTopic( OUT, - (JsonDeserializer)keySerde.deserializer(), + (JsonDeserializer)keySerde.deserializer(), (JsonDeserializer)valueSerde.deserializer()); } @@ -76,7 +76,7 @@ public class Top10StreamProcessorTopologyTest Key.of(kv.key.getUser(), kv.key.getWord()), Entry.of(kv.value.getWord(), kv.value.getCounter()))); - MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); + MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); out .readRecordsToList() .forEach(record -> -- 2.20.1 From 1e073f553fc4bde9ec77eef5fa5fafb440b57304 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 27 May 2024 23:32:49 +0200 Subject: [PATCH 13/16] IT:RED --- .../kafka/wordcount/query/RankingData.java | 11 ++ .../wordcount/top10/Top10ApplicationIT.java | 123 ++++++++++++++++++ 2 files changed, 134 insertions(+) create mode 100644 src/test/java/de/juplo/kafka/wordcount/query/RankingData.java create mode 100644 src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java diff --git a/src/test/java/de/juplo/kafka/wordcount/query/RankingData.java b/src/test/java/de/juplo/kafka/wordcount/query/RankingData.java new file mode 100644 index 0000000..1bbd3ba --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/query/RankingData.java @@ -0,0 +1,11 @@ +package de.juplo.kafka.wordcount.query; + +import de.juplo.kafka.wordcount.top10.Entry; +import lombok.Data; + + +@Data +public class RankingData +{ + private Entry[] entries; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java new file mode 100644 index 0000000..9045c74 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java @@ -0,0 +1,123 @@ +package de.juplo.kafka.wordcount.top10; + +import de.juplo.kafka.wordcount.counter.TestWord; +import de.juplo.kafka.wordcount.counter.TestCounter; +import de.juplo.kafka.wordcount.query.RankingData; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.SendResult; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import java.time.Duration; +import java.util.stream.Stream; + +import static org.awaitility.Awaitility.await; + + +@SpringBootTest( + properties = { + "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer", + "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", + "spring.kafka.producer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.counter.TestWord,counter:de.juplo.kafka.wordcount.counter.TestCounter", + "spring.kafka.consumer.auto-offset-reset=earliest", + "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", + "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", + "spring.kafka.consumer.properties.spring.json.use.type.headers=false", + "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.top10.User", + "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.query.RankingData", + "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.top10 ", + "logging.level.root=WARN", + "logging.level.de.juplo=DEBUG", + "juplo.wordcount.top10.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.wordcount.top10.commit-interval=0", + "juplo.wordcount.top10.cacheMaxBytes=0", + "juplo.wordcount.top10.input-topic=" + Top10ApplicationIT.TOPIC_IN, + "juplo.wordcount.top10.output-topic=" + Top10ApplicationIT.TOPIC_OUT }) +@EmbeddedKafka(topics = { Top10ApplicationIT.TOPIC_IN, Top10ApplicationIT.TOPIC_OUT }) +@Slf4j +public class Top10ApplicationIT +{ + public final static String TOPIC_IN = "in"; + public final static String TOPIC_OUT = "out"; + + @Autowired + Consumer consumer; + + + @BeforeAll + public static void testSendMessage( + @Autowired KafkaTemplate kafkaTemplate) + { + Stream + .of(TestData.INPUT_MESSAGES) + .forEach(kv -> + { + try + { + SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); + log.info( + "Sent: {}={}, partition={}, offset={}", + result.getProducerRecord().key(), + result.getProducerRecord().value(), + result.getRecordMetadata().partition(), + result.getRecordMetadata().offset()); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + }); + } + + @DisplayName("Await the expected output messages") + @Test + public void testAwaitExpectedMessages() + { + await("Expexted messages") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> TestData.assertExpectedMessages(consumer.getReceivedMessages())); + } + + + static class Consumer + { + private final MultiValueMap received = new LinkedMultiValueMap<>(); + + @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) + public synchronized void receive( + @Header(KafkaHeaders.RECEIVED_KEY) User user, + @Payload RankingData ranking) + { + log.debug("Received message: {} -> {}", user, ranking); + received.add(user, Ranking.of(ranking.getEntries())); + } + + synchronized MultiValueMap getReceivedMessages() + { + return received; + } + } + + @TestConfiguration + static class Configuration + { + @Bean + Consumer consumer() + { + return new Consumer(); + } + } +} -- 2.20.1 From 270dbdcc153de6a3c1fc25456fa4a81706918341 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 26 May 2024 09:07:48 +0200 Subject: [PATCH 14/16] FIX:RED --- .../wordcount/top10/Top10ApplicationConfiguration.java | 10 ++++++---- .../wordcount/top10/Top10ApplicationProperties.java | 2 ++ 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java index 6f18339..d588de2 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java @@ -38,13 +38,15 @@ public class Top10ApplicationConfiguration props.put( JsonDeserializer.TYPE_MAPPINGS, "word:" + Key.class.getName() + "," + - "counter:" + Entry.class.getName()); - props.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, Boolean.FALSE); - props.put( - JsonSerializer.TYPE_MAPPINGS, + "counter:" + Entry.class.getName() + "," + "user:" + User.class.getName() + "," + "ranking:" + Ranking.class.getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + if (properties.getCommitInterval() != null) + props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, properties.getCommitInterval()); + if (properties.getCacheMaxBytes() != null) + props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, properties.getCacheMaxBytes()); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return props; } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java index 93b78ec..d3bb236 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java @@ -17,4 +17,6 @@ public class Top10ApplicationProperties private String applicationId = "top10"; private String inputTopic = "countings"; private String outputTopic = "top10"; + private Integer commitInterval; + private Integer cacheMaxBytes; } -- 2.20.1 From 1542ecbb98af2a1b77b35c23bd480b4e9fd49df3 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 30 May 2024 11:50:56 +0200 Subject: [PATCH 15/16] WIP:TEST-store --- .../wordcount/top10/Top10StreamProcessor.java | 13 +++++++++-- .../juplo/kafka/wordcount/top10/TestData.java | 7 ++++++ .../wordcount/top10/Top10ApplicationIT.java | 23 +++++++++++++++++++ .../Top10StreamProcessorTopologyTest.java | 11 ++++++++- 4 files changed, 51 insertions(+), 3 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java index d3846d8..2ff078c 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java @@ -5,6 +5,8 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import java.util.Properties; @@ -22,14 +24,16 @@ public class Top10StreamProcessor { Topology topology = Top10StreamProcessor.buildTopology( inputTopic, - outputTopic); + outputTopic, + null); streams = new KafkaStreams(topology, props); } static Topology buildTopology( String inputTopic, - String outputTopic) + String outputTopic, + KeyValueBytesStoreSupplier storeSupplier) { StreamsBuilder builder = new StreamsBuilder(); @@ -49,6 +53,11 @@ public class Top10StreamProcessor return topology; } + ReadOnlyKeyValueStore getStore(String name) + { + return null; + } + public void start() { log.info("Starting Stream-Processor"); diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java index 3bb6b54..f6d7ccd 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java @@ -5,6 +5,7 @@ import de.juplo.kafka.wordcount.counter.TestWord; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; @@ -63,6 +64,12 @@ class TestData .containsExactlyElementsOf(rankings)); } + static void assertExpectedState(ReadOnlyKeyValueStore store) + { + assertThat(store.get(EXPECTED_MESSAGES[9].key)).isEqualTo(EXPECTED_MESSAGES[9].value); + assertThat(store.get(EXPECTED_MESSAGES[10].key)).isEqualTo(EXPECTED_MESSAGES[10].value); + } + static KeyValue[] EXPECTED_MESSAGES = new KeyValue[] { KeyValue.pair( // 0 diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java index 9045c74..9f74e25 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java @@ -4,6 +4,8 @@ import de.juplo.kafka.wordcount.counter.TestWord; import de.juplo.kafka.wordcount.counter.TestCounter; import de.juplo.kafka.wordcount.query.RankingData; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -11,6 +13,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Primary; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.KafkaHeaders; @@ -52,9 +55,13 @@ public class Top10ApplicationIT { public final static String TOPIC_IN = "in"; public final static String TOPIC_OUT = "out"; + public final static String STORE_NAME = "TEST-STORE"; + @Autowired Consumer consumer; + @Autowired + Top10StreamProcessor top10StreamProcessor; @BeforeAll @@ -82,6 +89,15 @@ public class Top10ApplicationIT }); } + @DisplayName("Await the expected state in the state-store") + @Test + public void testAwaitExpectedState() + { + await("Expexted state") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> TestData.assertExpectedState(top10StreamProcessor.getStore(STORE_NAME))); + } + @DisplayName("Await the expected output messages") @Test public void testAwaitExpectedMessages() @@ -119,5 +135,12 @@ public class Top10ApplicationIT { return new Consumer(); } + + @Primary + @Bean + KeyValueBytesStoreSupplier inMemoryStoreSupplier() + { + return Stores.inMemoryKeyValueStore(STORE_NAME); + } } } diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java index 01c1cf6..3feeea3 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java @@ -5,6 +5,8 @@ import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.TestOutputTopic; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -29,6 +31,7 @@ public class Top10StreamProcessorTopologyTest { public final static String IN = "TEST-IN"; public final static String OUT = "TEST-OUT"; + public final static String STORE_NAME = "TOPOLOGY-TEST"; TopologyTestDriver testDriver; @@ -39,7 +42,10 @@ public class Top10StreamProcessorTopologyTest @BeforeEach public void setUp() { - Topology topology = Top10StreamProcessor.buildTopology(IN, OUT); + Topology topology = Top10StreamProcessor.buildTopology( + IN, + OUT, + Stores.inMemoryKeyValueStore(STORE_NAME)); Top10ApplicationConfiguration applicationConfiguriation = new Top10ApplicationConfiguration(); @@ -91,6 +97,9 @@ public class Top10StreamProcessorTopologyTest }); TestData.assertExpectedMessages(receivedMessages); + + KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); + TestData.assertExpectedState(store); } @AfterEach -- 2.20.1 From ee64b96e15662a6b42ebba0f8ac2c801763f96a0 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 30 May 2024 11:01:27 +0200 Subject: [PATCH 16/16] TEST-STORE:GREEN --- .../top10/Top10ApplicationConfiguration.java | 12 +++++++++++- .../wordcount/top10/Top10StreamProcessor.java | 17 +++++++++-------- .../wordcount/top10/Top10ApplicationIT.java | 5 ++--- 3 files changed, 22 insertions(+), 12 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java index d588de2..03497e4 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java @@ -3,6 +3,8 @@ package de.juplo.kafka.wordcount.top10; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.Stores; import org.springframework.boot.SpringApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.ConfigurableApplicationContext; @@ -55,12 +57,14 @@ public class Top10ApplicationConfiguration public Top10StreamProcessor streamProcessor( Top10ApplicationProperties applicationProperties, Properties streamProcessorProperties, + KeyValueBytesStoreSupplier storeSupplier, ConfigurableApplicationContext context) { Top10StreamProcessor streamProcessor = new Top10StreamProcessor( applicationProperties.getInputTopic(), applicationProperties.getOutputTopic(), - streamProcessorProperties); + streamProcessorProperties, + storeSupplier); streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> { @@ -75,4 +79,10 @@ public class Top10ApplicationConfiguration return streamProcessor; } + + @Bean + public KeyValueBytesStoreSupplier storeSupplier() + { + return Stores.persistentKeyValueStore("top10"); + } } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java index 2ff078c..343ab4d 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java @@ -1,11 +1,10 @@ package de.juplo.kafka.wordcount.top10; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.*; +import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; +import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import java.util.Properties; @@ -20,12 +19,13 @@ public class Top10StreamProcessor public Top10StreamProcessor( String inputTopic, String outputTopic, - Properties props) + Properties props, + KeyValueBytesStoreSupplier storeSupplier) { Topology topology = Top10StreamProcessor.buildTopology( inputTopic, outputTopic, - null); + storeSupplier); streams = new KafkaStreams(topology, props); } @@ -43,7 +43,8 @@ public class Top10StreamProcessor .groupByKey() .aggregate( () -> new Ranking(), - (user, entry, ranking) -> ranking.add(entry)) + (user, entry, ranking) -> ranking.add(entry), + Materialized.as(storeSupplier)) .toStream() .to(outputTopic); @@ -55,7 +56,7 @@ public class Top10StreamProcessor ReadOnlyKeyValueStore getStore(String name) { - return null; + return streams.store(StoreQueryParameters.fromNameAndType(name, QueryableStoreTypes.keyValueStore())); } public void start() diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java index 9f74e25..6707acc 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java @@ -6,9 +6,7 @@ import de.juplo.kafka.wordcount.query.RankingData; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.Stores; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; @@ -100,6 +98,7 @@ public class Top10ApplicationIT @DisplayName("Await the expected output messages") @Test + @Disabled public void testAwaitExpectedMessages() { await("Expexted messages") -- 2.20.1