From 27349938258bbbfb20262d9a82cb00e978b82df8 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 7 Apr 2025 21:14:14 +0200 Subject: [PATCH] =?utf8?q?GR=C3=9CN:=20Test=20f=C3=BCr=20nicht-existente?= =?utf8?q?=20Offsets=20gegen=20alle=20Partitionen?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../java/de/juplo/kafka/ApplicationTests.java | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 600b6bf..d2a2242 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -11,6 +11,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.FieldSource; import org.junit.jupiter.params.provider.ValueSource; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -55,18 +56,20 @@ public class ApplicationTests } @DisplayName("Not yet existing offset") - @Test - void testNotYetExistingOffset() + @ParameterizedTest(name = "partition: {0}") + @FieldSource("PARTITIONS") + void testNotYetExistingOffset(int partition) { - ResponseEntity response = restTemplate.getForEntity("/1/66666666666", String.class); + ResponseEntity response = restTemplate.getForEntity("/{partition}/666", String.class, partition); assertThat(response.getStatusCode()).isEqualTo(HttpStatusCode.valueOf(HttpStatus.NOT_FOUND.value())); } @DisplayName("Already deleted offset") - @Test - void testAlreadyDeletedOffset(@Autowired AdminClient adminClient) throws Exception + @ParameterizedTest(name = "partition: {0}") + @FieldSource("PARTITIONS") + void testAlreadyDeletedOffset(int partition) throws Exception { - RecordMetadata recordMetadata = send("Hallo", "Welt!"); + RecordMetadata recordMetadata = send(partition, Integer.toString(partition), "Hallo Welt! -- " + partition); deleteAllRecords(adminClient); ResponseEntity response = restTemplate.getForEntity( "/{partition}/{offset}", @@ -88,10 +91,13 @@ public class ApplicationTests static final String TOPIC = "ExampleConsumerTest_TEST"; static final int NUM_PARTITIONS = 7; + static final int[] PARTITIONS = IntStream.range(0, NUM_PARTITIONS).toArray(); @Autowired KafkaTemplate kafkaTemplate; @Autowired + AdminClient adminClient; + @Autowired TestRestTemplate restTemplate; final long[] currentOffsets = new long[NUM_PARTITIONS]; @@ -131,10 +137,10 @@ public class ApplicationTests return RecordsToDelete.beforeOffset(currentOffsets[partition] + 1); } - private RecordMetadata send(String key, String value) throws Exception + private RecordMetadata send(int partition, String key, String value) throws Exception { return kafkaTemplate - .send(TOPIC, key, value) + .send(TOPIC, partition, key, value) .thenApply(result -> { RecordMetadata metadata = result.getRecordMetadata(); -- 2.20.1