import org.springframework.kafka.test.context.EmbeddedKafka;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@FieldSource("PARTITIONS")
void testExistingOffset(int partition) throws Exception
{
- SendResult<byte[], byte[]> result = send(partition);
+ List<SendResult<byte[], byte[]>> results = new LinkedList<>();
+ for (int i = 0; i < (partition + 1) * 7; i++)
+ {
+ SendResult<byte[], byte[]> result = send(partition);
+ if (i % (partition + 1) == 0)
+ {
+ results.add(result);
+ }
+ }
+ results.forEach(result -> fetchAndCheck(result));
+ }
+
+ private void fetchAndCheck(SendResult<byte[], byte[]> result)
+ {
RecordMetadata recordMetadata = result.getRecordMetadata();
ResponseEntity<String> response = fetchRecord(recordMetadata);
check(result, response);