]> juplo.de Git - demos/kafka/training/commitdiff
GRÜN: Existierende Offsets sind nicht die einzigen
authorKai Moritz <kai@juplo.de>
Mon, 7 Apr 2025 21:26:07 +0000 (23:26 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 22 Mar 2026 20:47:38 +0000 (21:47 +0100)
src/test/java/de/juplo/kafka/ApplicationTests.java

index 5e0c3a71a758a28af5195425794188f3805023e3..b64f797944742d9ab2a135a36cfb744fd645d5a8 100644 (file)
@@ -29,6 +29,7 @@ import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.test.annotation.DirtiesContext;
 
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -70,8 +71,21 @@ public class ApplicationTests
   @FieldSource("PARTITIONS")
   void testExistingOffset(int partition) throws Exception
   {
-    SendResult<byte[], byte[]> result = send(partition);
+    List<SendResult<byte[], byte[]>> results = new LinkedList<>();
+    for (int i = 0; i < (partition + 1) * 7; i++)
+    {
+      SendResult<byte[], byte[]> result = send(partition);
+      if (i % (partition + 1) == 0)
+      {
+        results.add(result);
+      }
+    }
 
+    results.forEach(result -> fetchAndCheck(result));
+  }
+
+  private void fetchAndCheck(SendResult<byte[], byte[]> result)
+  {
     RecordMetadata recordMetadata = result.getRecordMetadata();
     ResponseEntity<String> response = fetchRecord(recordMetadata);
     check(result, response);