context.register(store, null);
transformer.init(context);
context.setTopic("foo");
+ context.setOffset(1);
Iterator<String> transformed;
context.setPartition(0);
- context.setOffset(1);
transformed = transformer.transform("1", "1").iterator();
assertThat(transformed.hasNext()).isTrue();
assertThat(transformed.next()).isEqualTo("1");
assertThat(store.get(0)).isEqualTo(1l);
context.setPartition(1);
- context.setOffset(1);
transformed = transformer.transform("2", "2").iterator();
assertThat(transformed.hasNext()).isTrue();
assertThat(transformed.next()).isEqualTo("2");
assertThat(store.get(1)).isEqualTo(2l);
context.setPartition(0);
- context.setOffset(2);
transformed = transformer.transform("1", "1").iterator();
assertThat(transformed.hasNext()).isFalse();
assertThat(store.get(0)).isEqualTo(1l);
assertThat(store.get(1)).isEqualTo(2l);
context.setPartition(0);
- context.setOffset(3);
transformed = transformer.transform("1", "4").iterator();
assertThat(transformed.hasNext()).isTrue();
assertThat(transformed.next()).isEqualTo("4");
// The order is only guaranteed per partition!
context.setPartition(2);
- context.setOffset(1);
transformed = transformer.transform("3", "3").iterator();
assertThat(transformed.hasNext()).isTrue();
assertThat(transformed.next()).isEqualTo("3");
assertThat(store.get(2)).isEqualTo(3l);
context.setPartition(1);
- context.setOffset(2);
transformed = transformer.transform("2", "2").iterator();
assertThat(transformed.hasNext()).isFalse();
assertThat(store.get(0)).isEqualTo(4l);
assertThat(store.get(2)).isEqualTo(3l);
context.setPartition(2);
- context.setOffset(2);
transformed = transformer.transform("3", "5").iterator();
assertThat(transformed.hasNext()).isTrue();
assertThat(transformed.next()).isEqualTo("5");
// The order is only guaranteed per partition!
context.setPartition(1);
- context.setOffset(3);
transformed = transformer.transform("2", "6").iterator();
assertThat(transformed.hasNext()).isTrue();
assertThat(transformed.next()).isEqualTo("6");