@Slf4j
-public class DeduplicationTransformer<K, V> implements ValueTransformerWithKey<K, V, Iterable<V>>
+public class DeduplicationTransformer implements ValueTransformerWithKey<String, String, Iterable<String>>
{
- final SequenceNumberExtractor<K, V> extractor;
-
public final static String STORE = DeduplicationTransformer.class.getCanonicalName() + "_STORE";
private ProcessorContext context;
private KeyValueStore<Integer, Long> store;
- public DeduplicationTransformer(SequenceNumberExtractor<K, V> extractor)
- {
- this.extractor = extractor;
- }
-
-
@Override
public void init(ProcessorContext context)
{
}
@Override
- public Iterable<V> transform(K key, V value)
+ public Iterable<String> transform(String key, String value)
{
String topic = context.topic();
Integer partition = context.partition();
long offset = context.offset();
Headers headers = context.headers();
- long sequenceNumber = extractor.extract(topic, partition, offset, headers, key, value);
+ long sequenceNumber = Long.parseLong(value);
Long seen = store.get(partition);
if (seen == null || seen < sequenceNumber)
}
log.info(
- "ignoring message for key {} with sequence-number {} <= {} found at offset {} of partition {}",
+ "ignoring message for key {} with sequence-number {} <= {}",
key,
sequenceNumber,
- seen,
- offset,
- partition);
+ seen);
// Signal, that the message has already been seen.
// Downstream has to filter the null-values...