- send100Messages(counter ->
- counter == 77
- ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
- : new Bytes(longSerializer.serialize(TOPIC, counter)));
-
- Set<ConsumerRecord<String, Long>> received = new HashSet<>();
- testHandler = record -> received.add(record);
-
- endlessConsumer.start();
+ send100Messages((partition, key, counter) ->
+ {
+ Bytes value = counter == 77
+ ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
+ : serialize(key, counter);
+ return new ProducerRecord<>(TOPIC, partition, key, value);
+ });