1 package de.juplo.demo.kafka.deduplication;
4 import org.apache.kafka.common.serialization.IntegerSerializer;
5 import org.apache.kafka.common.serialization.Serdes;
6 import org.apache.kafka.streams.*;
7 import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
8 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
9 import org.apache.kafka.streams.state.*;
10 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory;
12 import org.springframework.boot.autoconfigure.web.ServerProperties;
13 import org.springframework.http.HttpHeaders;
14 import org.springframework.http.HttpStatus;
15 import org.springframework.http.MediaType;
16 import org.springframework.http.ResponseEntity;
17 import org.springframework.web.bind.annotation.GetMapping;
18 import org.springframework.web.bind.annotation.PathVariable;
19 import org.springframework.web.bind.annotation.RestController;
21 import javax.annotation.PostConstruct;
22 import javax.annotation.PreDestroy;
23 import java.time.Duration;
24 import java.util.Properties;
// NOTE(review): this view of the file is missing lines (braces, annotations and some
// statements are not visible); the comments describe only what is shown.
// Holds a KafkaStreams instance together with the host and port of this instance,
// which getWatermarks compares against the active host reported for a queried key.
28 public class Deduplicator
// Static logger for this class.
30 final static Logger LOG = LoggerFactory.getLogger(Deduplicator.class);
// Kafka Streams instance; it is queried for key metadata and for the store in getWatermarks.
32 public final KafkaStreams streams;
// Host address taken from the ServerProperties passed to the constructor.
33 public final String host;
// Port taken from the ServerProperties passed to the constructor.
34 public final int port;
// Constructor parameters (the line carrying the constructor name is not visible in this view):
//   serverProperties - source of the address and port stored in host / port
//   healthIndicator  - registered as the state listener of the streams instance
37 ServerProperties serverProperties,
38 StreamsHealthIndicator healthIndicator)
// Kafka Streams configuration: fixed bootstrap server and application id,
// String serdes as the default for keys and values.
40 Properties properties = new Properties();
41 properties.put("bootstrap.servers", "kafka:9092");
42 properties.put("application.id", "streams-deduplicator");
43 properties.put("default.key.serde", Serdes.StringSerde.class);
44 properties.put("default.value.serde", Serdes.StringSerde.class);
// NOTE(review): getAddress() is dereferenced and getPort() is assigned to an int without a
// null check; presumably both are always configured for this application - confirm,
// otherwise this line pair can throw a NullPointerException.
46 this.host = serverProperties.getAddress().getHostAddress();
47 this.port = serverProperties.getPort();
// Publish host:port under the application.server setting; getWatermarks compares the
// same two values with the active host of a key.
48 properties.put("application.server", host + ":" + port);
// Build the streams instance from the topology of buildTopology() and the properties above.
50 streams = new KafkaStreams(Deduplicator.buildTopology(), properties);
// Handler for uncaught exceptions: logs the thread and the exception's toString() ...
51 streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
53 LOG.error("Unexpected error in thread {}: {}", t, e.toString());
// ... and then closes the streams instance with a timeout of 5 seconds.
// NOTE(review): the surrounding try/catch lines are not visible here; presumably the
// close call is guarded and the next log statement sits in the catch block - confirm.
56 streams.close(Duration.ofSeconds(5));
60 LOG.error("Could not close KafkaStreams!", ex);
// Let the health indicator observe state changes of the streams instance.
63 streams.setStateListener(healthIndicator);
/**
 * Builds the Kafka Streams topology of this application.
 *
 * As far as visible here: a persistent key-value store named
 * DeduplicationTransformer.STORE (Integer keys, Long values) is added to the
 * builder, the topic "input" is read as a String/String stream, and a supplier
 * that returns a new DeduplicationTransformer on every call is attached together
 * with the name of that store.
 *
 * NOTE(review): several lines of this method are not visible in this view (the
 * serdes of the store, the call that takes the supplier, and whatever consumes
 * the resulting stream) - confirm against the full file.
 *
 * @return the topology produced by the StreamsBuilder
 */
66 static Topology buildTopology()
68 StreamsBuilder builder = new StreamsBuilder();
70 // Create state-store for sequence numbers
71 StoreBuilder<KeyValueStore<Integer,Long>> store =
72 Stores.keyValueStoreBuilder(
73 Stores.persistentKeyValueStore(DeduplicationTransformer.STORE),
// Register the store with the builder so it can be referenced by name further down.
77 builder.addStateStore(store);
// Source: the "input" topic with String keys and String values.
80 .<String, String>stream("input")
// Anonymous supplier; get() hands out a fresh DeduplicationTransformer on each call.
82 new ValueTransformerWithKeySupplier<String, String, Iterable<String>>()
85 public ValueTransformerWithKey<String, String, Iterable<String>> get()
87 return new DeduplicationTransformer();
// Name of the state store passed along with the supplier.
90 DeduplicationTransformer.STORE)
93 return builder.build();
/**
 * Handles GET requests for /watermark/{partition} and produces plain text.
 *
 * The metadata for the key {@code partition} in the store
 * DeduplicationTransformer.STORE is looked up first. If the active host reported
 * for that key differs from this instance's host or port, a response with status
 * TEMPORARY_REDIRECT and a Location header built from the active host's host and
 * port is produced (parts of that expression are not visible in this view).
 * Otherwise the value is read from the store of this instance: a 404 response when
 * no value exists for the key, else a 200 response with the value as text.
 *
 * @param partition path variable, used as the Integer key of the lookup
 * @return the redirect, not-found or ok response described above
 */
96 @GetMapping(path = "/watermark/{partition}", produces = MediaType.TEXT_PLAIN_VALUE)
97 public ResponseEntity<String> getWatermarks(@PathVariable Integer partition)
// Ask the streams instance which host is active for this key; the key is
// serialized with an IntegerSerializer for the lookup.
99 KeyQueryMetadata metadata =
100 streams.queryMetadataForKey(
101 DeduplicationTransformer.STORE,
102 partition, new IntegerSerializer());
// NOTE(review): metadata and getActiveHost() are dereferenced without a check;
// presumably usable metadata is always returned - confirm what happens while the
// streams instance is not running or is rebalancing.
// Redirect when either the port or the host of the active instance is not ours.
104 if (metadata.getActiveHost().port() != port ||
105 !metadata.getActiveHost().host().equals(host))
109 .status(HttpStatus.TEMPORARY_REDIRECT)
111 HttpHeaders.LOCATION,
113 metadata.getActiveHost().host() +
115 metadata.getActiveHost().port() +
// The key is handled by this instance: open a read-only view of the key-value store.
121 ReadOnlyKeyValueStore<Integer, Long> store = streams.store(
122 StoreQueryParameters.fromNameAndType(
123 DeduplicationTransformer.STORE,
124 QueryableStoreTypes.keyValueStore()));
// A missing entry is answered with 404, an existing one with its textual value.
126 Long watermark = store.get(partition);
127 if (watermark == null)
128 return ResponseEntity.notFound().build();
130 return ResponseEntity.ok().body(watermark.toString());
// Closes the streams instance with a timeout of 5 seconds.
// NOTE(review): the header of the enclosing method is not visible in this view;
// presumably this is the shutdown hook of the class - confirm.
143 streams.close(Duration.ofSeconds(5));