1 package de.juplo.demo.kafka.deduplication;
4 import org.apache.kafka.common.serialization.IntegerSerializer;
5 import org.apache.kafka.common.serialization.Serdes;
6 import org.apache.kafka.streams.*;
7 import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
8 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
9 import org.apache.kafka.streams.state.*;
10 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory;
12 import org.springframework.boot.autoconfigure.web.ServerProperties;
13 import org.springframework.http.HttpHeaders;
14 import org.springframework.http.HttpStatus;
15 import org.springframework.http.MediaType;
16 import org.springframework.http.ResponseEntity;
17 import org.springframework.web.bind.annotation.GetMapping;
18 import org.springframework.web.bind.annotation.PathVariable;
19 import org.springframework.web.bind.annotation.RestController;
21 import javax.annotation.PostConstruct;
22 import javax.annotation.PreDestroy;
23 import java.time.Duration;
24 import java.util.Properties;
/**
 * Owns the {@link KafkaStreams} instance that runs the topology returned by
 * {@link #buildTopology()} and remembers the host and port that are
 * registered as this instance's {@code application.server}.
 * <p>
 * NOTE(review): this listing skips some original lines (for example the
 * braces and any annotations around this declaration are not shown) --
 * confirm details against the complete file.
 */
28 public class Deduplicator
// Class-wide SLF4J logger.
30 final static Logger LOG = LoggerFactory.getLogger(Deduplicator.class);
// Streams instance; assigned once in the constructor.
32 public final KafkaStreams streams;
// Host address read from the ServerProperties passed to the constructor.
33 public final String host;
// Port read from the ServerProperties passed to the constructor.
34 public final int port;
/**
 * Assembles the Kafka Streams configuration, records this instance's host
 * and port, and creates the {@link KafkaStreams} instance together with an
 * uncaught-exception handler.
 *
 * @param serverProperties source of the address and port that are stored in
 *        {@link #host} / {@link #port} and published as
 *        {@code application.server}
 */
36 public Deduplicator(ServerProperties serverProperties)
// Fixed configuration: broker address, application id and String serdes as
// the default for keys and values.
38 Properties properties = new Properties();
39 properties.put("bootstrap.servers", "kafka:9092");
40 properties.put("application.id", "streams-deduplicator");
41 properties.put("default.key.serde", Serdes.StringSerde.class);
42 properties.put("default.value.serde", Serdes.StringSerde.class);
// NOTE(review): this dereferences getAddress() directly; if it can return
// null (e.g. no server address configured) this line throws a
// NullPointerException -- confirm the address is always set.
44 this.host = serverProperties.getAddress().getHostAddress();
45 this.port = serverProperties.getPort();
// Publish "host:port" so that other instances can be pointed at this one;
// getWatermarks() compares the same two values against the active host.
46 properties.put("application.server", host + ":" + port);
48 streams = new KafkaStreams(Deduplicator.buildTopology(), properties);
// On an uncaught error: log the thread and the exception's toString() (no
// stack trace is passed to the logger), then close the streams instance
// with a 5 second timeout.
49 streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
51 LOG.error("Unexpected error in thread {}: {}", t, e.toString());
54 streams.close(Duration.ofSeconds(5));
// NOTE(review): the lines surrounding the close() call are not visible in
// this listing; presumably this log statement sits in a catch block for a
// failed close() and `ex` is the caught exception -- confirm.
58 LOG.error("Could not close KafkaStreams!", ex);
/**
 * Builds the stream-processing topology: registers a persistent key-value
 * state store named {@code DeduplicationTransformer.STORE} and wires a
 * {@code DeduplicationTransformer} into a stream that reads from the topic
 * {@code "input"}.
 * <p>
 * NOTE(review): several lines of the fluent chain are not visible in this
 * listing (the remaining store-builder arguments, the operator that receives
 * the transformer supplier, and whatever follows it) -- confirm against the
 * complete file.
 *
 * @return the topology produced by the {@link StreamsBuilder}
 */
63 static Topology buildTopology()
65 StreamsBuilder builder = new StreamsBuilder();
67 // Create state-store for sequence numbers
// The store is typed Integer -> Long and backed by a persistent key-value
// store; the remaining builder arguments are not visible here.
68 StoreBuilder<KeyValueStore<Integer,Long>> store =
69 Stores.keyValueStoreBuilder(
70 Stores.persistentKeyValueStore(DeduplicationTransformer.STORE),
// Register the store with the builder so it becomes part of the topology.
74 builder.addStateStore(store);
// Read String keys and String values from the topic "input".
77 .<String, String>stream("input")
// Anonymous supplier whose get() hands out a new DeduplicationTransformer
// on every call.
79 new ValueTransformerWithKeySupplier<String, String, Iterable<String>>()
82 public ValueTransformerWithKey<String, String, Iterable<String>> get()
84 return new DeduplicationTransformer();
// The store name is passed alongside the supplier -- presumably to connect
// the state store to the transformer; confirm.
87 DeduplicationTransformer.STORE)
90 return builder.build();
/**
 * Handles {@code GET /watermark/{partition}} and answers with plain text.
 * <p>
 * Looks up which instance is the active host for the key {@code partition}
 * in the store {@code DeduplicationTransformer.STORE}. If that is a different
 * host or port than this instance, a {@code TEMPORARY_REDIRECT} response with
 * a {@code Location} header built from the active host's host and port is
 * produced. Otherwise the value is read from the local store.
 *
 * @param partition key that is looked up in the state store
 * @return a redirect to the active host, {@code 404} if the store holds no
 *         value for the key, or {@code 200} with the stored value rendered
 *         via {@code toString()}
 */
93 @GetMapping(path = "/watermark/{partition}", produces = MediaType.TEXT_PLAIN_VALUE)
94 public ResponseEntity<String> getWatermarks(@PathVariable Integer partition)
// Ask Kafka Streams for the metadata of the key, serialized as an Integer.
96 KeyQueryMetadata metadata =
97 streams.queryMetadataForKey(
98 DeduplicationTransformer.STORE,
99 partition, new IntegerSerializer());
// The key is served elsewhere if either the port or the host of the active
// host differs from the values recorded in the constructor.
101 if (metadata.getActiveHost().port() != port ||
102 !metadata.getActiveHost().host().equals(host))
106 .status(HttpStatus.TEMPORARY_REDIRECT)
// NOTE(review): only part of the Location value is visible in this listing
// (the active host's host and port); the other concatenated pieces are on
// lines that are not shown -- confirm the final URL format.
108 HttpHeaders.LOCATION,
110 metadata.getActiveHost().host() +
112 metadata.getActiveHost().port() +
// Reached when no redirect was returned: query the store as a read-only
// key-value store.
118 ReadOnlyKeyValueStore<Integer, Long> store = streams.store(
119 StoreQueryParameters.fromNameAndType(
120 DeduplicationTransformer.STORE,
121 QueryableStoreTypes.keyValueStore()));
// A missing entry yields 404; otherwise the value is returned as text.
123 Long watermark = store.get(partition);
124 if (watermark == null)
125 return ResponseEntity.notFound().build();
127 return ResponseEntity.ok().body(watermark.toString());
140 streams.close(Duration.ofSeconds(5));