From e8a1c6d9d553b336d569d99f14edbfc75c8e3eaa Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 7 Apr 2025 22:59:08 +0200 Subject: [PATCH] =?utf8?q?Refacotr:=20Erzeugung=20der=20Testnachrichten=20?= =?utf8?q?=C3=BCberarbeitet?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../java/de/juplo/kafka/ApplicationTests.java | 27 +++++++++++++------ 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 4c7c8b3..f356ba1 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -23,6 +23,7 @@ import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatusCode; import org.springframework.http.ResponseEntity; import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.SendResult; import org.springframework.kafka.test.context.EmbeddedKafka; import java.util.HashMap; @@ -42,6 +43,8 @@ import static org.assertj.core.api.Assertions.assertThat; properties = { "juplo.bootstrap-server=${spring.embedded.kafka.brokers}", "juplo.consumer.topic=" + TOPIC, + "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.ByteArraySerializer", + "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer", "logging.level.de.juplo.kafka=TRACE", }) @EmbeddedKafka(topics = TOPIC, partitions = NUM_PARTITIONS) @@ -63,14 +66,18 @@ public class ApplicationTests @FieldSource("PARTITIONS") void testExistingOffset(int partition) throws Exception { - String key = Integer.toString(partition); - String value = "Hallo Welt! -- " + partition; - RecordMetadata recordMetadata = send(partition, key, value); + SendResult result = send(partition); + + RecordMetadata recordMetadata = result.getRecordMetadata(); ResponseEntity response = restTemplate.getForEntity( "/{partition}/{offset}", String.class, recordMetadata.partition(), recordMetadata.offset()); + + String key = new String(result.getProducerRecord().key()); + String value = new String(result.getProducerRecord().value()); + assertThat(response.getStatusCode()) .isEqualTo(HttpStatusCode.valueOf(HttpStatus.OK.value())); assertThat(response.getHeaders()) @@ -100,7 +107,7 @@ public class ApplicationTests @FieldSource("PARTITIONS") void testAlreadyDeletedOffset(int partition) throws Exception { - RecordMetadata recordMetadata = send(partition, Integer.toString(partition), "Hallo Welt! -- " + partition); + RecordMetadata recordMetadata = send(partition).getRecordMetadata(); deleteAllRecords(adminClient); ResponseEntity response = restTemplate.getForEntity( "/{partition}/{offset}", @@ -127,7 +134,7 @@ public class ApplicationTests static final int[] PARTITIONS = IntStream.range(0, NUM_PARTITIONS).toArray(); @Autowired - KafkaTemplate kafkaTemplate; + KafkaTemplate kafkaTemplate; @Autowired AdminClient adminClient; @Autowired @@ -137,6 +144,8 @@ public class ApplicationTests final long[] currentOffsets = new long[NUM_PARTITIONS]; + int message = 0; + @BeforeEach void resetCurrentOffsets() @@ -172,15 +181,17 @@ public class ApplicationTests return RecordsToDelete.beforeOffset(currentOffsets[partition] + 1); } - private RecordMetadata send(int partition, String key, String value) throws Exception + private SendResult send(int partition) throws Exception { + String key = Integer.toString(partition); + String value = "Hällö Wöhrld!%? -- " + ++message; return kafkaTemplate - .send(TOPIC, partition, key, value) + .send(TOPIC, partition, key.getBytes(), value.getBytes()) .thenApply(result -> { RecordMetadata metadata = result.getRecordMetadata(); currentOffsets[metadata.partition()] = metadata.offset(); - return result.getRecordMetadata(); + return result; }) .get(); } -- 2.20.1