X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdeduplication;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fdemo%2Fkafka%2Fdeduplication%2FDeduplicator.java;h=8f173f7ad9b9be419534e77b82d534312bf45ae0;hp=3585604d02d4032b1563edf8e06e59ae5e12cd52;hb=76b0d9606d3d236a98fe660e092c1798c71fe464;hpb=5bb3d2908ccfe091084aa8a0a8b1282fb19fb3e1 diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java index 3585604..8f173f7 100644 --- a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java +++ b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java @@ -1,11 +1,12 @@ package de.juplo.demo.kafka.deduplication; -import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.kstream.ValueTransformerWithKey; +import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; @@ -35,16 +36,23 @@ public class Deduplicator properties.put("default.key.serde", Serdes.StringSerde.class); properties.put("default.value.serde", Serdes.StringSerde.class); - SequenceNumberExtractor extractor = - new SequenceNumberExtractor() { - @Override - public long extract( - String topic, int partition, long offset, Headers headers, String key, String value) - { - return Long.parseLong(value); - } - }; + streams = new KafkaStreams(Deduplicator.buildTopology(), properties); + streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> + { + LOG.error("Unexpected error in thread {}: {}", t, e.toString()); + try + { + streams.close(Duration.ofSeconds(5)); + } + catch (Exception ex) + { + LOG.error("Could not close KafkaStreams!", ex); + } + }); + } + static Topology buildTopology() + { StreamsBuilder builder = new StreamsBuilder(); // Create state-store for sequence numbers @@ -59,27 +67,20 @@ public class Deduplicator builder .stream("input") .flatTransformValues( - new DeduplicationTransformerSupplier(extractor), + new ValueTransformerWithKeySupplier>() + { + @Override + public ValueTransformerWithKey> get() + { + return new DeduplicationTransformer(); + } + }, DeduplicationTransformer.STORE) .to("output"); - Topology topology = builder.build(); - streams = new KafkaStreams(topology, properties); - streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> - { - LOG.error("Unexpected error in thread {}: {}", t, e.toString()); - try - { - streams.close(Duration.ofSeconds(5)); - } - catch (Exception ex) - { - LOG.error("Could not close KafkaStreams!", ex); - } - }); + return builder.build(); } - @PostConstruct public void start() {