From: Kai Moritz Date: Mon, 7 Apr 2025 19:14:14 +0000 (+0200) Subject: GRÜN: Test für nicht-existente Offsets gegen alle Partitionen X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=27349938258bbbfb20262d9a82cb00e978b82df8;p=demos%2Fkafka%2Ftraining GRÜN: Test für nicht-existente Offsets gegen alle Partitionen --- diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 600b6bf..d2a2242 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -11,6 +11,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.FieldSource; import org.junit.jupiter.params.provider.ValueSource; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -55,18 +56,20 @@ public class ApplicationTests } @DisplayName("Not yet existing offset") - @Test - void testNotYetExistingOffset() + @ParameterizedTest(name = "partition: {0}") + @FieldSource("PARTITIONS") + void testNotYetExistingOffset(int partition) { - ResponseEntity response = restTemplate.getForEntity("/1/66666666666", String.class); + ResponseEntity response = restTemplate.getForEntity("/{partition}/666", String.class, partition); assertThat(response.getStatusCode()).isEqualTo(HttpStatusCode.valueOf(HttpStatus.NOT_FOUND.value())); } @DisplayName("Already deleted offset") - @Test - void testAlreadyDeletedOffset(@Autowired AdminClient adminClient) throws Exception + @ParameterizedTest(name = "partition: {0}") + @FieldSource("PARTITIONS") + void testAlreadyDeletedOffset(int partition) throws Exception { - RecordMetadata recordMetadata = send("Hallo", "Welt!"); + RecordMetadata recordMetadata = send(partition, Integer.toString(partition), "Hallo Welt! -- " + partition); deleteAllRecords(adminClient); ResponseEntity response = restTemplate.getForEntity( "/{partition}/{offset}", @@ -88,10 +91,13 @@ public class ApplicationTests static final String TOPIC = "ExampleConsumerTest_TEST"; static final int NUM_PARTITIONS = 7; + static final int[] PARTITIONS = IntStream.range(0, NUM_PARTITIONS).toArray(); @Autowired KafkaTemplate kafkaTemplate; @Autowired + AdminClient adminClient; + @Autowired TestRestTemplate restTemplate; final long[] currentOffsets = new long[NUM_PARTITIONS]; @@ -131,10 +137,10 @@ public class ApplicationTests return RecordsToDelete.beforeOffset(currentOffsets[partition] + 1); } - private RecordMetadata send(String key, String value) throws Exception + private RecordMetadata send(int partition, String key, String value) throws Exception { return kafkaTemplate - .send(TOPIC, key, value) + .send(TOPIC, partition, key, value) .thenApply(result -> { RecordMetadata metadata = result.getRecordMetadata();