]> juplo.de Git - demos/kafka/training/commitdiff
Refacotr: Erzeugung der Testnachrichten überarbeitet
authorKai Moritz <kai@juplo.de>
Mon, 7 Apr 2025 20:59:08 +0000 (22:59 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 22 Mar 2026 20:47:38 +0000 (21:47 +0100)
src/test/java/de/juplo/kafka/ApplicationTests.java

index 5af61b6dc1e214a31ae4b58b969f5c081f6502d7..930717c91c4d1fe738ad7bd0262d5bfe414e11a8 100644 (file)
@@ -24,6 +24,7 @@ import org.springframework.http.HttpStatus;
 import org.springframework.http.HttpStatusCode;
 import org.springframework.http.ResponseEntity;
 import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.test.annotation.DirtiesContext;
 
@@ -44,6 +45,8 @@ import static org.assertj.core.api.Assertions.assertThat;
   properties = {
     "juplo.bootstrap-server=${spring.embedded.kafka.brokers}",
     "juplo.consumer.topic=" + TOPIC,
+    "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
+    "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
     "logging.level.de.juplo.kafka=TRACE",
   })
 @DirtiesContext
@@ -67,14 +70,18 @@ public class ApplicationTests
   @FieldSource("PARTITIONS")
   void testExistingOffset(int partition) throws Exception
   {
-    String key = Integer.toString(partition);
-    String value = "Hallo Welt! -- " + partition;
-    RecordMetadata recordMetadata = send(partition, key, value);
+    SendResult<byte[], byte[]> result = send(partition);
+
+    RecordMetadata recordMetadata = result.getRecordMetadata();
     ResponseEntity<String> response = restTemplate.getForEntity(
       "/{partition}/{offset}",
       String.class,
       recordMetadata.partition(),
       recordMetadata.offset());
+
+    String key = new String(result.getProducerRecord().key());
+    String value = new String(result.getProducerRecord().value());
+
     assertThat(response.getStatusCode())
       .isEqualTo(HttpStatusCode.valueOf(HttpStatus.OK.value()));
     assertThat(response.getHeaders().toSingleValueMap())
@@ -104,7 +111,7 @@ public class ApplicationTests
   @FieldSource("PARTITIONS")
   void testAlreadyDeletedOffset(int partition) throws Exception
   {
-    RecordMetadata recordMetadata = send(partition, Integer.toString(partition), "Hallo Welt! -- " + partition);
+    RecordMetadata recordMetadata = send(partition).getRecordMetadata();
     deleteAllRecords(adminClient);
     ResponseEntity<String> response = restTemplate.getForEntity(
       "/{partition}/{offset}",
@@ -131,7 +138,7 @@ public class ApplicationTests
   static final int[] PARTITIONS = IntStream.range(0, NUM_PARTITIONS).toArray();
 
   @Autowired
-  KafkaTemplate<String, String> kafkaTemplate;
+  KafkaTemplate<byte[], byte[]> kafkaTemplate;
   @Autowired
   AdminClient adminClient;
   @Autowired
@@ -141,6 +148,8 @@ public class ApplicationTests
 
   final long[] currentOffsets = new long[NUM_PARTITIONS];
 
+  int message = 0;
+
 
   @BeforeEach
   void resetCurrentOffsets()
@@ -176,15 +185,17 @@ public class ApplicationTests
     return RecordsToDelete.beforeOffset(currentOffsets[partition] + 1);
   }
 
-  private RecordMetadata send(int partition, String key, String value) throws Exception
+  private SendResult<byte[], byte[]> send(int partition) throws Exception
   {
+    String key = Integer.toString(partition);
+    String value = "Hällö Wöhrld!%? -- " + ++message;
     return kafkaTemplate
-      .send(TOPIC, partition, key, value)
+      .send(TOPIC, partition, key.getBytes(), value.getBytes())
       .thenApply(result ->
       {
         RecordMetadata metadata = result.getRecordMetadata();
         currentOffsets[metadata.partition()] = metadata.offset();
-        return result.getRecordMetadata();
+        return result;
       })
       .get();
   }