GRÜN: Test für nicht-existente Offsets gegen alle Partitionen
authorKai Moritz <kai@juplo.de>
Mon, 7 Apr 2025 19:14:14 +0000 (21:14 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 21 May 2025 18:14:13 +0000 (20:14 +0200)
src/test/java/de/juplo/kafka/ApplicationTests.java

index 600b6bf..d2a2242 100644 (file)
@@ -11,6 +11,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.FieldSource;
 import org.junit.jupiter.params.provider.ValueSource;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
@@ -55,18 +56,20 @@ public class ApplicationTests
   }
 
   @DisplayName("Not yet existing offset")
-  @Test
-  void testNotYetExistingOffset()
+  @ParameterizedTest(name = "partition: {0}")
+  @FieldSource("PARTITIONS")
+  void testNotYetExistingOffset(int partition)
   {
-    ResponseEntity<String> response = restTemplate.getForEntity("/1/66666666666", String.class);
+    ResponseEntity<String> response = restTemplate.getForEntity("/{partition}/666", String.class, partition);
     assertThat(response.getStatusCode()).isEqualTo(HttpStatusCode.valueOf(HttpStatus.NOT_FOUND.value()));
   }
 
   @DisplayName("Already deleted offset")
-  @Test
-  void testAlreadyDeletedOffset(@Autowired AdminClient adminClient) throws Exception
+  @ParameterizedTest(name = "partition: {0}")
+  @FieldSource("PARTITIONS")
+  void testAlreadyDeletedOffset(int partition) throws Exception
   {
-    RecordMetadata recordMetadata = send("Hallo", "Welt!");
+    RecordMetadata recordMetadata = send(partition, Integer.toString(partition), "Hallo Welt! -- " + partition);
     deleteAllRecords(adminClient);
     ResponseEntity<String> response = restTemplate.getForEntity(
       "/{partition}/{offset}",
@@ -88,10 +91,13 @@ public class ApplicationTests
 
   static final String TOPIC = "ExampleConsumerTest_TEST";
   static final int NUM_PARTITIONS = 7;
+  static final int[] PARTITIONS = IntStream.range(0, NUM_PARTITIONS).toArray();
 
   @Autowired
   KafkaTemplate<String, String> kafkaTemplate;
   @Autowired
+  AdminClient adminClient;
+  @Autowired
   TestRestTemplate restTemplate;
 
   final long[] currentOffsets = new long[NUM_PARTITIONS];
@@ -131,10 +137,10 @@ public class ApplicationTests
     return RecordsToDelete.beforeOffset(currentOffsets[partition] + 1);
   }
 
-  private RecordMetadata send(String key, String value) throws Exception
+  private RecordMetadata send(int partition, String key, String value) throws Exception
   {
     return kafkaTemplate
-      .send(TOPIC, key, value)
+      .send(TOPIC, partition, key, value)
       .thenApply(result ->
       {
         RecordMetadata metadata = result.getRecordMetadata();