import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.FieldSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
}
@DisplayName("Not yet existing offset")
- @Test
- void testNotYetExistingOffset()
+ @ParameterizedTest(name = "partition: {0}")
+ @FieldSource("PARTITIONS")
+ void testNotYetExistingOffset(int partition)
{
- ResponseEntity<String> response = restTemplate.getForEntity("/1/66666666666", String.class);
+ ResponseEntity<String> response = restTemplate.getForEntity("/{partition}/666", String.class, partition);
assertThat(response.getStatusCode()).isEqualTo(HttpStatusCode.valueOf(HttpStatus.NOT_FOUND.value()));
}
@DisplayName("Already deleted offset")
- @Test
- void testAlreadyDeletedOffset(@Autowired AdminClient adminClient) throws Exception
+ @ParameterizedTest(name = "partition: {0}")
+ @FieldSource("PARTITIONS")
+ void testAlreadyDeletedOffset(int partition) throws Exception
{
- RecordMetadata recordMetadata = send("Hallo", "Welt!");
+ RecordMetadata recordMetadata = send(partition, Integer.toString(partition), "Hallo Welt! -- " + partition);
deleteAllRecords(adminClient);
ResponseEntity<String> response = restTemplate.getForEntity(
"/{partition}/{offset}",
static final String TOPIC = "ExampleConsumerTest_TEST";
static final int NUM_PARTITIONS = 7;
+ static final int[] PARTITIONS = IntStream.range(0, NUM_PARTITIONS).toArray();
@Autowired
KafkaTemplate<String, String> kafkaTemplate;
@Autowired
+ AdminClient adminClient;
+ @Autowired
TestRestTemplate restTemplate;
final long[] currentOffsets = new long[NUM_PARTITIONS];
return RecordsToDelete.beforeOffset(currentOffsets[partition] + 1);
}
- private RecordMetadata send(String key, String value) throws Exception
+ private RecordMetadata send(int partition, String key, String value) throws Exception
{
return kafkaTemplate
- .send(TOPIC, key, value)
+ .send(TOPIC, partition, key, value)
.thenApply(result ->
{
RecordMetadata metadata = result.getRecordMetadata();