A deduplicator, that assumes steadily increasing global sequnece numbers
authorKai Moritz <kai@juplo.de>
Thu, 8 Oct 2020 16:44:06 +0000 (18:44 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 5 May 2024 05:47:30 +0000 (07:47 +0200)
12 files changed:
.dockerignore [new file with mode: 0644]
.gitignore [new file with mode: 0644]
Dockerfile [new file with mode: 0644]
README.sh [new file with mode: 0755]
create-data.sh [new file with mode: 0755]
docker-compose.yml [new file with mode: 0644]
pom.xml [new file with mode: 0644]
src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java [new file with mode: 0644]
src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerSupplier.java [new file with mode: 0644]
src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java [new file with mode: 0644]
src/main/java/de/juplo/demo/kafka/deduplication/SequenceNumberExtractor.java [new file with mode: 0644]
src/main/java/de/juplo/demo/kafka/deduplication/StreamsDeduplicatorApplication.java [new file with mode: 0644]

diff --git a/.dockerignore b/.dockerignore
new file mode 100644 (file)
index 0000000..1ad9963
--- /dev/null
@@ -0,0 +1,2 @@
+*
+!target/*.jar
diff --git a/.gitignore b/.gitignore
new file mode 100644 (file)
index 0000000..6bcc896
--- /dev/null
@@ -0,0 +1,5 @@
+.idea
+target
+data.txt
+expected_*
+result_*
diff --git a/Dockerfile b/Dockerfile
new file mode 100644 (file)
index 0000000..bd8b1e3
--- /dev/null
@@ -0,0 +1,3 @@
+FROM openjdk:11-jre-slim
+COPY target/streams-deduplicator-1.0-SNAPSHOT.jar /opt/app.jar
+CMD ["java", "-jar", "/opt/app.jar"]
diff --git a/README.sh b/README.sh
new file mode 100755 (executable)
index 0000000..6646d24
--- /dev/null
+++ b/README.sh
@@ -0,0 +1,34 @@
+#!/bin/bash
+
+if [ "$1" = "cleanup" ]
+then
+  docker-compose down
+  mvn clean
+  exit
+fi
+
+mvn package
+
+docker build -t juplo/deduplicator:streams .
+
+docker-compose up -d zookeeper kafka
+
+while ! [[ $(zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]]; do echo "Waiting for kafka..."; sleep 1; done
+
+kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --topic input
+kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --topic output
+
+docker-compose up -d deduplicator
+cat data.txt | kafkacat -K: -b localhost:9092 -t input
+sleep 5
+
+kafkacat -C -b localhost:9092 -t input -e | wc -l
+kafkacat -C -b localhost:9092 -t output -e | wc -l
+
+kafkacat -C -b localhost:9092 -t output -e  -f'%k:%s\n' | grep ^0 > result_0.txt
+kafkacat -C -b localhost:9092 -t output -e  -f'%k:%s\n' | grep ^1 > result_1.txt
+kafkacat -C -b localhost:9092 -t output -e  -f'%k:%s\n' | grep ^2 > result_2.txt
+kafkacat -C -b localhost:9092 -t output -e  -f'%k:%s\n' | grep ^3 > result_3.txt
+kafkacat -C -b localhost:9092 -t output -e  -f'%k:%s\n' | grep ^4 > result_4.txt
+kafkacat -C -b localhost:9092 -t output -e  -f'%k:%s\n' | grep ^5 > result_5.txt
+kafkacat -C -b localhost:9092 -t output -e  -f'%k:%s\n' | grep ^6 > result_6.txt
diff --git a/create-data.sh b/create-data.sh
new file mode 100755 (executable)
index 0000000..d64ca41
--- /dev/null
@@ -0,0 +1,17 @@
+#!/bin/bash
+
+for i in `seq 1 333`;     do echo $(($i%7)):$i; done >  data.txt
+for i in `seq 70 578`;    do echo $(($i%7)):$i; done >> data.txt
+for i in `seq 400 1211`;  do echo $(($i%7)):$i; done >> data.txt
+for i in `seq 1000 1111`; do echo $(($i%7)):$i; done >> data.txt
+for i in `seq 1200 1711`; do echo $(($i%7)):$i; done >> data.txt
+for i in `seq 1688 3333`; do echo $(($i%7)):$i; done >> data.txt
+for i in `seq 2567 3500`; do echo $(($i%7)):$i; done >> data.txt
+
+for i in `seq 1 3500`; do echo $(($i%7)):$i; done | grep ^0 > expected_0.txt
+for i in `seq 1 3500`; do echo $(($i%7)):$i; done | grep ^1 > expected_1.txt
+for i in `seq 1 3500`; do echo $(($i%7)):$i; done | grep ^2 > expected_2.txt
+for i in `seq 1 3500`; do echo $(($i%7)):$i; done | grep ^3 > expected_3.txt
+for i in `seq 1 3500`; do echo $(($i%7)):$i; done | grep ^4 > expected_4.txt
+for i in `seq 1 3500`; do echo $(($i%7)):$i; done | grep ^5 > expected_5.txt
+for i in `seq 1 3500`; do echo $(($i%7)):$i; done | grep ^6 > expected_6.txt
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644 (file)
index 0000000..17aff95
--- /dev/null
@@ -0,0 +1,32 @@
+version: "3.2"
+
+services:
+  zookeeper:
+    image: confluentinc/cp-zookeeper:5.3.0
+    ports:
+      - 2181:2181
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+
+  kafka:
+    image: confluentinc/cp-kafka:5.3.0
+    ports:
+      - 9092:9092
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
+    depends_on:
+      - zookeeper
+
+  deduplicator:
+    image: juplo/deduplicator:streams
+    depends_on:
+      - zookeeper
+      - kafka
+
+networks:
+  default:
+    external:
+      name: trion
diff --git a/pom.xml b/pom.xml
new file mode 100644 (file)
index 0000000..97c8f2d
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,52 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.springframework.boot</groupId>
+    <artifactId>spring-boot-starter-parent</artifactId>
+    <version>2.3.4.RELEASE</version>
+  </parent>
+
+  <groupId>de.juplo.demo.kafka</groupId>
+  <artifactId>streams-deduplicator</artifactId>
+  <version>1.0-SNAPSHOT</version>
+  <name>Streams-Deduplicator</name>
+  <description>Deduplicator based on Kafka-Streams</description>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-streams</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.projectlombok</groupId>
+      <artifactId>lombok</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java b/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java
new file mode 100644 (file)
index 0000000..e07fb37
--- /dev/null
@@ -0,0 +1,68 @@
+package de.juplo.demo.kafka.deduplication;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+
+@Slf4j
+public class DeduplicationTransformer<K, V> implements ValueTransformerWithKey<K, V, Iterable<V>>
+{
+  final SequenceNumberExtractor<K, V> extractor;
+
+  public final static String STORE = DeduplicationTransformer.class.getCanonicalName() + "_STORE";
+  private ProcessorContext context;
+  private KeyValueStore<Integer, Long> store;
+
+
+  public DeduplicationTransformer(SequenceNumberExtractor<K, V> extractor)
+  {
+    this.extractor = extractor;
+  }
+
+
+  @Override
+  public void init(ProcessorContext context)
+  {
+    this.context = context;
+    store = (KeyValueStore<Integer, Long>) context.getStateStore(STORE);
+  }
+
+  @Override
+  public Iterable<V> transform(K key, V value)
+  {
+    String topic = context.topic();
+    Integer partition = context.partition();
+    long offset = context.offset();
+    Headers headers = context.headers();
+
+    long sequenceNumber = extractor.extract(topic, partition, offset, headers, key, value);
+
+    Long seen = store.get(partition);
+    if (seen == null || seen < sequenceNumber)
+    {
+      store.put(partition, sequenceNumber);
+      return Arrays.asList(value);
+    }
+
+    log.info(
+        "ignoring message for key {} with sequence-number {} <= {} found at offset {} of partition {}",
+        key,
+        sequenceNumber,
+        seen,
+        offset,
+        partition);
+
+    // Signal, that the message has already been seen.
+    // Downstream has to filter the null-values...
+    return Collections.emptyList();
+  }
+
+  @Override
+  public void close() {}
+}
diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerSupplier.java b/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerSupplier.java
new file mode 100644 (file)
index 0000000..37210e8
--- /dev/null
@@ -0,0 +1,23 @@
+package de.juplo.demo.kafka.deduplication;
+
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+
+
+public class DeduplicationTransformerSupplier<K, V> implements ValueTransformerWithKeySupplier<K, V, Iterable<V>>
+{
+  SequenceNumberExtractor<K, V> extractor;
+
+
+  public DeduplicationTransformerSupplier(SequenceNumberExtractor<K, V> extractor)
+  {
+    this.extractor = extractor;
+  }
+
+
+  @Override
+  public ValueTransformerWithKey<K, V, Iterable<V>> get()
+  {
+    return new DeduplicationTransformer<K, V>(extractor);
+  }
+}
diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java
new file mode 100644 (file)
index 0000000..3585604
--- /dev/null
@@ -0,0 +1,94 @@
+package de.juplo.demo.kafka.deduplication;
+
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.time.Duration;
+import java.util.Properties;
+
+
+@Component
+public class Deduplicator
+{
+  final static Logger LOG = LoggerFactory.getLogger(Deduplicator.class);
+
+  public final KafkaStreams streams;
+
+
+  public Deduplicator()
+  {
+    Properties properties = new Properties();
+    properties.put("bootstrap.servers", "kafka:9092");
+    properties.put("application.id", "streams-deduplicator");
+    properties.put("default.key.serde", Serdes.StringSerde.class);
+    properties.put("default.value.serde", Serdes.StringSerde.class);
+
+    SequenceNumberExtractor<String, String> extractor =
+        new SequenceNumberExtractor<String, String>() {
+          @Override
+          public long extract(
+              String topic, int partition, long offset, Headers headers, String key, String value)
+          {
+            return Long.parseLong(value);
+          }
+        };
+
+    StreamsBuilder builder = new StreamsBuilder();
+
+    // Create state-store for sequence numbers
+    StoreBuilder<KeyValueStore<Integer,Long>> store =
+        Stores.keyValueStoreBuilder(
+            Stores.persistentKeyValueStore(DeduplicationTransformer.STORE),
+            Serdes.Integer(),
+            Serdes.Long());
+    // register store
+    builder.addStateStore(store);
+
+    builder
+        .<String, String>stream("input")
+        .flatTransformValues(
+            new DeduplicationTransformerSupplier<String, String>(extractor),
+            DeduplicationTransformer.STORE)
+        .to("output");
+
+    Topology topology = builder.build();
+    streams = new KafkaStreams(topology, properties);
+    streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
+    {
+      LOG.error("Unexpected error in thread {}: {}", t, e.toString());
+      try
+      {
+        streams.close(Duration.ofSeconds(5));
+      }
+      catch (Exception ex)
+      {
+        LOG.error("Could not close KafkaStreams!", ex);
+      }
+    });
+  }
+
+
+  @PostConstruct
+  public void start()
+  {
+    streams.start();
+  }
+
+  @PreDestroy
+  public void stop()
+  {
+    streams.close(Duration.ofSeconds(5));
+  }
+}
diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/SequenceNumberExtractor.java b/src/main/java/de/juplo/demo/kafka/deduplication/SequenceNumberExtractor.java
new file mode 100644 (file)
index 0000000..8f74c71
--- /dev/null
@@ -0,0 +1,21 @@
+package de.juplo.demo.kafka.deduplication;
+
+import org.apache.kafka.common.header.Headers;
+
+
+public interface SequenceNumberExtractor<K,V>
+{
+  /**
+   * Extracts a sequence number from the given value.
+   *
+   * The sequence number must be represented as a {@link Long} value.
+   *
+   * @param topic The topic, the message was issued on
+   * @param partition The partition, the message was written to
+   * @param offset The offset of the message in the partition
+   * @param key The key of the message
+   * @param value The value of the message
+   * @return a unique ID
+   */
+  public long extract(String topic, int partition, long offset, Headers headers, K key, V value);
+}
diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/StreamsDeduplicatorApplication.java b/src/main/java/de/juplo/demo/kafka/deduplication/StreamsDeduplicatorApplication.java
new file mode 100644 (file)
index 0000000..5331209
--- /dev/null
@@ -0,0 +1,14 @@
+  package de.juplo.demo.kafka.deduplication;
+
+  import org.springframework.boot.SpringApplication;
+  import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+
+  @SpringBootApplication
+  public class StreamsDeduplicatorApplication
+  {
+    public static void main(String[] args)
+    {
+      SpringApplication.run(StreamsDeduplicatorApplication.class, args);
+    }
+  }