projects
/
demos
/
kafka
/
deduplication
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (from parent 1:
8304884
)
Further simplified the example: Knowledge about the key is not required
author
Kai Moritz
<kai@juplo.de>
Sat, 10 Oct 2020 18:37:57 +0000
(20:37 +0200)
committer
Kai Moritz
<kai@juplo.de>
Sat, 10 Oct 2020 19:35:42 +0000
(21:35 +0200)
src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java
patch
|
blob
|
history
src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java
patch
|
blob
|
history
src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerIT.java
patch
|
blob
|
history
src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerTest.java
patch
|
blob
|
history
diff --git
a/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java
b/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java
index
3bc92d1
..
dc888bc
100644
(file)
--- a/
src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java
+++ b/
src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java
@@
-2,7
+2,7
@@
package de.juplo.demo.kafka.deduplication;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.header.Headers;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.streams.kstream.ValueTransformer
WithKey
;
+import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
@@
-11,7
+11,7
@@
import java.util.Collections;
@Slf4j
@Slf4j
-public class DeduplicationTransformer implements ValueTransformer
WithKey<String,
String, Iterable<String>>
+public class DeduplicationTransformer implements ValueTransformer
<
String, Iterable<String>>
{
public final static String STORE = DeduplicationTransformer.class.getCanonicalName() + "_STORE";
private ProcessorContext context;
{
public final static String STORE = DeduplicationTransformer.class.getCanonicalName() + "_STORE";
private ProcessorContext context;
@@
-26,7
+26,7
@@
public class DeduplicationTransformer implements ValueTransformerWithKey<String,
}
@Override
}
@Override
- public Iterable<String> transform(String
key, String
value)
+ public Iterable<String> transform(String value)
{
String topic = context.topic();
Integer partition = context.partition();
{
String topic = context.topic();
Integer partition = context.partition();
@@
-42,11
+42,7
@@
public class DeduplicationTransformer implements ValueTransformerWithKey<String,
return Arrays.asList(value);
}
return Arrays.asList(value);
}
- log.info(
- "ignoring message for key {} with sequence-number {} <= {}",
- key,
- sequenceNumber,
- seen);
+ log.info("ignoring message with sequence-number {} <= {}", sequenceNumber, seen);
// Signal, that the message has already been seen.
// Downstream has to filter the null-values...
// Signal, that the message has already been seen.
// Downstream has to filter the null-values...
diff --git
a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java
b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java
index
04a2d82
..
38ec22a
100644
(file)
--- a/
src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java
+++ b/
src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java
@@
-4,8
+4,8
@@
package de.juplo.demo.kafka.deduplication;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.ValueTransformer
WithKey
;
-import org.apache.kafka.streams.kstream.ValueTransformer
WithKey
Supplier;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.state.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kafka.streams.state.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@
-79,10
+79,10
@@
public class Deduplicator
builder
.<String, String>stream("input")
.flatTransformValues(
builder
.<String, String>stream("input")
.flatTransformValues(
- new ValueTransformer
WithKeySupplier<String,
String, Iterable<String>>()
+ new ValueTransformer
Supplier<
String, Iterable<String>>()
{
@Override
{
@Override
- public ValueTransformer
WithKey<String,
String, Iterable<String>> get()
+ public ValueTransformer
<
String, Iterable<String>> get()
{
return new DeduplicationTransformer();
}
{
return new DeduplicationTransformer();
}
diff --git
a/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerIT.java
b/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerIT.java
index
f88f371
..
339a301
100644
(file)
--- a/
src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerIT.java
+++ b/
src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerIT.java
@@
-38,14
+38,14
@@
public class DeduplicationTransformerIT
Iterator<String> transformed;
context.setPartition(0);
Iterator<String> transformed;
context.setPartition(0);
- transformed = transformer.transform("1"
, "1"
).iterator();
+ transformed = transformer.transform("1").iterator();
assertThat(transformed.hasNext()).isTrue();
assertThat(transformed.next()).isEqualTo("1");
assertThat(transformed.hasNext()).isFalse();
assertThat(store.get(0)).isEqualTo(1l);
context.setPartition(1);
assertThat(transformed.hasNext()).isTrue();
assertThat(transformed.next()).isEqualTo("1");
assertThat(transformed.hasNext()).isFalse();
assertThat(store.get(0)).isEqualTo(1l);
context.setPartition(1);
- transformed = transformer.transform("2"
, "2"
).iterator();
+ transformed = transformer.transform("2").iterator();
assertThat(transformed.hasNext()).isTrue();
assertThat(transformed.next()).isEqualTo("2");
assertThat(transformed.hasNext()).isFalse();
assertThat(transformed.hasNext()).isTrue();
assertThat(transformed.next()).isEqualTo("2");
assertThat(transformed.hasNext()).isFalse();
@@
-53,13
+53,13
@@
public class DeduplicationTransformerIT
assertThat(store.get(1)).isEqualTo(2l);
context.setPartition(0);
assertThat(store.get(1)).isEqualTo(2l);
context.setPartition(0);
- transformed = transformer.transform("1"
, "1"
).iterator();
+ transformed = transformer.transform("1").iterator();
assertThat(transformed.hasNext()).isFalse();
assertThat(store.get(0)).isEqualTo(1l);
assertThat(store.get(1)).isEqualTo(2l);
context.setPartition(0);
assertThat(transformed.hasNext()).isFalse();
assertThat(store.get(0)).isEqualTo(1l);
assertThat(store.get(1)).isEqualTo(2l);
context.setPartition(0);
- transformed = transformer.transform("
1", "
4").iterator();
+ transformed = transformer.transform("4").iterator();
assertThat(transformed.hasNext()).isTrue();
assertThat(transformed.next()).isEqualTo("4");
assertThat(transformed.hasNext()).isFalse();
assertThat(transformed.hasNext()).isTrue();
assertThat(transformed.next()).isEqualTo("4");
assertThat(transformed.hasNext()).isFalse();
@@
-68,7
+68,7
@@
public class DeduplicationTransformerIT
// The order is only guaranteed per partition!
context.setPartition(2);
// The order is only guaranteed per partition!
context.setPartition(2);
- transformed = transformer.transform("3"
, "3"
).iterator();
+ transformed = transformer.transform("3").iterator();
assertThat(transformed.hasNext()).isTrue();
assertThat(transformed.next()).isEqualTo("3");
assertThat(transformed.hasNext()).isFalse();
assertThat(transformed.hasNext()).isTrue();
assertThat(transformed.next()).isEqualTo("3");
assertThat(transformed.hasNext()).isFalse();
@@
-77,14
+77,14
@@
public class DeduplicationTransformerIT
assertThat(store.get(2)).isEqualTo(3l);
context.setPartition(1);
assertThat(store.get(2)).isEqualTo(3l);
context.setPartition(1);
- transformed = transformer.transform("2"
, "2"
).iterator();
+ transformed = transformer.transform("2").iterator();
assertThat(transformed.hasNext()).isFalse();
assertThat(store.get(0)).isEqualTo(4l);
assertThat(store.get(1)).isEqualTo(2l);
assertThat(store.get(2)).isEqualTo(3l);
context.setPartition(2);
assertThat(transformed.hasNext()).isFalse();
assertThat(store.get(0)).isEqualTo(4l);
assertThat(store.get(1)).isEqualTo(2l);
assertThat(store.get(2)).isEqualTo(3l);
context.setPartition(2);
- transformed = transformer.transform("
3", "
5").iterator();
+ transformed = transformer.transform("5").iterator();
assertThat(transformed.hasNext()).isTrue();
assertThat(transformed.next()).isEqualTo("5");
assertThat(transformed.hasNext()).isFalse();
assertThat(transformed.hasNext()).isTrue();
assertThat(transformed.next()).isEqualTo("5");
assertThat(transformed.hasNext()).isFalse();
@@
-94,7
+94,7
@@
public class DeduplicationTransformerIT
// The order is only guaranteed per partition!
context.setPartition(1);
// The order is only guaranteed per partition!
context.setPartition(1);
- transformed = transformer.transform("
2", "
6").iterator();
+ transformed = transformer.transform("6").iterator();
assertThat(transformed.hasNext()).isTrue();
assertThat(transformed.next()).isEqualTo("6");
assertThat(transformed.hasNext()).isFalse();
assertThat(transformed.hasNext()).isTrue();
assertThat(transformed.next()).isEqualTo("6");
assertThat(transformed.hasNext()).isFalse();
diff --git
a/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerTest.java
b/src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerTest.java
index
59f6322
..
8862c03
100644
(file)
--- a/
src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerTest.java
+++ b/
src/test/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformerTest.java
@@
-43,7
+43,7
@@
public class DeduplicationTransformerTest
when(store.get(anyInt())).thenReturn(null);
when(context.partition()).thenReturn(0);
when(store.get(anyInt())).thenReturn(null);
when(context.partition()).thenReturn(0);
- Iterator<String> result = transformer.transform("
foo", "
1").iterator();
+ Iterator<String> result = transformer.transform("1").iterator();
assertThat(result.hasNext()).isTrue();
assertThat(result.next()).isEqualTo("1");
assertThat(result.hasNext()).isTrue();
assertThat(result.next()).isEqualTo("1");
@@
-60,7
+60,7
@@
public class DeduplicationTransformerTest
when(store.get(anyInt())).thenReturn(1l);
when(context.partition()).thenReturn(0);
when(store.get(anyInt())).thenReturn(1l);
when(context.partition()).thenReturn(0);
- Iterator<String> result = transformer.transform(
"foo",
value).iterator();
+ Iterator<String> result = transformer.transform(value).iterator();
assertThat(result.hasNext()).isTrue();
assertThat(result.next()).isEqualTo(value);
assertThat(result.hasNext()).isTrue();
assertThat(result.next()).isEqualTo(value);
@@
-77,7
+77,7
@@
public class DeduplicationTransformerTest
when(store.get(anyInt())).thenReturn(7l);
when(context.partition()).thenReturn(0);
when(store.get(anyInt())).thenReturn(7l);
when(context.partition()).thenReturn(0);
- Iterator<String> result = transformer.transform(
"foo",
value).iterator();
+ Iterator<String> result = transformer.transform(value).iterator();
assertThat(result.hasNext()).isFalse();
verify(store, never()).put(eq(0), eq(sequenceNumber));
assertThat(result.hasNext()).isFalse();
verify(store, never()).put(eq(0), eq(sequenceNumber));