X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fdemo%2Fkafka%2Fdeduplication%2FDeduplicator.java;h=38ec22abc0e151f05405c278e99a9e9311e3c2af;hb=HEAD;hp=5c0f554a228cc5aac7d5e8e09a312dbf9b1c2c7b;hpb=85905a556b97077ef99332928eb1401864af24e5;p=demos%2Fkafka%2Fdeduplication diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java index 5c0f554..4376f30 100644 --- a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java +++ b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java @@ -1,11 +1,13 @@ package de.juplo.demo.kafka.deduplication; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.ValueTransformerWithKey; -import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; +import org.apache.kafka.streams.kstream.ValueTransformer; +import org.apache.kafka.streams.kstream.ValueTransformerSupplier; import org.apache.kafka.streams.state.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,8 +20,6 @@ import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RestController; -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; import java.time.Duration; import java.util.Properties; @@ -33,7 +33,9 @@ public class Deduplicator public final String host; public final int port; - public Deduplicator(ServerProperties serverProperties) + public Deduplicator( + ServerProperties serverProperties, + StreamsHealthIndicator healthIndicator) { Properties properties = new Properties(); properties.put("bootstrap.servers", "kafka:9092"); @@ -58,6 +60,7 @@ public class Deduplicator LOG.error("Could not close KafkaStreams!", ex); } }); + streams.setStateListener(healthIndicator); } static Topology buildTopology() @@ -76,14 +79,7 @@ public class Deduplicator builder .stream("input") .flatTransformValues( - new ValueTransformerWithKeySupplier>() - { - @Override - public ValueTransformerWithKey> get() - { - return new DeduplicationTransformer(); - } - }, + () -> new DeduplicationTransformer(), DeduplicationTransformer.STORE) .to("output");