import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
{
Properties props = new Properties();
+ props.put(
+ StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
+ LogAndContinueExceptionHandler.class);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
props.put(
@Slf4j
public class StatsStreamProcessor
{
- public static final String STATS_TYPE = "POPULAR";
public static final String USER_STORE_NAME = "users";
public static final String RANKING_STORE_NAME = "rankings";
.withValueSerde(new JsonSerde().copyWithType(User.class)));
KStream<String, Ranking> rankings = builder
.<Key, Ranking>stream(rankingInputTopic)
- .filter((key, value) -> STATS_TYPE.equals(key.getType()))
.map((key, value) -> new KeyValue<>(key.getChannel(), value));
rankings