package de.juplo.demo.kafka.deduplication;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.state.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
import java.time.Duration;
import java.util.Properties;
public final String host;
public final int port;
- public Deduplicator(ServerProperties serverProperties)
+ public Deduplicator(
+ ServerProperties serverProperties,
+ StreamsHealthIndicator healthIndicator)
{
Properties properties = new Properties();
properties.put("bootstrap.servers", "kafka:9092");
LOG.error("Could not close KafkaStreams!", ex);
}
});
+ streams.setStateListener(healthIndicator);
}
static Topology buildTopology()
builder
.<String, String>stream("input")
.flatTransformValues(
- new ValueTransformerWithKeySupplier<String, String, Iterable<String>>()
- {
- @Override
- public ValueTransformerWithKey<String, String, Iterable<String>> get()
- {
- return new DeduplicationTransformer();
- }
- },
+ () -> new DeduplicationTransformer(),
DeduplicationTransformer.STORE)
.to("output");