Upgreaded Spring Boot from `2.3.4.RELEASE` to `3.2.5`
[demos/kafka/deduplication] / src / main / java / de / juplo / demo / kafka / deduplication / Deduplicator.java
index 5c0f554..e1027d0 100644 (file)
@@ -1,11 +1,13 @@
 package de.juplo.demo.kafka.deduplication;
 
 
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
 import org.apache.kafka.streams.state.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -18,8 +20,6 @@ import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.RestController;
 
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
 import java.time.Duration;
 import java.util.Properties;
 
@@ -33,7 +33,9 @@ public class Deduplicator
   public final String host;
   public final int port;
 
-  public Deduplicator(ServerProperties serverProperties)
+  public Deduplicator(
+      ServerProperties serverProperties,
+      StreamsHealthIndicator healthIndicator)
   {
     Properties properties = new Properties();
     properties.put("bootstrap.servers", "kafka:9092");
@@ -58,6 +60,7 @@ public class Deduplicator
         LOG.error("Could not close KafkaStreams!", ex);
       }
     });
+    streams.setStateListener(healthIndicator);
   }
 
   static Topology buildTopology()
@@ -76,10 +79,10 @@ public class Deduplicator
     builder
         .<String, String>stream("input")
         .flatTransformValues(
-            new ValueTransformerWithKeySupplier<String, String, Iterable<String>>()
+            new ValueTransformerSupplier<String, Iterable<String>>()
             {
               @Override
-              public ValueTransformerWithKey<String, String, Iterable<String>> get()
+              public ValueTransformer<String, Iterable<String>> get()
               {
                 return new DeduplicationTransformer();
               }