package de.juplo.demo.kafka.deduplication;
import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
+import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
@Slf4j
-public class DeduplicationTransformer implements ValueTransformerWithKey<String, String, Iterable<String>>
+public class DeduplicationTransformer implements ValueTransformer<String, Iterable<String>>
{
public final static String STORE = DeduplicationTransformer.class.getCanonicalName() + "_STORE";
private ProcessorContext context;
}
@Override
- public Iterable<String> transform(String key, String value)
+ public Iterable<String> transform(String value)
{
- String topic = context.topic();
Integer partition = context.partition();
- long offset = context.offset();
- Headers headers = context.headers();
-
long sequenceNumber = Long.parseLong(value);
Long seen = store.get(partition);
return Arrays.asList(value);
}
- log.info(
- "ignoring message for key {} with sequence-number {} <= {}",
- key,
- sequenceNumber,
- seen);
+ log.info("ignoring message with sequence-number {} <= {}", sequenceNumber, seen);
// Signal, that the message has already been seen.
// Downstream has to filter the null-values...