From 14a7c9bb8172f3a05fe5cf4f832e13e3ea23dde8 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 25 Jun 2024 06:23:33 +0200 Subject: [PATCH] stats: 1.0.0 - Foreign stats are ignored during deserialization * Added a strongly typed mapping for the allowed known ``StatisticsType``'s for incomming messages. * Unknown types therefore trigger a `DeserializationException`, before they can be processed and filtered by the application. * Configured the `LogAndContinueExceptionHandler` as `default.deserialization.exception.handler`, therefore effectively ignoring the foreign messages of unknown type. --- src/main/java/de/juplo/kafka/wordcount/stats/Key.java | 2 +- .../kafka/wordcount/stats/StatsApplicationConfiguration.java | 4 ++++ .../de/juplo/kafka/wordcount/stats/StatsStreamProcessor.java | 2 -- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/Key.java b/src/main/java/de/juplo/kafka/wordcount/stats/Key.java index ab88ee6..afe1514 100644 --- a/src/main/java/de/juplo/kafka/wordcount/stats/Key.java +++ b/src/main/java/de/juplo/kafka/wordcount/stats/Key.java @@ -6,6 +6,6 @@ import lombok.Data; @Data public class Key { - private String type; + private StatisticsType type; private String channel; } diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationConfiguration.java index 9f0699a..10bdb73 100644 --- a/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationConfiguration.java @@ -3,6 +3,7 @@ package de.juplo.kafka.wordcount.stats; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.Stores; @@ -85,6 +86,9 @@ public class StatsApplicationConfiguration { Properties props = new Properties(); + props.put( + StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, + LogAndContinueExceptionHandler.class); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); props.put( diff --git a/src/main/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessor.java index c4557ea..6beb7f0 100644 --- a/src/main/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessor.java @@ -20,7 +20,6 @@ import java.util.Properties; @Slf4j public class StatsStreamProcessor { - public static final String STATS_TYPE = "POPULAR"; public static final String USER_STORE_NAME = "users"; public static final String RANKING_STORE_NAME = "rankings"; @@ -66,7 +65,6 @@ public class StatsStreamProcessor .withValueSerde(new JsonSerde().copyWithType(User.class))); KStream rankings = builder .stream(rankingInputTopic) - .filter((key, value) -> STATS_TYPE.equals(key.getType())) .map((key, value) -> new KeyValue<>(key.getChannel(), value)); rankings -- 2.20.1