From: Kai Moritz Date: Thu, 8 Oct 2020 16:44:06 +0000 (+0200) Subject: A deduplicator, that assumes steadily increasing global sequnece numbers X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdeduplication;a=commitdiff_plain;h=5bb3d2908ccfe091084aa8a0a8b1282fb19fb3e1 A deduplicator, that assumes steadily increasing global sequnece numbers --- 5bb3d2908ccfe091084aa8a0a8b1282fb19fb3e1 diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..1ad9963 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,2 @@ +* +!target/*.jar diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6bcc896 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +.idea +target +data.txt +expected_* +result_* diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..bd8b1e3 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,3 @@ +FROM openjdk:11-jre-slim +COPY target/streams-deduplicator-1.0-SNAPSHOT.jar /opt/app.jar +CMD ["java", "-jar", "/opt/app.jar"] diff --git a/README.sh b/README.sh new file mode 100755 index 0000000..6646d24 --- /dev/null +++ b/README.sh @@ -0,0 +1,34 @@ +#!/bin/bash + +if [ "$1" = "cleanup" ] +then + docker-compose down + mvn clean + exit +fi + +mvn package + +docker build -t juplo/deduplicator:streams . + +docker-compose up -d zookeeper kafka + +while ! [[ $(zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]]; do echo "Waiting for kafka..."; sleep 1; done + +kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --topic input +kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --topic output + +docker-compose up -d deduplicator +cat data.txt | kafkacat -K: -b localhost:9092 -t input +sleep 5 + +kafkacat -C -b localhost:9092 -t input -e | wc -l +kafkacat -C -b localhost:9092 -t output -e | wc -l + +kafkacat -C -b localhost:9092 -t output -e -f'%k:%s\n' | grep ^0 > result_0.txt +kafkacat -C -b localhost:9092 -t output -e -f'%k:%s\n' | grep ^1 > result_1.txt +kafkacat -C -b localhost:9092 -t output -e -f'%k:%s\n' | grep ^2 > result_2.txt +kafkacat -C -b localhost:9092 -t output -e -f'%k:%s\n' | grep ^3 > result_3.txt +kafkacat -C -b localhost:9092 -t output -e -f'%k:%s\n' | grep ^4 > result_4.txt +kafkacat -C -b localhost:9092 -t output -e -f'%k:%s\n' | grep ^5 > result_5.txt +kafkacat -C -b localhost:9092 -t output -e -f'%k:%s\n' | grep ^6 > result_6.txt diff --git a/create-data.sh b/create-data.sh new file mode 100755 index 0000000..d64ca41 --- /dev/null +++ b/create-data.sh @@ -0,0 +1,17 @@ +#!/bin/bash + +for i in `seq 1 333`; do echo $(($i%7)):$i; done > data.txt +for i in `seq 70 578`; do echo $(($i%7)):$i; done >> data.txt +for i in `seq 400 1211`; do echo $(($i%7)):$i; done >> data.txt +for i in `seq 1000 1111`; do echo $(($i%7)):$i; done >> data.txt +for i in `seq 1200 1711`; do echo $(($i%7)):$i; done >> data.txt +for i in `seq 1688 3333`; do echo $(($i%7)):$i; done >> data.txt +for i in `seq 2567 3500`; do echo $(($i%7)):$i; done >> data.txt + +for i in `seq 1 3500`; do echo $(($i%7)):$i; done | grep ^0 > expected_0.txt +for i in `seq 1 3500`; do echo $(($i%7)):$i; done | grep ^1 > expected_1.txt +for i in `seq 1 3500`; do echo $(($i%7)):$i; done | grep ^2 > expected_2.txt +for i in `seq 1 3500`; do echo $(($i%7)):$i; done | grep ^3 > expected_3.txt +for i in `seq 1 3500`; do echo $(($i%7)):$i; done | grep ^4 > expected_4.txt +for i in `seq 1 3500`; do echo $(($i%7)):$i; done | grep ^5 > expected_5.txt +for i in `seq 1 3500`; do echo $(($i%7)):$i; done | grep ^6 > expected_6.txt diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..17aff95 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,32 @@ +version: "3.2" + +services: + zookeeper: + image: confluentinc/cp-zookeeper:5.3.0 + ports: + - 2181:2181 + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + + kafka: + image: confluentinc/cp-kafka:5.3.0 + ports: + - 9092:9092 + environment: + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" + depends_on: + - zookeeper + + deduplicator: + image: juplo/deduplicator:streams + depends_on: + - zookeeper + - kafka + +networks: + default: + external: + name: trion diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..6be4c4b --- /dev/null +++ b/pom.xml @@ -0,0 +1,48 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 2.3.4.RELEASE + + de.juplo.demo.kafka + streams-deduplicator + 1.0-SNAPSHOT + Streams-Deduplicator + Deduplicator based on Kafka-Streams + + + + + org.springframework.boot + spring-boot-starter + + + org.apache.kafka + kafka-streams + + + org.projectlombok + lombok + + + + org.springframework.boot + spring-boot-starter-test + test + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java b/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java new file mode 100644 index 0000000..e07fb37 --- /dev/null +++ b/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java @@ -0,0 +1,68 @@ +package de.juplo.demo.kafka.deduplication; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.streams.kstream.ValueTransformerWithKey; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.state.KeyValueStore; + +import java.util.Arrays; +import java.util.Collections; + + +@Slf4j +public class DeduplicationTransformer implements ValueTransformerWithKey> +{ + final SequenceNumberExtractor extractor; + + public final static String STORE = DeduplicationTransformer.class.getCanonicalName() + "_STORE"; + private ProcessorContext context; + private KeyValueStore store; + + + public DeduplicationTransformer(SequenceNumberExtractor extractor) + { + this.extractor = extractor; + } + + + @Override + public void init(ProcessorContext context) + { + this.context = context; + store = (KeyValueStore) context.getStateStore(STORE); + } + + @Override + public Iterable transform(K key, V value) + { + String topic = context.topic(); + Integer partition = context.partition(); + long offset = context.offset(); + Headers headers = context.headers(); + + long sequenceNumber = extractor.extract(topic, partition, offset, headers, key, value); + + Long seen = store.get(partition); + if (seen == null || seen < sequenceNumber) + { + store.put(partition, sequenceNumber); + return Arrays.asList(value); + } + + log.info( + "ignoring message for key {} with sequence-number {} <= {} found at offset {} of partition {}", + key, + sequenceNumber, + seen, + offset, + partition); + + // Signal, that the message has already been seen. + // Downstream has to filter the null-values... + return Collections.emptyList(); + } + + @Override + public void close() {} +} diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerSupplier.java b/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerSupplier.java new file mode 100644 index 0000000..37210e8 --- /dev/null +++ b/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerSupplier.java @@ -0,0 +1,23 @@ +package de.juplo.demo.kafka.deduplication; + +import org.apache.kafka.streams.kstream.ValueTransformerWithKey; +import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; + + +public class DeduplicationTransformerSupplier implements ValueTransformerWithKeySupplier> +{ + SequenceNumberExtractor extractor; + + + public DeduplicationTransformerSupplier(SequenceNumberExtractor extractor) + { + this.extractor = extractor; + } + + + @Override + public ValueTransformerWithKey> get() + { + return new DeduplicationTransformer(extractor); + } +} diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java new file mode 100644 index 0000000..3585604 --- /dev/null +++ b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java @@ -0,0 +1,94 @@ +package de.juplo.demo.kafka.deduplication; + + +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import java.time.Duration; +import java.util.Properties; + + +@Component +public class Deduplicator +{ + final static Logger LOG = LoggerFactory.getLogger(Deduplicator.class); + + public final KafkaStreams streams; + + + public Deduplicator() + { + Properties properties = new Properties(); + properties.put("bootstrap.servers", "kafka:9092"); + properties.put("application.id", "streams-deduplicator"); + properties.put("default.key.serde", Serdes.StringSerde.class); + properties.put("default.value.serde", Serdes.StringSerde.class); + + SequenceNumberExtractor extractor = + new SequenceNumberExtractor() { + @Override + public long extract( + String topic, int partition, long offset, Headers headers, String key, String value) + { + return Long.parseLong(value); + } + }; + + StreamsBuilder builder = new StreamsBuilder(); + + // Create state-store for sequence numbers + StoreBuilder> store = + Stores.keyValueStoreBuilder( + Stores.persistentKeyValueStore(DeduplicationTransformer.STORE), + Serdes.Integer(), + Serdes.Long()); + // register store + builder.addStateStore(store); + + builder + .stream("input") + .flatTransformValues( + new DeduplicationTransformerSupplier(extractor), + DeduplicationTransformer.STORE) + .to("output"); + + Topology topology = builder.build(); + streams = new KafkaStreams(topology, properties); + streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> + { + LOG.error("Unexpected error in thread {}: {}", t, e.toString()); + try + { + streams.close(Duration.ofSeconds(5)); + } + catch (Exception ex) + { + LOG.error("Could not close KafkaStreams!", ex); + } + }); + } + + + @PostConstruct + public void start() + { + streams.start(); + } + + @PreDestroy + public void stop() + { + streams.close(Duration.ofSeconds(5)); + } +} diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/SequenceNumberExtractor.java b/src/main/java/de/juplo/demo/kafka/deduplication/SequenceNumberExtractor.java new file mode 100644 index 0000000..8f74c71 --- /dev/null +++ b/src/main/java/de/juplo/demo/kafka/deduplication/SequenceNumberExtractor.java @@ -0,0 +1,21 @@ +package de.juplo.demo.kafka.deduplication; + +import org.apache.kafka.common.header.Headers; + + +public interface SequenceNumberExtractor +{ + /** + * Extracts a sequence number from the given value. + * + * The sequence number must be represented as a {@link Long} value. + * + * @param topic The topic, the message was issued on + * @param partition The partition, the message was written to + * @param offset The offset of the message in the partition + * @param key The key of the message + * @param value The value of the message + * @return a unique ID + */ + public long extract(String topic, int partition, long offset, Headers headers, K key, V value); +} diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/StreamsDeduplicatorApplication.java b/src/main/java/de/juplo/demo/kafka/deduplication/StreamsDeduplicatorApplication.java new file mode 100644 index 0000000..5331209 --- /dev/null +++ b/src/main/java/de/juplo/demo/kafka/deduplication/StreamsDeduplicatorApplication.java @@ -0,0 +1,14 @@ + package de.juplo.demo.kafka.deduplication; + + import org.springframework.boot.SpringApplication; + import org.springframework.boot.autoconfigure.SpringBootApplication; + + + @SpringBootApplication + public class StreamsDeduplicatorApplication + { + public static void main(String[] args) + { + SpringApplication.run(StreamsDeduplicatorApplication.class, args); + } + }