From ef9ac681ba2219cbb545e2d6d0381ba56a4ddee5 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 9 Oct 2020 15:37:08 +0200 Subject: [PATCH 01/16] Offsets are not needed by the test --- .../kafka/deduplication/DeduplicationTransformerIT.java | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerIT.java b/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerIT.java index bf2fc2d..f88f371 100644 --- a/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerIT.java +++ b/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerIT.java @@ -33,11 +33,11 @@ public class DeduplicationTransformerIT context.register(store, null); transformer.init(context); context.setTopic("foo"); + context.setOffset(1); Iterator transformed; context.setPartition(0); - context.setOffset(1); transformed = transformer.transform("1", "1").iterator(); assertThat(transformed.hasNext()).isTrue(); assertThat(transformed.next()).isEqualTo("1"); @@ -45,7 +45,6 @@ public class DeduplicationTransformerIT assertThat(store.get(0)).isEqualTo(1l); context.setPartition(1); - context.setOffset(1); transformed = transformer.transform("2", "2").iterator(); assertThat(transformed.hasNext()).isTrue(); assertThat(transformed.next()).isEqualTo("2"); @@ -54,14 +53,12 @@ public class DeduplicationTransformerIT assertThat(store.get(1)).isEqualTo(2l); context.setPartition(0); - context.setOffset(2); transformed = transformer.transform("1", "1").iterator(); assertThat(transformed.hasNext()).isFalse(); assertThat(store.get(0)).isEqualTo(1l); assertThat(store.get(1)).isEqualTo(2l); context.setPartition(0); - context.setOffset(3); transformed = transformer.transform("1", "4").iterator(); assertThat(transformed.hasNext()).isTrue(); assertThat(transformed.next()).isEqualTo("4"); @@ -71,7 +68,6 @@ public class DeduplicationTransformerIT // The order is only guaranteed per partition! context.setPartition(2); - context.setOffset(1); transformed = transformer.transform("3", "3").iterator(); assertThat(transformed.hasNext()).isTrue(); assertThat(transformed.next()).isEqualTo("3"); @@ -81,7 +77,6 @@ public class DeduplicationTransformerIT assertThat(store.get(2)).isEqualTo(3l); context.setPartition(1); - context.setOffset(2); transformed = transformer.transform("2", "2").iterator(); assertThat(transformed.hasNext()).isFalse(); assertThat(store.get(0)).isEqualTo(4l); @@ -89,7 +84,6 @@ public class DeduplicationTransformerIT assertThat(store.get(2)).isEqualTo(3l); context.setPartition(2); - context.setOffset(2); transformed = transformer.transform("3", "5").iterator(); assertThat(transformed.hasNext()).isTrue(); assertThat(transformed.next()).isEqualTo("5"); @@ -100,7 +94,6 @@ public class DeduplicationTransformerIT // The order is only guaranteed per partition! context.setPartition(1); - context.setOffset(3); transformed = transformer.transform("2", "6").iterator(); assertThat(transformed.hasNext()).isTrue(); assertThat(transformed.next()).isEqualTo("6"); -- 2.20.1 From ac16b72a27bf9c16dee82558f1f47f6d0f0838bf Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 9 Oct 2020 17:56:09 +0200 Subject: [PATCH 02/16] Implemented an unit-test for the transformer --- .../DeduplicationTransformerTest.java | 85 +++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerTest.java diff --git a/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerTest.java b/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerTest.java new file mode 100644 index 0000000..59f6322 --- /dev/null +++ b/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerTest.java @@ -0,0 +1,85 @@ +package de.juplo.demo.kafka.deduplication; + +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.state.KeyValueStore; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.test.context.junit.jupiter.SpringExtension; + +import java.util.Iterator; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.*; + + +@ExtendWith(SpringExtension.class) +public class DeduplicationTransformerTest +{ + @MockBean + ProcessorContext context; + @MockBean + KeyValueStore store; + + DeduplicationTransformer transformer = new DeduplicationTransformer(); + + + @BeforeEach + public void setUpTransformer() + { + when(context.getStateStore(DeduplicationTransformer.STORE)).thenReturn(store); + transformer.init(context); + } + + + @Test + public void testStoresSequenceNumberAndForwardesValueIfStoreIsEmpty() + { + when(store.get(anyInt())).thenReturn(null); + when(context.partition()).thenReturn(0); + + Iterator result = transformer.transform("foo", "1").iterator(); + + assertThat(result.hasNext()).isTrue(); + assertThat(result.next()).isEqualTo("1"); + assertThat(result.hasNext()).isFalse(); + verify(store, atLeastOnce()).put(eq(0), eq(1l)); + } + + @ParameterizedTest + @ValueSource(longs = { 2, 3, 4, 5, 6, 7 }) + public void testStoresSequenceNumberAndForwardesValueIfSequenceNumberIsGreater(long sequenceNumber) + { + String value = Long.toString(sequenceNumber); + + when(store.get(anyInt())).thenReturn(1l); + when(context.partition()).thenReturn(0); + + Iterator result = transformer.transform("foo", value).iterator(); + + assertThat(result.hasNext()).isTrue(); + assertThat(result.next()).isEqualTo(value); + assertThat(result.hasNext()).isFalse(); + verify(store, atLeastOnce()).put(eq(0), eq(sequenceNumber)); + } + + @ParameterizedTest + @ValueSource(longs = { 1, 2, 3, 4, 5, 6, 7 }) + public void testDropsValueIfSequenceNumberIsNotGreater(long sequenceNumber) + { + String value = Long.toString(sequenceNumber); + + when(store.get(anyInt())).thenReturn(7l); + when(context.partition()).thenReturn(0); + + Iterator result = transformer.transform("foo", value).iterator(); + + assertThat(result.hasNext()).isFalse(); + verify(store, never()).put(eq(0), eq(sequenceNumber)); + } +} -- 2.20.1 From fbc72b4abf5eefb2150a77344997cd4bef824fe5 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 9 Oct 2020 18:07:39 +0200 Subject: [PATCH 03/16] Moved the building of the topology into a static method --- .../kafka/deduplication/Deduplicator.java | 33 ++++++++++--------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java index af7da70..8f173f7 100644 --- a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java +++ b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java @@ -36,6 +36,23 @@ public class Deduplicator properties.put("default.key.serde", Serdes.StringSerde.class); properties.put("default.value.serde", Serdes.StringSerde.class); + streams = new KafkaStreams(Deduplicator.buildTopology(), properties); + streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> + { + LOG.error("Unexpected error in thread {}: {}", t, e.toString()); + try + { + streams.close(Duration.ofSeconds(5)); + } + catch (Exception ex) + { + LOG.error("Could not close KafkaStreams!", ex); + } + }); + } + + static Topology buildTopology() + { StreamsBuilder builder = new StreamsBuilder(); // Create state-store for sequence numbers @@ -61,23 +78,9 @@ public class Deduplicator DeduplicationTransformer.STORE) .to("output"); - Topology topology = builder.build(); - streams = new KafkaStreams(topology, properties); - streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> - { - LOG.error("Unexpected error in thread {}: {}", t, e.toString()); - try - { - streams.close(Duration.ofSeconds(5)); - } - catch (Exception ex) - { - LOG.error("Could not close KafkaStreams!", ex); - } - }); + return builder.build(); } - @PostConstruct public void start() { -- 2.20.1 From 0187521d27874ea7812d9116e76ef9c1a499368f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 9 Oct 2020 22:24:56 +0200 Subject: [PATCH 04/16] Added an endpoint to query the watermarks by partition --- docker-compose.yml | 4 ++ pom.xml | 2 +- .../kafka/deduplication/Deduplicator.java | 67 ++++++++++++++++--- src/main/resources/application.yml | 3 + 4 files changed, 65 insertions(+), 11 deletions(-) create mode 100644 src/main/resources/application.yml diff --git a/docker-compose.yml b/docker-compose.yml index 17aff95..2b51357 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -22,6 +22,10 @@ services: deduplicator: image: juplo/deduplicator:streams + ports: + - 8080:8080 + environment: + server.address: deduplicator depends_on: - zookeeper - kafka diff --git a/pom.xml b/pom.xml index bcb083b..a1566e1 100644 --- a/pom.xml +++ b/pom.xml @@ -21,7 +21,7 @@ org.springframework.boot - spring-boot-starter + spring-boot-starter-web org.apache.kafka diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java index 8f173f7..5c0f554 100644 --- a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java +++ b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java @@ -1,18 +1,22 @@ package de.juplo.demo.kafka.deduplication; +import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.*; import org.apache.kafka.streams.kstream.ValueTransformerWithKey; import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.StoreBuilder; -import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; +import org.springframework.boot.autoconfigure.web.ServerProperties; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RestController; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; @@ -20,15 +24,16 @@ import java.time.Duration; import java.util.Properties; -@Component +@RestController public class Deduplicator { final static Logger LOG = LoggerFactory.getLogger(Deduplicator.class); public final KafkaStreams streams; + public final String host; + public final int port; - - public Deduplicator() + public Deduplicator(ServerProperties serverProperties) { Properties properties = new Properties(); properties.put("bootstrap.servers", "kafka:9092"); @@ -36,6 +41,10 @@ public class Deduplicator properties.put("default.key.serde", Serdes.StringSerde.class); properties.put("default.value.serde", Serdes.StringSerde.class); + this.host = serverProperties.getAddress().getHostAddress(); + this.port = serverProperties.getPort(); + properties.put("application.server", host + ":" + port); + streams = new KafkaStreams(Deduplicator.buildTopology(), properties); streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> { @@ -81,6 +90,44 @@ public class Deduplicator return builder.build(); } + @GetMapping(path = "/watermark/{partition}", produces = MediaType.TEXT_PLAIN_VALUE) + public ResponseEntity getWatermarks(@PathVariable Integer partition) + { + KeyQueryMetadata metadata = + streams.queryMetadataForKey( + DeduplicationTransformer.STORE, + partition, new IntegerSerializer()); + + if (metadata.getActiveHost().port() != port || + !metadata.getActiveHost().host().equals(host)) + { + return + ResponseEntity + .status(HttpStatus.TEMPORARY_REDIRECT) + .header( + HttpHeaders.LOCATION, + "http://" + + metadata.getActiveHost().host() + + ":" + + metadata.getActiveHost().port() + + "/watermark/" + + partition) + .build(); + } + + ReadOnlyKeyValueStore store = streams.store( + StoreQueryParameters.fromNameAndType( + DeduplicationTransformer.STORE, + QueryableStoreTypes.keyValueStore())); + + Long watermark = store.get(partition); + if (watermark == null) + return ResponseEntity.notFound().build(); + + return ResponseEntity.ok().body(watermark.toString()); + } + + @PostConstruct public void start() { diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..7ef7e54 --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,3 @@ +server: + address: 127.0.0.1 + port: 8080 -- 2.20.1 From e1bd976486d1d1a087567a6c6081637d48706f44 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 9 Oct 2020 23:47:30 +0200 Subject: [PATCH 05/16] Added actuators and implemented an indicator that reports the streams-state * Refined README.sh: The script waits, until the health-indicator signals, that the deduplicator is up and RUNNING --- Dockerfile | 1 + README.sh | 25 +++++++-- pom.xml | 4 ++ .../kafka/deduplication/Deduplicator.java | 5 +- .../deduplication/StreamsHealthIndicator.java | 55 +++++++++++++++++++ src/main/resources/application.yml | 5 ++ 6 files changed, 89 insertions(+), 6 deletions(-) create mode 100644 src/main/java/de/juplo/demo/kafka/deduplication/StreamsHealthIndicator.java diff --git a/Dockerfile b/Dockerfile index bd8b1e3..5a7b5f9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,3 +1,4 @@ FROM openjdk:11-jre-slim COPY target/streams-deduplicator-1.0-SNAPSHOT.jar /opt/app.jar +EXPOSE 8080 CMD ["java", "-jar", "/opt/app.jar"] diff --git a/README.sh b/README.sh index 6646d24..00383c4 100755 --- a/README.sh +++ b/README.sh @@ -7,20 +7,35 @@ then exit fi -mvn package - -docker build -t juplo/deduplicator:streams . +if [[ $(docker image ls -q juplo/deduplicator:streams) == "" ]] +then + mvn package + docker build -t juplo/deduplicator:streams . +else + echo "Using image existing image:" + docker image ls juplo/deduplicator:streams +fi docker-compose up -d zookeeper kafka -while ! [[ $(zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]]; do echo "Waiting for kafka..."; sleep 1; done +while ! [[ $(zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]]; +do + echo "Waiting for kafka..."; + sleep 1; +done kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --topic input kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --topic output docker-compose up -d deduplicator + +while ! [[ $(http :8080/actuator/health 2>/dev/null | jq -r .components.streams.status) == "RUNNING" ]]; +do + echo "Waiting for Streams-Application..."; + sleep 1; +done + cat data.txt | kafkacat -K: -b localhost:9092 -t input -sleep 5 kafkacat -C -b localhost:9092 -t input -e | wc -l kafkacat -C -b localhost:9092 -t output -e | wc -l diff --git a/pom.xml b/pom.xml index a1566e1..18736fb 100644 --- a/pom.xml +++ b/pom.xml @@ -23,6 +23,10 @@ org.springframework.boot spring-boot-starter-web + + org.springframework.boot + spring-boot-starter-actuator + org.apache.kafka kafka-streams diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java index 5c0f554..04a2d82 100644 --- a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java +++ b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java @@ -33,7 +33,9 @@ public class Deduplicator public final String host; public final int port; - public Deduplicator(ServerProperties serverProperties) + public Deduplicator( + ServerProperties serverProperties, + StreamsHealthIndicator healthIndicator) { Properties properties = new Properties(); properties.put("bootstrap.servers", "kafka:9092"); @@ -58,6 +60,7 @@ public class Deduplicator LOG.error("Could not close KafkaStreams!", ex); } }); + streams.setStateListener(healthIndicator); } static Topology buildTopology() diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/StreamsHealthIndicator.java b/src/main/java/de/juplo/demo/kafka/deduplication/StreamsHealthIndicator.java new file mode 100644 index 0000000..f531256 --- /dev/null +++ b/src/main/java/de/juplo/demo/kafka/deduplication/StreamsHealthIndicator.java @@ -0,0 +1,55 @@ +package de.juplo.demo.kafka.deduplication; + +import org.apache.kafka.streams.KafkaStreams; +import org.springframework.boot.actuate.health.AbstractHealthIndicator; +import org.springframework.boot.actuate.health.Health; +import org.springframework.boot.actuate.health.Status; +import org.springframework.stereotype.Component; + + +@Component +public class StreamsHealthIndicator extends AbstractHealthIndicator implements KafkaStreams.StateListener +{ + public final static Status CREATED = new Status("CREATED"); + public final static Status RUNNING = new Status("RUNNING"); + public final static Status REBALANCING = new Status("REBALANCING"); + public final static Status ERROR = new Status("ERROR"); + public final static Status PENDING_SHUTDOWN = new Status("PENDING_SHUTDOWN"); + public final static Status NOT_RUNNING = new Status("NOT_RUNNING"); + + private Status status = Status.UNKNOWN; + + @Override + protected synchronized void doHealthCheck(Health.Builder builder) throws Exception + { + builder.status(status); + } + + @Override + public synchronized void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) + { + switch (newState) + { + case CREATED: + status = CREATED; + break; + case RUNNING: + status = RUNNING; + break; + case REBALANCING: + status = REBALANCING; + break; + case ERROR: + status = ERROR; + break; + case PENDING_SHUTDOWN: + status = PENDING_SHUTDOWN; + break; + case NOT_RUNNING: + status = NOT_RUNNING; + break; + default: + status = null; + } + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 7ef7e54..985a34f 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,3 +1,8 @@ server: address: 127.0.0.1 port: 8080 +management: + endpoint: + health: + show-details: always + show-components: always -- 2.20.1 From eada2e8eb27b2cff46a1cb87dfa5817dab008e0d Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 10 Oct 2020 18:29:32 +0200 Subject: [PATCH 06/16] Added generated *.txt files to clean-up --- README.sh | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/README.sh b/README.sh index 00383c4..dcd632a 100755 --- a/README.sh +++ b/README.sh @@ -4,6 +4,7 @@ if [ "$1" = "cleanup" ] then docker-compose down mvn clean + rm *.txt exit fi @@ -18,6 +19,11 @@ fi docker-compose up -d zookeeper kafka +if [ ! -e data.txt ]; +then + echo ./create-data.sh +fi + while ! [[ $(zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]]; do echo "Waiting for kafka..."; -- 2.20.1 From d0dd1937d8dbb0b540559dbd397506654c50de0c Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 10 Oct 2020 18:30:17 +0200 Subject: [PATCH 07/16] Using names instead of numbers as key for the messages --- README.sh | 14 +++++++------- create-data.sh | 49 +++++++++++++++++++++++++++++++++++-------------- 2 files changed, 42 insertions(+), 21 deletions(-) diff --git a/README.sh b/README.sh index dcd632a..3b0e897 100755 --- a/README.sh +++ b/README.sh @@ -46,10 +46,10 @@ cat data.txt | kafkacat -K: -b localhost:9092 -t input kafkacat -C -b localhost:9092 -t input -e | wc -l kafkacat -C -b localhost:9092 -t output -e | wc -l -kafkacat -C -b localhost:9092 -t output -e -f'%k:%s\n' | grep ^0 > result_0.txt -kafkacat -C -b localhost:9092 -t output -e -f'%k:%s\n' | grep ^1 > result_1.txt -kafkacat -C -b localhost:9092 -t output -e -f'%k:%s\n' | grep ^2 > result_2.txt -kafkacat -C -b localhost:9092 -t output -e -f'%k:%s\n' | grep ^3 > result_3.txt -kafkacat -C -b localhost:9092 -t output -e -f'%k:%s\n' | grep ^4 > result_4.txt -kafkacat -C -b localhost:9092 -t output -e -f'%k:%s\n' | grep ^5 > result_5.txt -kafkacat -C -b localhost:9092 -t output -e -f'%k:%s\n' | grep ^6 > result_6.txt +kafkacat -C -b localhost:9092 -t output -e -f'%k:%s\n' | awk -F: '/^peter/ { print $2 }' > result_peter.txt +kafkacat -C -b localhost:9092 -t output -e -f'%k:%s\n' | awk -F: '/^franz/ { print $2 }' > result_franz.txt +kafkacat -C -b localhost:9092 -t output -e -f'%k:%s\n' | awk -F: '/^ute/ { print $2 }' > result_ute.txt +kafkacat -C -b localhost:9092 -t output -e -f'%k:%s\n' | awk -F: '/^klaus/ { print $2 }' > result_klaus.txt +kafkacat -C -b localhost:9092 -t output -e -f'%k:%s\n' | awk -F: '/^paul/ { print $2 }' > result_paul.txt +kafkacat -C -b localhost:9092 -t output -e -f'%k:%s\n' | awk -F: '/^petra/ { print $2 }' > result_petra.txt +kafkacat -C -b localhost:9092 -t output -e -f'%k:%s\n' | awk -F: '/^siggi/ { print $2 }' > result_siggi.txt diff --git a/create-data.sh b/create-data.sh index d64ca41..ea429a3 100755 --- a/create-data.sh +++ b/create-data.sh @@ -1,17 +1,38 @@ #!/bin/bash -for i in `seq 1 333`; do echo $(($i%7)):$i; done > data.txt -for i in `seq 70 578`; do echo $(($i%7)):$i; done >> data.txt -for i in `seq 400 1211`; do echo $(($i%7)):$i; done >> data.txt -for i in `seq 1000 1111`; do echo $(($i%7)):$i; done >> data.txt -for i in `seq 1200 1711`; do echo $(($i%7)):$i; done >> data.txt -for i in `seq 1688 3333`; do echo $(($i%7)):$i; done >> data.txt -for i in `seq 2567 3500`; do echo $(($i%7)):$i; done >> data.txt +function name() +{ + case $(($1%7)) in + 0) echo "peter" + ;; + 1) echo "franz" + ;; + 2) echo "ute" + ;; + 3) echo "klaus" + ;; + 4) echo "paul" + ;; + 5) echo "petra" + ;; + 6) echo "siggi" + ;; + esac +} + +for i in `seq 1 333`; do echo $(name $i):$i; done > data.txt +for i in `seq 70 578`; do echo $(name $i):$i; done >> data.txt +for i in `seq 400 1211`; do echo $(name $i):$i; done >> data.txt +for i in `seq 1000 1111`; do echo $(name $i):$i; done >> data.txt +for i in `seq 1200 1711`; do echo $(name $i):$i; done >> data.txt +for i in `seq 1688 3333`; do echo $(name $i):$i; done >> data.txt +for i in `seq 2567 3500`; do echo $(name $i):$i; done >> data.txt + +for i in `seq 1 3500`; do echo $(name $i):$i | awk -F: '/^peter/ { print $2 }'; done > expected_peter.txt +for i in `seq 1 3500`; do echo $(name $i):$i | awk -F: '/^franz/ { print $2 }'; done > expected_franz.txt +for i in `seq 1 3500`; do echo $(name $i):$i | awk -F: '/^ute/ { print $2 }'; done > expected_ute.txt +for i in `seq 1 3500`; do echo $(name $i):$i | awk -F: '/^klaus/ { print $2 }'; done > expected_klaus.txt +for i in `seq 1 3500`; do echo $(name $i):$i | awk -F: '/^paul/ { print $2 }'; done > expected_paul.txt +for i in `seq 1 3500`; do echo $(name $i):$i | awk -F: '/^petra/ { print $2 }'; done > expected_petra.txt +for i in `seq 1 3500`; do echo $(name $i):$i | awk -F: '/^siggi/ { print $2 }'; done > expected_siggi.txt -for i in `seq 1 3500`; do echo $(($i%7)):$i; done | grep ^0 > expected_0.txt -for i in `seq 1 3500`; do echo $(($i%7)):$i; done | grep ^1 > expected_1.txt -for i in `seq 1 3500`; do echo $(($i%7)):$i; done | grep ^2 > expected_2.txt -for i in `seq 1 3500`; do echo $(($i%7)):$i; done | grep ^3 > expected_3.txt -for i in `seq 1 3500`; do echo $(($i%7)):$i; done | grep ^4 > expected_4.txt -for i in `seq 1 3500`; do echo $(($i%7)):$i; done | grep ^5 > expected_5.txt -for i in `seq 1 3500`; do echo $(($i%7)):$i; done | grep ^6 > expected_6.txt -- 2.20.1 From 1a499102e0b91be12efdc92cf8e906e704749ce7 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 10 Oct 2020 20:37:10 +0200 Subject: [PATCH 08/16] Introduced a parameter to force a build in the helper-scirpt README.sh * Also added fast failure for build-errors --- README.sh | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.sh b/README.sh index 3b0e897..b71031c 100755 --- a/README.sh +++ b/README.sh @@ -8,10 +8,10 @@ then exit fi -if [[ $(docker image ls -q juplo/deduplicator:streams) == "" ]] +if [[ $(docker image ls -q juplo/deduplicator:streams) == "" || "$1" = "build" ]] then - mvn package - docker build -t juplo/deduplicator:streams . + mvn package || exit + docker build -t juplo/deduplicator:streams . || exit else echo "Using image existing image:" docker image ls juplo/deduplicator:streams -- 2.20.1 From f38407e9e0ffec5e6441fbf20a823d8356080bee Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 10 Oct 2020 20:37:57 +0200 Subject: [PATCH 09/16] Further simplified the example: Knowledge about the key is not required --- .../deduplication/DeduplicationTransformer.java | 12 ++++-------- .../demo/kafka/deduplication/Deduplicator.java | 8 ++++---- .../DeduplicationTransformerIT.java | 16 ++++++++-------- .../DeduplicationTransformerTest.java | 6 +++--- 4 files changed, 19 insertions(+), 23 deletions(-) diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java b/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java index 3bc92d1..dc888bc 100644 --- a/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java +++ b/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java @@ -2,7 +2,7 @@ package de.juplo.demo.kafka.deduplication; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.header.Headers; -import org.apache.kafka.streams.kstream.ValueTransformerWithKey; +import org.apache.kafka.streams.kstream.ValueTransformer; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.state.KeyValueStore; @@ -11,7 +11,7 @@ import java.util.Collections; @Slf4j -public class DeduplicationTransformer implements ValueTransformerWithKey> +public class DeduplicationTransformer implements ValueTransformer> { public final static String STORE = DeduplicationTransformer.class.getCanonicalName() + "_STORE"; private ProcessorContext context; @@ -26,7 +26,7 @@ public class DeduplicationTransformer implements ValueTransformerWithKey transform(String key, String value) + public Iterable transform(String value) { String topic = context.topic(); Integer partition = context.partition(); @@ -42,11 +42,7 @@ public class DeduplicationTransformer implements ValueTransformerWithKeystream("input") .flatTransformValues( - new ValueTransformerWithKeySupplier>() + new ValueTransformerSupplier>() { @Override - public ValueTransformerWithKey> get() + public ValueTransformer> get() { return new DeduplicationTransformer(); } diff --git a/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerIT.java b/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerIT.java index f88f371..339a301 100644 --- a/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerIT.java +++ b/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerIT.java @@ -38,14 +38,14 @@ public class DeduplicationTransformerIT Iterator transformed; context.setPartition(0); - transformed = transformer.transform("1", "1").iterator(); + transformed = transformer.transform("1").iterator(); assertThat(transformed.hasNext()).isTrue(); assertThat(transformed.next()).isEqualTo("1"); assertThat(transformed.hasNext()).isFalse(); assertThat(store.get(0)).isEqualTo(1l); context.setPartition(1); - transformed = transformer.transform("2", "2").iterator(); + transformed = transformer.transform("2").iterator(); assertThat(transformed.hasNext()).isTrue(); assertThat(transformed.next()).isEqualTo("2"); assertThat(transformed.hasNext()).isFalse(); @@ -53,13 +53,13 @@ public class DeduplicationTransformerIT assertThat(store.get(1)).isEqualTo(2l); context.setPartition(0); - transformed = transformer.transform("1", "1").iterator(); + transformed = transformer.transform("1").iterator(); assertThat(transformed.hasNext()).isFalse(); assertThat(store.get(0)).isEqualTo(1l); assertThat(store.get(1)).isEqualTo(2l); context.setPartition(0); - transformed = transformer.transform("1", "4").iterator(); + transformed = transformer.transform("4").iterator(); assertThat(transformed.hasNext()).isTrue(); assertThat(transformed.next()).isEqualTo("4"); assertThat(transformed.hasNext()).isFalse(); @@ -68,7 +68,7 @@ public class DeduplicationTransformerIT // The order is only guaranteed per partition! context.setPartition(2); - transformed = transformer.transform("3", "3").iterator(); + transformed = transformer.transform("3").iterator(); assertThat(transformed.hasNext()).isTrue(); assertThat(transformed.next()).isEqualTo("3"); assertThat(transformed.hasNext()).isFalse(); @@ -77,14 +77,14 @@ public class DeduplicationTransformerIT assertThat(store.get(2)).isEqualTo(3l); context.setPartition(1); - transformed = transformer.transform("2", "2").iterator(); + transformed = transformer.transform("2").iterator(); assertThat(transformed.hasNext()).isFalse(); assertThat(store.get(0)).isEqualTo(4l); assertThat(store.get(1)).isEqualTo(2l); assertThat(store.get(2)).isEqualTo(3l); context.setPartition(2); - transformed = transformer.transform("3", "5").iterator(); + transformed = transformer.transform("5").iterator(); assertThat(transformed.hasNext()).isTrue(); assertThat(transformed.next()).isEqualTo("5"); assertThat(transformed.hasNext()).isFalse(); @@ -94,7 +94,7 @@ public class DeduplicationTransformerIT // The order is only guaranteed per partition! context.setPartition(1); - transformed = transformer.transform("2", "6").iterator(); + transformed = transformer.transform("6").iterator(); assertThat(transformed.hasNext()).isTrue(); assertThat(transformed.next()).isEqualTo("6"); assertThat(transformed.hasNext()).isFalse(); diff --git a/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerTest.java b/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerTest.java index 59f6322..8862c03 100644 --- a/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerTest.java +++ b/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerTest.java @@ -43,7 +43,7 @@ public class DeduplicationTransformerTest when(store.get(anyInt())).thenReturn(null); when(context.partition()).thenReturn(0); - Iterator result = transformer.transform("foo", "1").iterator(); + Iterator result = transformer.transform("1").iterator(); assertThat(result.hasNext()).isTrue(); assertThat(result.next()).isEqualTo("1"); @@ -60,7 +60,7 @@ public class DeduplicationTransformerTest when(store.get(anyInt())).thenReturn(1l); when(context.partition()).thenReturn(0); - Iterator result = transformer.transform("foo", value).iterator(); + Iterator result = transformer.transform(value).iterator(); assertThat(result.hasNext()).isTrue(); assertThat(result.next()).isEqualTo(value); @@ -77,7 +77,7 @@ public class DeduplicationTransformerTest when(store.get(anyInt())).thenReturn(7l); when(context.partition()).thenReturn(0); - Iterator result = transformer.transform("foo", value).iterator(); + Iterator result = transformer.transform(value).iterator(); assertThat(result.hasNext()).isFalse(); verify(store, never()).put(eq(0), eq(sequenceNumber)); -- 2.20.1 From f8b17a973ac806aae2e33397e9594875e3358c96 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 10 Oct 2020 20:40:27 +0200 Subject: [PATCH 10/16] Further simplified the example: Removed unnecessary variables --- .../demo/kafka/deduplication/DeduplicationTransformer.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java b/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java index dc888bc..2494672 100644 --- a/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java +++ b/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java @@ -1,7 +1,6 @@ package de.juplo.demo.kafka.deduplication; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.common.header.Headers; import org.apache.kafka.streams.kstream.ValueTransformer; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.state.KeyValueStore; @@ -28,11 +27,7 @@ public class DeduplicationTransformer implements ValueTransformer transform(String value) { - String topic = context.topic(); Integer partition = context.partition(); - long offset = context.offset(); - Headers headers = context.headers(); - long sequenceNumber = Long.parseLong(value); Long seen = store.get(partition); -- 2.20.1 From e1718f9e143c9d6c3708b91b17b9ddd2f1e6b11f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 5 May 2024 07:52:28 +0200 Subject: [PATCH 11/16] Released version `1.0.0` --- Dockerfile | 2 +- pom.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index 5a7b5f9..8b90411 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ FROM openjdk:11-jre-slim -COPY target/streams-deduplicator-1.0-SNAPSHOT.jar /opt/app.jar +COPY target/streams-deduplicator-*.jar /opt/app.jar EXPOSE 8080 CMD ["java", "-jar", "/opt/app.jar"] diff --git a/pom.xml b/pom.xml index 18736fb..b1bb0e6 100644 --- a/pom.xml +++ b/pom.xml @@ -13,7 +13,7 @@ de.juplo.demo.kafka streams-deduplicator - 1.0-SNAPSHOT + 1.0.0 Streams-Deduplicator Deduplicator based on Kafka-Streams -- 2.20.1 From d61b5f1ccb1bf39bec58c06a362bcbefaddd238f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 5 May 2024 07:53:33 +0200 Subject: [PATCH 12/16] Prepared for development of version `1.0.1` --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index b1bb0e6..8db5538 100644 --- a/pom.xml +++ b/pom.xml @@ -13,7 +13,7 @@ de.juplo.demo.kafka streams-deduplicator - 1.0.0 + 1.0.1-SNAPSHOT Streams-Deduplicator Deduplicator based on Kafka-Streams -- 2.20.1 From 5c698151c9f80219fcb821ca71a3f9c78da4d16f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 5 May 2024 07:43:58 +0200 Subject: [PATCH 13/16] Upgreaded Spring Boot from `2.3.4.RELEASE` to `3.2.5` --- Dockerfile | 2 +- pom.xml | 2 +- .../java/de/juplo/demo/kafka/deduplication/Deduplicator.java | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Dockerfile b/Dockerfile index 8b90411..96e2f0c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM openjdk:11-jre-slim +FROM eclipse-temurin:17-jre-alpine COPY target/streams-deduplicator-*.jar /opt/app.jar EXPOSE 8080 CMD ["java", "-jar", "/opt/app.jar"] diff --git a/pom.xml b/pom.xml index 8db5538..f363b88 100644 --- a/pom.xml +++ b/pom.xml @@ -8,7 +8,7 @@ org.springframework.boot spring-boot-starter-parent - 2.3.4.RELEASE + 3.2.5 de.juplo.demo.kafka diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java index 38ec22a..e1027d0 100644 --- a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java +++ b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java @@ -1,6 +1,8 @@ package de.juplo.demo.kafka.deduplication; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; @@ -18,8 +20,6 @@ import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RestController; -import javax.annotation.PostConstruct; -import javax.annotation.PreDestroy; import java.time.Duration; import java.util.Properties; -- 2.20.1 From dd9aad09a43ec43cb92baa558e9ae1af6f275449 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 5 May 2024 07:44:43 +0200 Subject: [PATCH 14/16] Simplified code using a lambda --- .../de/juplo/demo/kafka/deduplication/Deduplicator.java | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java index e1027d0..4376f30 100644 --- a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java +++ b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java @@ -79,14 +79,7 @@ public class Deduplicator builder .stream("input") .flatTransformValues( - new ValueTransformerSupplier>() - { - @Override - public ValueTransformer> get() - { - return new DeduplicationTransformer(); - } - }, + () -> new DeduplicationTransformer(), DeduplicationTransformer.STORE) .to("output"); -- 2.20.1 From eaa0f71e3dd8d051f9765a2215d11d32e0363f48 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 5 May 2024 07:55:00 +0200 Subject: [PATCH 15/16] Released version `1.0.1` --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index f363b88..fc4286b 100644 --- a/pom.xml +++ b/pom.xml @@ -13,7 +13,7 @@ de.juplo.demo.kafka streams-deduplicator - 1.0.1-SNAPSHOT + 1.0.1 Streams-Deduplicator Deduplicator based on Kafka-Streams -- 2.20.1 From 87d09f33a295209d78fc016801b1f6e39ff1e29a Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 5 May 2024 07:55:47 +0200 Subject: [PATCH 16/16] Prepared for development ov version `1.0.2` --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index fc4286b..933bbb8 100644 --- a/pom.xml +++ b/pom.xml @@ -13,7 +13,7 @@ de.juplo.demo.kafka streams-deduplicator - 1.0.1 + 1.0.2-SNAPSHOT Streams-Deduplicator Deduplicator based on Kafka-Streams -- 2.20.1