LongSerializer serializer = new LongSerializer();
long[] currentOffsets = new long[] { 0, 0 };
+ long nextMessage = 1;
@Autowired
AdminClient adminClient;
adminClient.deleteRecords(Map.of(
new TopicPartition(TOPIC, 0), deleteAllRecordsByPartition(0),
new TopicPartition(TOPIC, 1), deleteAllRecordsByPartition(1)));
+ nextMessage = 1;
mockHandler.clear();
isTerminatedExceptionally.set(false);
}
private void sendValidMessage(int partition)
{
- send(partition, partition);
+ send(partition, nextMessage);
}
private void sendNonDeserializableMessage(int partition)
private void send(int partition, byte[] bytes)
{
+ nextMessage++;
kafkaTemplate.send(TOPIC, partition, "EGAL", bytes);
}