Added an endpoint to query the watermarks by partition
authorKai Moritz <kai@juplo.de>
Fri, 9 Oct 2020 20:24:56 +0000 (22:24 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 10 Oct 2020 19:33:54 +0000 (21:33 +0200)
docker-compose.yml
pom.xml
src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java
src/main/resources/application.yml [new file with mode: 0644]

index 17aff95..2b51357 100644 (file)
@@ -22,6 +22,10 @@ services:
 
   deduplicator:
     image: juplo/deduplicator:streams
+    ports:
+      - 8080:8080
+    environment:
+      server.address: deduplicator
     depends_on:
       - zookeeper
       - kafka
diff --git a/pom.xml b/pom.xml
index 210b8aa..126fb8a 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -17,7 +17,7 @@
 
     <dependency>
       <groupId>org.springframework.boot</groupId>
-      <artifactId>spring-boot-starter</artifactId>
+      <artifactId>spring-boot-starter-web</artifactId>
     </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
index 8f173f7..5c0f554 100644 (file)
@@ -1,18 +1,22 @@
 package de.juplo.demo.kafka.deduplication;
 
 
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.*;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
+import org.springframework.boot.autoconfigure.web.ServerProperties;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RestController;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
@@ -20,15 +24,16 @@ import java.time.Duration;
 import java.util.Properties;
 
 
-@Component
+@RestController
 public class Deduplicator
 {
   final static Logger LOG = LoggerFactory.getLogger(Deduplicator.class);
 
   public final KafkaStreams streams;
+  public final String host;
+  public final int port;
 
-
-  public Deduplicator()
+  public Deduplicator(ServerProperties serverProperties)
   {
     Properties properties = new Properties();
     properties.put("bootstrap.servers", "kafka:9092");
@@ -36,6 +41,10 @@ public class Deduplicator
     properties.put("default.key.serde", Serdes.StringSerde.class);
     properties.put("default.value.serde", Serdes.StringSerde.class);
 
+    this.host = serverProperties.getAddress().getHostAddress();
+    this.port = serverProperties.getPort();
+    properties.put("application.server", host + ":" + port);
+
     streams = new KafkaStreams(Deduplicator.buildTopology(), properties);
     streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
     {
@@ -81,6 +90,44 @@ public class Deduplicator
     return builder.build();
   }
 
+  @GetMapping(path = "/watermark/{partition}", produces = MediaType.TEXT_PLAIN_VALUE)
+  public ResponseEntity<String> getWatermarks(@PathVariable Integer partition)
+  {
+    KeyQueryMetadata metadata =
+        streams.queryMetadataForKey(
+            DeduplicationTransformer.STORE,
+            partition, new IntegerSerializer());
+
+    if (metadata.getActiveHost().port() != port ||
+        !metadata.getActiveHost().host().equals(host))
+    {
+      return
+          ResponseEntity
+              .status(HttpStatus.TEMPORARY_REDIRECT)
+              .header(
+                  HttpHeaders.LOCATION,
+                  "http://" +
+                      metadata.getActiveHost().host() +
+                      ":" +
+                      metadata.getActiveHost().port() +
+                      "/watermark/" +
+                      partition)
+              .build();
+    }
+
+    ReadOnlyKeyValueStore<Integer, Long> store = streams.store(
+        StoreQueryParameters.fromNameAndType(
+            DeduplicationTransformer.STORE,
+            QueryableStoreTypes.keyValueStore()));
+
+    Long watermark = store.get(partition);
+    if (watermark == null)
+      return ResponseEntity.notFound().build();
+
+    return ResponseEntity.ok().body(watermark.toString());
+  }
+
+
   @PostConstruct
   public void start()
   {
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
new file mode 100644 (file)
index 0000000..7ef7e54
--- /dev/null
@@ -0,0 +1,3 @@
+server:
+  address: 127.0.0.1
+  port: 8080