public ApplicationTest()
{
super(
- new StringSerializer(),
- new RecordGenerator<> ()
+ new RecordGenerator()
{
final StringSerializer stringSerializer = new StringSerializer();
final LongSerializer longSerializer = new LongSerializer();
public void generate(
int numberOfMessagesToGenerate,
Set<Integer> poisonPills,
- Consumer<ProducerRecord<String, Bytes>> messageSender)
+ Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
{
int i = 0;
? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
: new Bytes(longSerializer.serialize(TOPIC, (long)i));
- ProducerRecord<String, Bytes> record =
+ ProducerRecord<Bytes, Bytes> record =
new ProducerRecord<>(
TOPIC,
partition,
- Integer.toString(partition*10+key%2),
+ new Bytes(stringSerializer.serialize(TOPIC,Integer.toString(partition*10+key%2))),
value);
messageSender.accept(record);
@Autowired
ExecutorService executor;
- KafkaProducer<K, Bytes> testRecordProducer;
+ KafkaProducer<Bytes, Bytes> testRecordProducer;
KafkaConsumer<Bytes, Bytes> offsetConsumer;
Consumer<ConsumerRecord<K, V>> testHandler;
EndlessConsumer<K, V> endlessConsumer;
Set<ConsumerRecord<K, V>> receivedRecords;
- final Serializer<K> keySerializer;
- final RecordGenerator<K> recordGenerator;
- final Consumer<ProducerRecord<K, Bytes>> messageSender;
+ final RecordGenerator recordGenerator;
+ final Consumer<ProducerRecord<Bytes, Bytes>> messageSender;
- public GenericApplicationTest(
- Serializer<K> keySerializer,
- RecordGenerator<K> recordGenerator)
+ public GenericApplicationTest(RecordGenerator recordGenerator)
{
- this.keySerializer = keySerializer;
this.recordGenerator = recordGenerator;
this.messageSender = (record) -> sendMessage(record);
}
}
- public interface RecordGenerator<K>
+ public interface RecordGenerator
{
void generate(
int numberOfMessagesToGenerate,
Set<Integer> poistionPills,
- Consumer<ProducerRecord<K, Bytes>> messageSender);
+ Consumer<ProducerRecord<Bytes, Bytes>> messageSender);
}
- void sendMessage(ProducerRecord<K, Bytes> record)
+ void sendMessage(ProducerRecord<Bytes, Bytes> record)
{
testRecordProducer.send(record, (metadata, e) ->
{
props = new Properties();
props.put("bootstrap.servers", properties.getBootstrapServer());
props.put("linger.ms", 100);
- props.put("key.serializer", keySerializer.getClass().getName());
+ props.put("key.serializer", BytesSerializer.class.getName());
props.put("value.serializer", BytesSerializer.class.getName());
testRecordProducer = new KafkaProducer<>(props);