final LongSerializer serializer = new LongSerializer();
final long[] currentOffsets = new long[NUM_PARTITIONS];
+ long nextMessage = 1;
+
final MockRecordHandler mockRecordHandler = new MockRecordHandler();
final AtomicBoolean isTerminatedExceptionally = new AtomicBoolean();
.all()
.get();
mockRecordHandler.clear();
+ nextMessage = 1;
isTerminatedExceptionally.set(false);
}
private void sendValidMessage(int partition)
{
- send(partition, partition);
+ send(partition, nextMessage);
}
private void sendNonDeserializableMessage(int partition)
private void send(int partition, byte[] bytes)
{
+ nextMessage++;
kafkaTemplate
.send(TOPIC, partition, "EGAL", bytes)
.thenAccept(result ->