1 package de.juplo.demo.kafka.deduplication;
4 import org.apache.kafka.common.header.Headers;
5 import org.apache.kafka.common.serialization.Serdes;
6 import org.apache.kafka.streams.KafkaStreams;
7 import org.apache.kafka.streams.StreamsBuilder;
8 import org.apache.kafka.streams.Topology;
9 import org.apache.kafka.streams.state.KeyValueStore;
10 import org.apache.kafka.streams.state.StoreBuilder;
11 import org.apache.kafka.streams.state.Stores;
12 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory;
14 import org.springframework.stereotype.Component;
16 import javax.annotation.PostConstruct;
17 import javax.annotation.PreDestroy;
18 import java.time.Duration;
19 import java.util.Properties;
// Builds and owns a Kafka Streams application that consumes the topic "input"
// and routes every record through a DeduplicationTransformer that is connected
// to a key-value state store.
// NOTE(review): only fragments of this class are visible here; confirm the
// constructor/lifecycle structure against the complete file.
23 public class Deduplicator
25 final static Logger LOG = LoggerFactory.getLogger(Deduplicator.class);
// The streams instance created from the topology assembled below.
27 public final KafkaStreams streams;
// Client configuration: broker address, application id and String serdes as
// the default for both keys and values.
32 Properties properties = new Properties();
33 properties.put("bootstrap.servers", "kafka:9092");
34 properties.put("application.id", "streams-deduplicator");
35 properties.put("default.key.serde", Serdes.StringSerde.class);
36 properties.put("default.value.serde", Serdes.StringSerde.class);
// Extractor that derives the sequence number of a record from its value.
38 SequenceNumberExtractor<String, String> extractor =
39 new SequenceNumberExtractor<String, String>() {
42 String topic, int partition, long offset, Headers headers, String key, String value)
// The record value itself is parsed as the sequence number; a value that is
// not a valid decimal long makes Long.parseLong throw NumberFormatException.
44 return Long.parseLong(value);
48 StreamsBuilder builder = new StreamsBuilder();
50 // Create state-store for sequence numbers
// Persistent key-value store registered under DeduplicationTransformer.STORE.
// Presumably maps a partition number to the last sequence number seen —
// TODO confirm against DeduplicationTransformer.
51 StoreBuilder<KeyValueStore<Integer,Long>> store =
52 Stores.keyValueStoreBuilder(
53 Stores.persistentKeyValueStore(DeduplicationTransformer.STORE),
57 builder.addStateStore(store);
// Source topic "input"; the transformer supplier receives the extractor and
// is given access to the store by name.
60 .<String, String>stream("input")
62 new DeduplicationTransformerSupplier<String, String>(extractor),
63 DeduplicationTransformer.STORE)
66 Topology topology = builder.build();
67 streams = new KafkaStreams(topology, properties);
// Handler for exceptions that escape a stream thread: log, then try to close
// the streams instance with a 5 second timeout.
68 streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
// Only e.toString() is logged here, so the stack trace is not part of the output.
70 LOG.error("Unexpected error in thread {}: {}", t, e.toString());
73 streams.close(Duration.ofSeconds(5));
// Logged when the close attempt above itself fails (ex is passed as the
// throwable, so this entry does include the stack trace).
77 LOG.error("Could not close KafkaStreams!", ex);
// Second close with the same 5 second timeout; presumably part of the
// shutdown path of this component — verify against the full file.
92 streams.close(Duration.ofSeconds(5));