package de.juplo.demo.kafka.deduplication;
-import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
properties.put("default.key.serde", Serdes.StringSerde.class);
properties.put("default.value.serde", Serdes.StringSerde.class);
- SequenceNumberExtractor<String, String> extractor =
- new SequenceNumberExtractor<String, String>() {
- @Override
- public long extract(
- String topic, int partition, long offset, Headers headers, String key, String value)
- {
- return Long.parseLong(value);
- }
- };
+ streams = new KafkaStreams(Deduplicator.buildTopology(), properties);
+ streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
+ {
+ LOG.error("Unexpected error in thread {}: {}", t, e.toString());
+ try
+ {
+ streams.close(Duration.ofSeconds(5));
+ }
+ catch (Exception ex)
+ {
+ LOG.error("Could not close KafkaStreams!", ex);
+ }
+ });
+ }
+ static Topology buildTopology()
+ {
StreamsBuilder builder = new StreamsBuilder();
// Create state-store for sequence numbers
builder
.<String, String>stream("input")
.flatTransformValues(
- new DeduplicationTransformerSupplier<String, String>(extractor),
+ new ValueTransformerWithKeySupplier<String, String, Iterable<String>>()
+ {
+ @Override
+ public ValueTransformerWithKey<String, String, Iterable<String>> get()
+ {
+ return new DeduplicationTransformer();
+ }
+ },
DeduplicationTransformer.STORE)
.to("output");
- Topology topology = builder.build();
- streams = new KafkaStreams(topology, properties);
- streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
- {
- LOG.error("Unexpected error in thread {}: {}", t, e.toString());
- try
- {
- streams.close(Duration.ofSeconds(5));
- }
- catch (Exception ex)
- {
- LOG.error("Could not close KafkaStreams!", ex);
- }
- });
+ return builder.build();
}
-
@PostConstruct
public void start()
{