WIP
authorKai Moritz <kai@juplo.de>
Tue, 25 Jun 2024 05:00:34 +0000 (07:00 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 25 Jun 2024 05:00:34 +0000 (07:00 +0200)
src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationConfiguration.java

index 9f0699a..215c206 100644 (file)
@@ -3,6 +3,7 @@ package de.juplo.kafka.wordcount.stats;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
 import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
@@ -69,6 +70,7 @@ public class StatsApplicationConfiguration
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
                props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServer.host() + ":" + applicationServer.port());
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
+               props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndFailExceptionHandler.class);
 
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, applicationProperties.getBootstrapServer());
                if (applicationProperties.getCommitInterval() != null)