--- /dev/null
+*
+!target/*.jar
--- /dev/null
+.idea
+target
+data.txt
+expected_*
+result_*
--- /dev/null
+FROM openjdk:11-jre-slim
+# Create an unprivileged user: the app needs no root rights at runtime
+# (Kafka Streams state goes to /tmp/kafka-streams, writable by any user).
+RUN useradd --system --create-home app
+USER app
+COPY target/streams-deduplicator-1.0-SNAPSHOT.jar /opt/app.jar
+# Exec form: java runs as PID 1 and receives SIGTERM on `docker stop`
+CMD ["java", "-jar", "/opt/app.jar"]
--- /dev/null
+#!/bin/bash
+
+if [ "$1" = "cleanup" ]
+then
+ docker-compose down
+ mvn clean
+ exit
+fi
+
+mvn package
+
+docker build -t juplo/deduplicator:streams .
+
+docker-compose up -d zookeeper kafka
+
+while ! [[ $(zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]]; do echo "Waiting for kafka..."; sleep 1; done
+
+kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --topic input
+kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --topic output
+
+docker-compose up -d deduplicator
+cat data.txt | kafkacat -K: -b localhost:9092 -t input
+sleep 5
+
+kafkacat -C -b localhost:9092 -t input -e | wc -l
+kafkacat -C -b localhost:9092 -t output -e | wc -l
+
+kafkacat -C -b localhost:9092 -t output -e -f'%k:%s\n' | grep ^0 > result_0.txt
+kafkacat -C -b localhost:9092 -t output -e -f'%k:%s\n' | grep ^1 > result_1.txt
+kafkacat -C -b localhost:9092 -t output -e -f'%k:%s\n' | grep ^2 > result_2.txt
+kafkacat -C -b localhost:9092 -t output -e -f'%k:%s\n' | grep ^3 > result_3.txt
+kafkacat -C -b localhost:9092 -t output -e -f'%k:%s\n' | grep ^4 > result_4.txt
+kafkacat -C -b localhost:9092 -t output -e -f'%k:%s\n' | grep ^5 > result_5.txt
+kafkacat -C -b localhost:9092 -t output -e -f'%k:%s\n' | grep ^6 > result_6.txt
--- /dev/null
+#!/bin/bash
+
+# Generates test data: lines of the form "<i mod 7>:<i>".
+# The ranges deliberately overlap, so data.txt contains duplicates that the
+# deduplicator must remove; expected_<k>.txt holds the duplicate-free
+# sequence 1..3500 for each key k = 0..6.
+
+for i in $(seq 1 333); do echo $(($i%7)):$i; done > data.txt
+for i in $(seq 70 578); do echo $(($i%7)):$i; done >> data.txt
+for i in $(seq 400 1211); do echo $(($i%7)):$i; done >> data.txt
+for i in $(seq 1000 1111); do echo $(($i%7)):$i; done >> data.txt
+for i in $(seq 1200 1711); do echo $(($i%7)):$i; done >> data.txt
+for i in $(seq 1688 3333); do echo $(($i%7)):$i; done >> data.txt
+for i in $(seq 2567 3500); do echo $(($i%7)):$i; done >> data.txt
+
+for k in 0 1 2 3 4 5 6
+do
+  for i in $(seq 1 3500); do echo $(($i%7)):$i; done | grep ^$k > expected_$k.txt
+done
--- /dev/null
+version: "3.2"
+
+services:
+  zookeeper:
+    image: confluentinc/cp-zookeeper:5.3.0
+    ports:
+      - 2181:2181
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+
+  kafka:
+    image: confluentinc/cp-kafka:5.3.0
+    ports:
+      - 9092:9092
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      # Broker advertises itself as "kafka:9092".
+      # NOTE(review): clients on the host reach localhost:9092 but get
+      # "kafka:9092" back in metadata — presumably resolved via /etc/hosts
+      # or by running the tools on the "trion" network; verify.
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
+      # Single-broker setup: the internal offsets topic cannot replicate.
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      # Topics are created explicitly (see the driver script).
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+    depends_on:
+      - zookeeper
+
+  deduplicator:
+    # Built by the driver script from the Dockerfile in this directory.
+    image: juplo/deduplicator:streams
+    depends_on:
+      - zookeeper
+      - kafka
+
+# All services join the pre-existing external network "trion";
+# it must be created before `docker-compose up`.
+networks:
+  default:
+    external:
+      name: trion
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <!-- Inherit dependency- and plugin-management from Spring Boot -->
+  <parent>
+    <groupId>org.springframework.boot</groupId>
+    <artifactId>spring-boot-starter-parent</artifactId>
+    <version>2.3.4.RELEASE</version>
+  </parent>
+
+  <groupId>de.juplo.demo.kafka</groupId>
+  <artifactId>streams-deduplicator</artifactId>
+  <version>1.0-SNAPSHOT</version>
+  <name>Streams-Deduplicator</name>
+  <description>Deduplicator based on Kafka-Streams</description>
+
+  <!-- Versions are managed by the Spring Boot parent -->
+  <dependencies>
+
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-streams</artifactId>
+    </dependency>
+    <!-- Lombok: generates the @Slf4j logger in DeduplicationTransformer -->
+    <dependency>
+      <groupId>org.projectlombok</groupId>
+      <artifactId>lombok</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <plugins>
+      <!-- Repackages the jar as the executable fat-jar copied by the Dockerfile -->
+      <plugin>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
--- /dev/null
+package de.juplo.demo.kafka.deduplication;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+
+/**
+ * Drops records whose sequence number is not greater than the highest
+ * sequence number already seen on the same partition.
+ *
+ * The highest seen sequence number is tracked per input-partition in the
+ * state store {@link #STORE}, so deduplication is per partition, not per key.
+ *
+ * @param <K> type of the record keys
+ * @param <V> type of the record values
+ */
+@Slf4j
+public class DeduplicationTransformer<K, V> implements ValueTransformerWithKey<K, V, Iterable<V>>
+{
+  // Strategy that derives the sequence number from a record.
+  final SequenceNumberExtractor<K, V> extractor;
+
+  // Name of the state store; must be registered under this name
+  // with the StreamsBuilder (see Deduplicator).
+  public final static String STORE = DeduplicationTransformer.class.getCanonicalName() + "_STORE";
+  private ProcessorContext context;
+  // Maps input-partition -> highest sequence number seen so far.
+  private KeyValueStore<Integer, Long> store;
+
+
+  public DeduplicationTransformer(SequenceNumberExtractor<K, V> extractor)
+  {
+    this.extractor = extractor;
+  }
+
+
+  @Override
+  public void init(ProcessorContext context)
+  {
+    this.context = context;
+    // Unchecked: the store is registered with these types in Deduplicator.
+    store = (KeyValueStore<Integer, Long>) context.getStateStore(STORE);
+  }
+
+  /**
+   * Passes the value through if its sequence number is new, drops it otherwise.
+   *
+   * @return a one-element Iterable to forward the value, an empty one to drop it
+   */
+  @Override
+  public Iterable<V> transform(K key, V value)
+  {
+    String topic = context.topic();
+    Integer partition = context.partition();
+    long offset = context.offset();
+    Headers headers = context.headers();
+
+    long sequenceNumber = extractor.extract(topic, partition, offset, headers, key, value);
+
+    Long seen = store.get(partition);
+    if (seen == null || seen < sequenceNumber)
+    {
+      // First time (or newest) for this partition: remember and forward.
+      store.put(partition, sequenceNumber);
+      return Arrays.asList(value);
+    }
+
+    log.info(
+        "ignoring message for key {} with sequence-number {} <= {} found at offset {} of partition {}",
+        key,
+        sequenceNumber,
+        seen,
+        offset,
+        partition);
+
+    // Duplicate: an empty Iterable makes flatTransformValues emit
+    // nothing downstream for this record.
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void close() {}
+}
--- /dev/null
+package de.juplo.demo.kafka.deduplication;
+
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+
+
+/**
+ * Supplies a fresh {@link DeduplicationTransformer} for every stream-task.
+ *
+ * @param <K> type of the record keys
+ * @param <V> type of the record values
+ */
+public class DeduplicationTransformerSupplier<K, V> implements ValueTransformerWithKeySupplier<K, V, Iterable<V>>
+{
+  // final, consistent with DeduplicationTransformer.extractor
+  final SequenceNumberExtractor<K, V> extractor;
+
+
+  public DeduplicationTransformerSupplier(SequenceNumberExtractor<K, V> extractor)
+  {
+    this.extractor = extractor;
+  }
+
+
+  /**
+   * @return a new transformer instance — transformers hold per-task state
+   *         (ProcessorContext, state store) and must not be shared
+   */
+  @Override
+  public ValueTransformerWithKey<K, V, Iterable<V>> get()
+  {
+    return new DeduplicationTransformer<K, V>(extractor);
+  }
+}
--- /dev/null
+package de.juplo.demo.kafka.deduplication;
+
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.time.Duration;
+import java.util.Properties;
+
+
+/**
+ * Builds and runs the deduplication topology: reads topic "input",
+ * drops duplicates via {@link DeduplicationTransformer}, writes to "output".
+ *
+ * Lifecycle is tied to the Spring context via @PostConstruct/@PreDestroy.
+ */
+@Component
+public class Deduplicator
+{
+  final static Logger LOG = LoggerFactory.getLogger(Deduplicator.class);
+
+  public final KafkaStreams streams;
+
+
+  public Deduplicator()
+  {
+    Properties properties = new Properties();
+    // NOTE(review): broker address and application-id are hard-coded;
+    // assumes the container runs on a network where "kafka" resolves
+    // (see docker-compose) — consider externalizing via configuration.
+    properties.put("bootstrap.servers", "kafka:9092");
+    properties.put("application.id", "streams-deduplicator");
+    properties.put("default.key.serde", Serdes.StringSerde.class);
+    properties.put("default.value.serde", Serdes.StringSerde.class);
+
+    // The message value itself is the sequence number: it must be a
+    // decimal long, otherwise Long.parseLong throws and the stream dies.
+    SequenceNumberExtractor<String, String> extractor =
+        new SequenceNumberExtractor<String, String>() {
+          @Override
+          public long extract(
+              String topic, int partition, long offset, Headers headers, String key, String value)
+          {
+            return Long.parseLong(value);
+          }
+        };
+
+    StreamsBuilder builder = new StreamsBuilder();
+
+    // Create state-store for sequence numbers
+    // (partition -> highest seen sequence number, persisted in RocksDB)
+    StoreBuilder<KeyValueStore<Integer,Long>> store =
+        Stores.keyValueStoreBuilder(
+            Stores.persistentKeyValueStore(DeduplicationTransformer.STORE),
+            Serdes.Integer(),
+            Serdes.Long());
+    // register store
+    builder.addStateStore(store);
+
+    // flatTransformValues: the transformer returns 0 or 1 values per input,
+    // so duplicates simply vanish from the stream.
+    builder
+        .<String, String>stream("input")
+        .flatTransformValues(
+            new DeduplicationTransformerSupplier<String, String>(extractor),
+            DeduplicationTransformer.STORE)
+        .to("output");
+
+    Topology topology = builder.build();
+    streams = new KafkaStreams(topology, properties);
+    // On any uncaught error: log and shut the streams instance down.
+    streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
+    {
+      LOG.error("Unexpected error in thread {}: {}", t, e.toString());
+      try
+      {
+        streams.close(Duration.ofSeconds(5));
+      }
+      catch (Exception ex)
+      {
+        LOG.error("Could not close KafkaStreams!", ex);
+      }
+    });
+  }
+
+
+  @PostConstruct
+  public void start()
+  {
+    streams.start();
+  }
+
+  @PreDestroy
+  public void stop()
+  {
+    streams.close(Duration.ofSeconds(5));
+  }
+}
--- /dev/null
+package de.juplo.demo.kafka.deduplication;
+
+import org.apache.kafka.common.header.Headers;
+
+
+/**
+ * Strategy interface: derives the sequence number used for deduplication
+ * from a consumed record and its metadata.
+ */
+public interface SequenceNumberExtractor<K,V>
+{
+  /**
+   * Extracts a sequence number from the given value.
+   *
+   * The sequence number must be represented as a {@link Long} value.
+   *
+   * @param topic The topic, the message was issued on
+   * @param partition The partition, the message was written to
+   * @param offset The offset of the message in the partition
+   * @param headers The headers of the message
+   * @param key The key of the message
+   * @param value The value of the message
+   * @return a unique ID
+   */
+  public long extract(String topic, int partition, long offset, Headers headers, K key, V value);
+}
--- /dev/null
+package de.juplo.demo.kafka.deduplication;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+
+/**
+ * Entry point: boots the Spring context, which in turn instantiates
+ * and starts the {@link Deduplicator}.
+ */
+@SpringBootApplication
+public class StreamsDeduplicatorApplication
+{
+  public static void main(String[] args)
+  {
+    SpringApplication.run(StreamsDeduplicatorApplication.class, args);
+  }
+}