-FROM openjdk:11-jre-slim
-COPY target/streams-deduplicator-1.0-SNAPSHOT.jar /opt/app.jar
+FROM eclipse-temurin:17-jre-alpine
+COPY target/streams-deduplicator-*.jar /opt/app.jar
EXPOSE 8080
CMD ["java", "-jar", "/opt/app.jar"]
<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.3.4.RELEASE</version>
- </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-parent</artifactId>
+ <version>3.2.5</version>
+ </parent>
+
<groupId>de.juplo.demo.kafka</groupId>
<artifactId>streams-deduplicator</artifactId>
- <version>1.0-SNAPSHOT</version>
+ <version>1.0.2-SNAPSHOT</version>
<name>Streams-Deduplicator</name>
<description>Deduplicator based on Kafka-Streams</description>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-failsafe-plugin</artifactId>
- </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ </plugin>
</plugins>
</build>
package de.juplo.demo.kafka.deduplication;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
@Override
public Iterable<String> transform(String value)
{
- String topic = context.topic();
Integer partition = context.partition();
- long offset = context.offset();
- Headers headers = context.headers();
-
long sequenceNumber = Long.parseLong(value);
Long seen = store.get(partition);
package de.juplo.demo.kafka.deduplication;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.PreDestroy;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
import java.time.Duration;
import java.util.Properties;
builder
.<String, String>stream("input")
.flatTransformValues(
- new ValueTransformerSupplier<String, Iterable<String>>()
- {
- @Override
- public ValueTransformer<String, Iterable<String>> get()
- {
- return new DeduplicationTransformer();
- }
- },
+ () -> new DeduplicationTransformer(),
DeduplicationTransformer.STORE)
.to("output");