projects
/
demos
/
kafka
/
deduplication
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Further simplified the example: Knowledge about the key is not required
[demos/kafka/deduplication]
/
src
/
main
/
java
/
de
/
juplo
/
demo
/
kafka
/
deduplication
/
DeduplicationTransformer.java
diff --git
a/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java
b/src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java
index
3bc92d1
..
dc888bc
100644
(file)
--- a/
src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java
+++ b/
src/main/java/de/juplo/demo/kafka/deduplication/DeduplicationTransformer.java
@@
-2,7
+2,7
@@
package de.juplo.demo.kafka.deduplication;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.header.Headers;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.streams.kstream.ValueTransformer
WithKey
;
+import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
@@
-11,7
+11,7
@@
import java.util.Collections;
@Slf4j
@Slf4j
-public class DeduplicationTransformer implements ValueTransformer
WithKey<String,
String, Iterable<String>>
+public class DeduplicationTransformer implements ValueTransformer
<
String, Iterable<String>>
{
public final static String STORE = DeduplicationTransformer.class.getCanonicalName() + "_STORE";
private ProcessorContext context;
{
public final static String STORE = DeduplicationTransformer.class.getCanonicalName() + "_STORE";
private ProcessorContext context;
@@
-26,7
+26,7
@@
public class DeduplicationTransformer implements ValueTransformerWithKey<String,
}
@Override
}
@Override
- public Iterable<String> transform(String
key, String
value)
+ public Iterable<String> transform(String value)
{
String topic = context.topic();
Integer partition = context.partition();
{
String topic = context.topic();
Integer partition = context.partition();
@@
-42,11
+42,7
@@
public class DeduplicationTransformer implements ValueTransformerWithKey<String,
return Arrays.asList(value);
}
return Arrays.asList(value);
}
- log.info(
- "ignoring message for key {} with sequence-number {} <= {}",
- key,
- sequenceNumber,
- seen);
+ log.info("ignoring message with sequence-number {} <= {}", sequenceNumber, seen);
// Signal, that the message has already been seen.
// Downstream has to filter the null-values...
// Signal, that the message has already been seen.
// Downstream has to filter the null-values...