WIP
authorKai Moritz <kai@juplo.de>
Tue, 25 Jun 2024 05:01:36 +0000 (07:01 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 25 Jun 2024 05:01:36 +0000 (07:01 +0200)
src/main/java/de/juplo/kafka/wordcount/stats/StatsApplicationConfiguration.java

index 215c206..8498a9e 100644 (file)
@@ -3,7 +3,7 @@ package de.juplo.kafka.wordcount.stats;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
+import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
 import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
@@ -70,7 +70,7 @@ public class StatsApplicationConfiguration
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
                props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, applicationServer.host() + ":" + applicationServer.port());
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
-               props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndFailExceptionHandler.class);
+               props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
 
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, applicationProperties.getBootstrapServer());
                if (applicationProperties.getCommitInterval() != null)