import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
+import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
@Slf4j
-public class DeduplicationTransformer implements ValueTransformerWithKey<String, String, Iterable<String>>
+public class DeduplicationTransformer implements ValueTransformer<String, Iterable<String>>
{
public final static String STORE = DeduplicationTransformer.class.getCanonicalName() + "_STORE";
private ProcessorContext context;
}
@Override
- public Iterable<String> transform(String key, String value)
+ public Iterable<String> transform(String value)
{
String topic = context.topic();
Integer partition = context.partition();
return Arrays.asList(value);
}
- log.info(
- "ignoring message for key {} with sequence-number {} <= {}",
- key,
- sequenceNumber,
- seen);
+ log.info("ignoring message with sequence-number {} <= {}", sequenceNumber, seen);
// Signal, that the message has already been seen.
// Downstream has to filter the null-values...
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
-import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.kstream.ValueTransformer;
+import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.state.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
builder
.<String, String>stream("input")
.flatTransformValues(
- new ValueTransformerWithKeySupplier<String, String, Iterable<String>>()
+ new ValueTransformerSupplier<String, Iterable<String>>()
{
@Override
- public ValueTransformerWithKey<String, String, Iterable<String>> get()
+ public ValueTransformer<String, Iterable<String>> get()
{
return new DeduplicationTransformer();
}
Iterator<String> transformed;
context.setPartition(0);
- transformed = transformer.transform("1", "1").iterator();
+ transformed = transformer.transform("1").iterator();
assertThat(transformed.hasNext()).isTrue();
assertThat(transformed.next()).isEqualTo("1");
assertThat(transformed.hasNext()).isFalse();
assertThat(store.get(0)).isEqualTo(1l);
context.setPartition(1);
- transformed = transformer.transform("2", "2").iterator();
+ transformed = transformer.transform("2").iterator();
assertThat(transformed.hasNext()).isTrue();
assertThat(transformed.next()).isEqualTo("2");
assertThat(transformed.hasNext()).isFalse();
assertThat(store.get(1)).isEqualTo(2l);
context.setPartition(0);
- transformed = transformer.transform("1", "1").iterator();
+ transformed = transformer.transform("1").iterator();
assertThat(transformed.hasNext()).isFalse();
assertThat(store.get(0)).isEqualTo(1l);
assertThat(store.get(1)).isEqualTo(2l);
context.setPartition(0);
- transformed = transformer.transform("1", "4").iterator();
+ transformed = transformer.transform("4").iterator();
assertThat(transformed.hasNext()).isTrue();
assertThat(transformed.next()).isEqualTo("4");
assertThat(transformed.hasNext()).isFalse();
// The order is only guaranteed per partition!
context.setPartition(2);
- transformed = transformer.transform("3", "3").iterator();
+ transformed = transformer.transform("3").iterator();
assertThat(transformed.hasNext()).isTrue();
assertThat(transformed.next()).isEqualTo("3");
assertThat(transformed.hasNext()).isFalse();
assertThat(store.get(2)).isEqualTo(3l);
context.setPartition(1);
- transformed = transformer.transform("2", "2").iterator();
+ transformed = transformer.transform("2").iterator();
assertThat(transformed.hasNext()).isFalse();
assertThat(store.get(0)).isEqualTo(4l);
assertThat(store.get(1)).isEqualTo(2l);
assertThat(store.get(2)).isEqualTo(3l);
context.setPartition(2);
- transformed = transformer.transform("3", "5").iterator();
+ transformed = transformer.transform("5").iterator();
assertThat(transformed.hasNext()).isTrue();
assertThat(transformed.next()).isEqualTo("5");
assertThat(transformed.hasNext()).isFalse();
// The order is only guaranteed per partition!
context.setPartition(1);
- transformed = transformer.transform("2", "6").iterator();
+ transformed = transformer.transform("6").iterator();
assertThat(transformed.hasNext()).isTrue();
assertThat(transformed.next()).isEqualTo("6");
assertThat(transformed.hasNext()).isFalse();
when(store.get(anyInt())).thenReturn(null);
when(context.partition()).thenReturn(0);
- Iterator<String> result = transformer.transform("foo", "1").iterator();
+ Iterator<String> result = transformer.transform("1").iterator();
assertThat(result.hasNext()).isTrue();
assertThat(result.next()).isEqualTo("1");
when(store.get(anyInt())).thenReturn(1l);
when(context.partition()).thenReturn(0);
- Iterator<String> result = transformer.transform("foo", value).iterator();
+ Iterator<String> result = transformer.transform(value).iterator();
assertThat(result.hasNext()).isTrue();
assertThat(result.next()).isEqualTo(value);
when(store.get(anyInt())).thenReturn(7l);
when(context.partition()).thenReturn(0);
- Iterator<String> result = transformer.transform("foo", value).iterator();
+ Iterator<String> result = transformer.transform(value).iterator();
assertThat(result.hasNext()).isFalse();
verify(store, never()).put(eq(0), eq(sequenceNumber));