Added an endpoint to query the watermarks by partition
[demos/kafka/deduplication] / src / main / java / de / juplo / demo / kafka / deduplication / Deduplicator.java
1 package de.juplo.demo.kafka.deduplication;
2
3
4 import org.apache.kafka.common.serialization.IntegerSerializer;
5 import org.apache.kafka.common.serialization.Serdes;
6 import org.apache.kafka.streams.*;
7 import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
8 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
9 import org.apache.kafka.streams.state.*;
10 import org.slf4j.Logger;
11 import org.slf4j.LoggerFactory;
12 import org.springframework.boot.autoconfigure.web.ServerProperties;
13 import org.springframework.http.HttpHeaders;
14 import org.springframework.http.HttpStatus;
15 import org.springframework.http.MediaType;
16 import org.springframework.http.ResponseEntity;
17 import org.springframework.web.bind.annotation.GetMapping;
18 import org.springframework.web.bind.annotation.PathVariable;
19 import org.springframework.web.bind.annotation.RestController;
20
21 import javax.annotation.PostConstruct;
22 import javax.annotation.PreDestroy;
23 import java.time.Duration;
24 import java.util.Properties;
25
26
27 @RestController
28 public class Deduplicator
29 {
30   final static Logger LOG = LoggerFactory.getLogger(Deduplicator.class);
31
32   public final KafkaStreams streams;
33   public final String host;
34   public final int port;
35
36   public Deduplicator(ServerProperties serverProperties)
37   {
38     Properties properties = new Properties();
39     properties.put("bootstrap.servers", "kafka:9092");
40     properties.put("application.id", "streams-deduplicator");
41     properties.put("default.key.serde", Serdes.StringSerde.class);
42     properties.put("default.value.serde", Serdes.StringSerde.class);
43
44     this.host = serverProperties.getAddress().getHostAddress();
45     this.port = serverProperties.getPort();
46     properties.put("application.server", host + ":" + port);
47
48     streams = new KafkaStreams(Deduplicator.buildTopology(), properties);
49     streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
50     {
51       LOG.error("Unexpected error in thread {}: {}", t, e.toString());
52       try
53       {
54         streams.close(Duration.ofSeconds(5));
55       }
56       catch (Exception ex)
57       {
58         LOG.error("Could not close KafkaStreams!", ex);
59       }
60     });
61   }
62
63   static Topology buildTopology()
64   {
65     StreamsBuilder builder = new StreamsBuilder();
66
67     // Create state-store for sequence numbers
68     StoreBuilder<KeyValueStore<Integer,Long>> store =
69         Stores.keyValueStoreBuilder(
70             Stores.persistentKeyValueStore(DeduplicationTransformer.STORE),
71             Serdes.Integer(),
72             Serdes.Long());
73     // register store
74     builder.addStateStore(store);
75
76     builder
77         .<String, String>stream("input")
78         .flatTransformValues(
79             new ValueTransformerWithKeySupplier<String, String, Iterable<String>>()
80             {
81               @Override
82               public ValueTransformerWithKey<String, String, Iterable<String>> get()
83               {
84                 return new DeduplicationTransformer();
85               }
86             },
87             DeduplicationTransformer.STORE)
88         .to("output");
89
90     return builder.build();
91   }
92
93   @GetMapping(path = "/watermark/{partition}", produces = MediaType.TEXT_PLAIN_VALUE)
94   public ResponseEntity<String> getWatermarks(@PathVariable Integer partition)
95   {
96     KeyQueryMetadata metadata =
97         streams.queryMetadataForKey(
98             DeduplicationTransformer.STORE,
99             partition, new IntegerSerializer());
100
101     if (metadata.getActiveHost().port() != port ||
102         !metadata.getActiveHost().host().equals(host))
103     {
104       return
105           ResponseEntity
106               .status(HttpStatus.TEMPORARY_REDIRECT)
107               .header(
108                   HttpHeaders.LOCATION,
109                   "http://" +
110                       metadata.getActiveHost().host() +
111                       ":" +
112                       metadata.getActiveHost().port() +
113                       "/watermark/" +
114                       partition)
115               .build();
116     }
117
118     ReadOnlyKeyValueStore<Integer, Long> store = streams.store(
119         StoreQueryParameters.fromNameAndType(
120             DeduplicationTransformer.STORE,
121             QueryableStoreTypes.keyValueStore()));
122
123     Long watermark = store.get(partition);
124     if (watermark == null)
125       return ResponseEntity.notFound().build();
126
127     return ResponseEntity.ok().body(watermark.toString());
128   }
129
130
131   @PostConstruct
132   public void start()
133   {
134     streams.start();
135   }
136
137   @PreDestroy
138   public void stop()
139   {
140     streams.close(Duration.ofSeconds(5));
141   }
142 }