Prepared for development ov version `1.0.2` master
authorKai Moritz <kai@juplo.de>
Sun, 5 May 2024 05:55:47 +0000 (07:55 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 5 May 2024 05:55:47 +0000 (07:55 +0200)
Dockerfile
pom.xml
src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java
src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java

index 5a7b5f9..96e2f0c 100644 (file)
@@ -1,4 +1,4 @@
-FROM openjdk:11-jre-slim
-COPY target/streams-deduplicator-1.0-SNAPSHOT.jar /opt/app.jar
+FROM eclipse-temurin:17-jre-alpine
+COPY target/streams-deduplicator-*.jar /opt/app.jar
 EXPOSE 8080
 CMD ["java", "-jar", "/opt/app.jar"]
 EXPOSE 8080
 CMD ["java", "-jar", "/opt/app.jar"]
diff --git a/pom.xml b/pom.xml
index 3c75755..933bbb8 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -1,15 +1,19 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-       <modelVersion>4.0.0</modelVersion>
-       <parent>
-               <groupId>org.springframework.boot</groupId>
-               <artifactId>spring-boot-starter-parent</artifactId>
-               <version>2.3.4.RELEASE</version>
-       </parent>
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.springframework.boot</groupId>
+    <artifactId>spring-boot-starter-parent</artifactId>
+    <version>3.2.5</version>
+  </parent>
+
   <groupId>de.juplo.demo.kafka</groupId>
   <artifactId>streams-deduplicator</artifactId>
   <groupId>de.juplo.demo.kafka</groupId>
   <artifactId>streams-deduplicator</artifactId>
-  <version>1.0-SNAPSHOT</version>
+  <version>1.0.2-SNAPSHOT</version>
   <name>Streams-Deduplicator</name>
   <description>Deduplicator based on Kafka-Streams</description>
 
   <name>Streams-Deduplicator</name>
   <description>Deduplicator based on Kafka-Streams</description>
 
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-maven-plugin</artifactId>
       </plugin>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-maven-plugin</artifactId>
       </plugin>
-                       <plugin>
-                               <groupId>org.apache.maven.plugins</groupId>
-                               <artifactId>maven-failsafe-plugin</artifactId>
-                       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-failsafe-plugin</artifactId>
+      </plugin>
     </plugins>
   </build>
 
     </plugins>
   </build>
 
index dc888bc..2494672 100644 (file)
@@ -1,7 +1,6 @@
 package de.juplo.demo.kafka.deduplication;
 
 import lombok.extern.slf4j.Slf4j;
 package de.juplo.demo.kafka.deduplication;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.streams.kstream.ValueTransformer;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.kstream.ValueTransformer;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -28,11 +27,7 @@ public class DeduplicationTransformer implements ValueTransformer<String, Iterab
   @Override
   public Iterable<String> transform(String value)
   {
   @Override
   public Iterable<String> transform(String value)
   {
-    String topic = context.topic();
     Integer partition = context.partition();
     Integer partition = context.partition();
-    long offset = context.offset();
-    Headers headers = context.headers();
-
     long sequenceNumber = Long.parseLong(value);
 
     Long seen = store.get(partition);
     long sequenceNumber = Long.parseLong(value);
 
     Long seen = store.get(partition);
index 38ec22a..4376f30 100644 (file)
@@ -1,6 +1,8 @@
 package de.juplo.demo.kafka.deduplication;
 
 
 package de.juplo.demo.kafka.deduplication;
 
 
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.*;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.*;
@@ -18,8 +20,6 @@ import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.RestController;
 
 import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.RestController;
 
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
 import java.time.Duration;
 import java.util.Properties;
 
 import java.time.Duration;
 import java.util.Properties;
 
@@ -79,14 +79,7 @@ public class Deduplicator
     builder
         .<String, String>stream("input")
         .flatTransformValues(
     builder
         .<String, String>stream("input")
         .flatTransformValues(
-            new ValueTransformerSupplier<String, Iterable<String>>()
-            {
-              @Override
-              public ValueTransformer<String, Iterable<String>> get()
-              {
-                return new DeduplicationTransformer();
-              }
-            },
+            () -> new DeduplicationTransformer(),
             DeduplicationTransformer.STORE)
         .to("output");
 
             DeduplicationTransformer.STORE)
         .to("output");