1 package de.juplo.demo.kafka.deduplication;
4 import org.apache.kafka.common.serialization.Serdes;
5 import org.apache.kafka.streams.KafkaStreams;
6 import org.apache.kafka.streams.StreamsBuilder;
7 import org.apache.kafka.streams.Topology;
8 import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
9 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
10 import org.apache.kafka.streams.state.KeyValueStore;
11 import org.apache.kafka.streams.state.StoreBuilder;
12 import org.apache.kafka.streams.state.Stores;
13 import org.slf4j.Logger;
14 import org.slf4j.LoggerFactory;
15 import org.springframework.stereotype.Component;
17 import javax.annotation.PostConstruct;
18 import javax.annotation.PreDestroy;
19 import java.time.Duration;
20 import java.util.Properties;
// Owns a KafkaStreams instance (the `streams` field) that is created from the
// topology returned by the static buildTopology() method of this class.
24 public class Deduplicator
// Class-wide SLF4J logger, named after this class.
26 final static Logger LOG = LoggerFactory.getLogger(Deduplicator.class);
// The Kafka Streams instance; declared final and public, so it is assigned
// exactly once and is readable from outside this class.
28 public final KafkaStreams streams;
// NOTE(review): the declaration enclosing these statements is not visible in
// this excerpt. Since they assign the final field `streams`, this is
// presumably the constructor body -- confirm.
//
// Kafka Streams configuration: hard-coded broker address and application id,
// with String serdes as the default for both record keys and record values.
33 Properties properties = new Properties();
34 properties.put("bootstrap.servers", "kafka:9092");
35 properties.put("application.id", "streams-deduplicator");
36 properties.put("default.key.serde", Serdes.StringSerde.class);
37 properties.put("default.value.serde", Serdes.StringSerde.class);
// Create the streams instance from the static topology and the configuration
// assembled above.
39 streams = new KafkaStreams(Deduplicator.buildTopology(), properties);
// Install a handler for uncaught exceptions; it receives the affected thread
// and the throwable.
40 streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
// Only the thread and e.toString() are passed as message arguments, so the
// stack trace of the throwable is not written to the log.
42 LOG.error("Unexpected error in thread {}: {}", t, e.toString());
// Ask the streams instance to close, passing a 5 second timeout.
45 streams.close(Duration.ofSeconds(5));
// NOTE(review): `ex` is not declared on any visible line; presumably this sits
// in a catch block guarding the close() call above -- confirm. Here the
// exception itself is passed to the logger as the last argument.
49 LOG.error("Could not close KafkaStreams!", ex);
// Builds the processing topology: registers a persistent key-value state
// store named DeduplicationTransformer.STORE with the builder and defines a
// stream that reads the topic "input" and involves a DeduplicationTransformer.
// Returns the Topology produced by the StreamsBuilder.
//
// NOTE(review): several lines of this method are not visible in this excerpt
// (the remaining arguments of the store builder, the stream operator that the
// transformer supplier is passed to, and any sink) -- check the full file
// before relying on this summary.
54 static Topology buildTopology()
56 StreamsBuilder builder = new StreamsBuilder();
58 // Create state-store for sequence numbers
// The store is typed with Integer keys and Long values, although the stream
// below is declared with String keys.
// NOTE(review): presumably the transformer converts the record key before
// using the store -- confirm in DeduplicationTransformer.
59 StoreBuilder<KeyValueStore<Integer,Long>> store =
60 Stores.keyValueStoreBuilder(
61 Stores.persistentKeyValueStore(DeduplicationTransformer.STORE),
// Register the store with the builder under the name given above.
65 builder.addStateStore(store);
// Read the topic "input" as a stream with String keys and String values.
68 .<String, String>stream("input")
// Anonymous supplier whose get() returns a new DeduplicationTransformer on
// every call; the transformer maps a String key and String value to an
// Iterable<String>.
70 new ValueTransformerWithKeySupplier<String, String, Iterable<String>>()
73 public ValueTransformerWithKey<String, String, Iterable<String>> get()
75 return new DeduplicationTransformer();
// The store name is passed as a further argument after the supplier.
// NOTE(review): presumably this connects the state store to the transformer;
// the call receiving these arguments is not visible -- confirm.
78 DeduplicationTransformer.STORE)
81 return builder.build();
// Ask the streams instance to close, passing a 5 second timeout.
// NOTE(review): the enclosing method is not visible in this excerpt; given the
// PreDestroy import at the top of the file this is presumably the shutdown
// callback of the component -- confirm.
93 streams.close(Duration.ofSeconds(5));