stats: 1.0.0 - Foreign stats are ignored during deserialization stats stats-1.0.0
authorKai Moritz <kai@juplo.de>
Tue, 25 Jun 2024 04:23:33 +0000 (06:23 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 25 Jun 2024 16:37:16 +0000 (18:37 +0200)
* Added a strongly typed mapping for the allowed known ``StatisticsType``'s
  for incomming messages.
* Unknown types therefore trigger a `DeserializationException`, before
  they can be processed and filtered by the application.
* Configured the `LogAndContinueExceptionHandler` as
  `default.deserialization.exception.handler`, therefore effectively
  ignoring the foreign messages of unknown type.

src/main/java/de/juplo/kafka/wordcount/stats/Key.java
src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationConfiguration.java
src/main/java/de/juplo/kafka/wordcount/stats/StatsStreamProcessor.java

index ab88ee6..afe1514 100644 (file)
@@ -6,6 +6,6 @@ import lombok.Data;
 @Data
 public class Key
 {
-  private String type;
+  private StatisticsType type;
   private String channel;
 }
index 9f0699a..10bdb73 100644 (file)
@@ -3,6 +3,7 @@ package de.juplo.kafka.wordcount.stats;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
 import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
@@ -85,6 +86,9 @@ public class StatsApplicationConfiguration
        {
                Properties props = new Properties();
 
+               props.put(
+                               StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
+                               LogAndContinueExceptionHandler.class);
                props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
                props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
                props.put(
index c4557ea..6beb7f0 100644 (file)
@@ -20,7 +20,6 @@ import java.util.Properties;
 @Slf4j
 public class StatsStreamProcessor
 {
-       public static final String STATS_TYPE = "POPULAR";
        public static final String USER_STORE_NAME = "users";
        public static final String RANKING_STORE_NAME = "rankings";
 
@@ -66,7 +65,6 @@ public class StatsStreamProcessor
                                                                .withValueSerde(new JsonSerde().copyWithType(User.class)));
                KStream<String, Ranking> rankings = builder
                                .<Key, Ranking>stream(rankingInputTopic)
-                               .filter((key, value) -> STATS_TYPE.equals(key.getType()))
                                .map((key, value) -> new KeyValue<>(key.getChannel(), value));
 
                rankings