package de.juplo.demo.kafka.deduplication;
-import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
properties.put("default.key.serde", Serdes.StringSerde.class);
properties.put("default.value.serde", Serdes.StringSerde.class);
- SequenceNumberExtractor<String, String> extractor =
- new SequenceNumberExtractor<String, String>() {
- @Override
- public long extract(
- String topic, int partition, long offset, Headers headers, String key, String value)
- {
- return Long.parseLong(value);
- }
- };
-
StreamsBuilder builder = new StreamsBuilder();
// Create state-store for sequence numbers
builder
.<String, String>stream("input")
.flatTransformValues(
- new DeduplicationTransformerSupplier<String, String>(extractor),
+ new ValueTransformerWithKeySupplier<String, String, Iterable<String>>()
+ {
+ @Override
+ public ValueTransformerWithKey<String, String, Iterable<String>> get()
+ {
+ return new DeduplicationTransformer();
+ }
+ },
DeduplicationTransformer.STORE)
.to("output");