From 1c5be32878793e9a61c77d54ba51ce60ca5e6a41 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 7 Apr 2025 23:26:07 +0200 Subject: [PATCH] =?utf8?q?GR=C3=9CN:=20Existierende=20Offsets=20sind=20nic?= =?utf8?q?ht=20die=20einzigen?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../java/de/juplo/kafka/ApplicationTests.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 76e5278..a4dc033 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -27,6 +27,7 @@ import org.springframework.kafka.support.SendResult; import org.springframework.kafka.test.context.EmbeddedKafka; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -66,8 +67,21 @@ public class ApplicationTests @FieldSource("PARTITIONS") void testExistingOffset(int partition) throws Exception { - SendResult result = send(partition); + List> results = new LinkedList<>(); + for (int i = 0; i < (partition + 1) * 7; i++) + { + SendResult result = send(partition); + if (i % (partition + 1) == 0) + { + results.add(result); + } + } + results.forEach(result -> fetchAndCheck(result)); + } + + private void fetchAndCheck(SendResult result) + { RecordMetadata recordMetadata = result.getRecordMetadata(); ResponseEntity response = fetchRecord(recordMetadata); check(result, response); -- 2.20.1