Added an endpoint to query the watermarks by partition
[demos/kafka/deduplication] / src / main / java / de / juplo / demo / kafka / deduplication / Deduplicator.java
index 8f173f7..5c0f554 100644 (file)
@@ -1,18 +1,22 @@
 package de.juplo.demo.kafka.deduplication;
 
 
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.*;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
+import org.springframework.boot.autoconfigure.web.ServerProperties;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RestController;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
@@ -20,15 +24,16 @@ import java.time.Duration;
 import java.util.Properties;
 
 
-@Component
+@RestController
 public class Deduplicator
 {
   final static Logger LOG = LoggerFactory.getLogger(Deduplicator.class);
 
   public final KafkaStreams streams;
+  public final String host;
+  public final int port;
 
-
-  public Deduplicator()
+  public Deduplicator(ServerProperties serverProperties)
   {
     Properties properties = new Properties();
     properties.put("bootstrap.servers", "kafka:9092");
@@ -36,6 +41,10 @@ public class Deduplicator
     properties.put("default.key.serde", Serdes.StringSerde.class);
     properties.put("default.value.serde", Serdes.StringSerde.class);
 
+    this.host = serverProperties.getAddress().getHostAddress();
+    this.port = serverProperties.getPort();
+    properties.put("application.server", host + ":" + port);
+
     streams = new KafkaStreams(Deduplicator.buildTopology(), properties);
     streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
     {
@@ -81,6 +90,44 @@ public class Deduplicator
     return builder.build();
   }
 
+  @GetMapping(path = "/watermark/{partition}", produces = MediaType.TEXT_PLAIN_VALUE)
+  public ResponseEntity<String> getWatermarks(@PathVariable Integer partition)
+  {
+    KeyQueryMetadata metadata =
+        streams.queryMetadataForKey(
+            DeduplicationTransformer.STORE,
+            partition, new IntegerSerializer());
+
+    if (metadata.getActiveHost().port() != port ||
+        !metadata.getActiveHost().host().equals(host))
+    {
+      return
+          ResponseEntity
+              .status(HttpStatus.TEMPORARY_REDIRECT)
+              .header(
+                  HttpHeaders.LOCATION,
+                  "http://" +
+                      metadata.getActiveHost().host() +
+                      ":" +
+                      metadata.getActiveHost().port() +
+                      "/watermark/" +
+                      partition)
+              .build();
+    }
+
+    ReadOnlyKeyValueStore<Integer, Long> store = streams.store(
+        StoreQueryParameters.fromNameAndType(
+            DeduplicationTransformer.STORE,
+            QueryableStoreTypes.keyValueStore()));
+
+    Long watermark = store.get(partition);
+    if (watermark == null)
+      return ResponseEntity.notFound().build();
+
+    return ResponseEntity.ok().body(watermark.toString());
+  }
+
+
   @PostConstruct
   public void start()
   {