Refactor: reworked the generation of the test messages
author Kai Moritz <kai@juplo.de>
Mon, 7 Apr 2025 20:59:08 +0000 (22:59 +0200)
committer Kai Moritz <kai@juplo.de>
Wed, 21 May 2025 18:14:13 +0000 (20:14 +0200)
src/test/java/de/juplo/kafka/ApplicationTests.java

index 4c7c8b3..f356ba1 100644
@@ -23,6 +23,7 @@ import org.springframework.http.HttpStatus;
 import org.springframework.http.HttpStatusCode;
 import org.springframework.http.ResponseEntity;
 import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 
 import java.util.HashMap;
@@ -42,6 +43,8 @@ import static org.assertj.core.api.Assertions.assertThat;
   properties = {
     "juplo.bootstrap-server=${spring.embedded.kafka.brokers}",
     "juplo.consumer.topic=" + TOPIC,
+    "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
+    "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
     "logging.level.de.juplo.kafka=TRACE",
   })
 @EmbeddedKafka(topics = TOPIC, partitions = NUM_PARTITIONS)
@@ -63,14 +66,18 @@ public class ApplicationTests
   @FieldSource("PARTITIONS")
   void testExistingOffset(int partition) throws Exception
   {
-    String key = Integer.toString(partition);
-    String value = "Hallo Welt! -- " + partition;
-    RecordMetadata recordMetadata = send(partition, key, value);
+    SendResult<byte[], byte[]> result = send(partition);
+
+    RecordMetadata recordMetadata = result.getRecordMetadata();
     ResponseEntity<String> response = restTemplate.getForEntity(
       "/{partition}/{offset}",
       String.class,
       recordMetadata.partition(),
       recordMetadata.offset());
+
+    String key = new String(result.getProducerRecord().key());
+    String value = new String(result.getProducerRecord().value());
+
     assertThat(response.getStatusCode())
       .isEqualTo(HttpStatusCode.valueOf(HttpStatus.OK.value()));
     assertThat(response.getHeaders())
@@ -100,7 +107,7 @@ public class ApplicationTests
   @FieldSource("PARTITIONS")
   void testAlreadyDeletedOffset(int partition) throws Exception
   {
-    RecordMetadata recordMetadata = send(partition, Integer.toString(partition), "Hallo Welt! -- " + partition);
+    RecordMetadata recordMetadata = send(partition).getRecordMetadata();
     deleteAllRecords(adminClient);
     ResponseEntity<String> response = restTemplate.getForEntity(
       "/{partition}/{offset}",
@@ -127,7 +134,7 @@ public class ApplicationTests
   static final int[] PARTITIONS = IntStream.range(0, NUM_PARTITIONS).toArray();
 
   @Autowired
-  KafkaTemplate<String, String> kafkaTemplate;
+  KafkaTemplate<byte[], byte[]> kafkaTemplate;
   @Autowired
   AdminClient adminClient;
   @Autowired
@@ -137,6 +144,8 @@ public class ApplicationTests
 
   final long[] currentOffsets = new long[NUM_PARTITIONS];
 
+  int message = 0;
+
 
   @BeforeEach
   void resetCurrentOffsets()
@@ -172,15 +181,17 @@ public class ApplicationTests
     return RecordsToDelete.beforeOffset(currentOffsets[partition] + 1);
   }
 
-  private RecordMetadata send(int partition, String key, String value) throws Exception
+  private SendResult<byte[], byte[]> send(int partition) throws Exception
   {
+    String key = Integer.toString(partition);
+    String value = "Hällö Wöhrld!%? -- " + ++message;
     return kafkaTemplate
-      .send(TOPIC, partition, key, value)
+      .send(TOPIC, partition, key.getBytes(), value.getBytes())
       .thenApply(result ->
       {
         RecordMetadata metadata = result.getRecordMetadata();
         currentOffsets[metadata.partition()] = metadata.offset();
-        return result.getRecordMetadata();
+        return result;
       })
       .get();
   }
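
For reference, a minimal sketch (not part of the commit) of how a test can use the SendResult<byte[], byte[]> returned by the refactored helper: it exposes both the ProducerRecord that was written (to recover key and value) and the RecordMetadata (partition and offset). The class and method names below are hypothetical illustrations; only the Spring Kafka and Kafka client accessors shown in the diff are assumed.

    import java.nio.charset.StandardCharsets;

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.springframework.kafka.support.SendResult;

    class SendResultUsageSketch
    {
      // Hypothetical helper: recovers the written bytes and the assigned
      // offset from a single SendResult, instead of passing key/value
      // around separately as the old send(partition, key, value) did.
      static void inspect(SendResult<byte[], byte[]> result)
      {
        ProducerRecord<byte[], byte[]> record = result.getProducerRecord();
        RecordMetadata metadata = result.getRecordMetadata();

        String key = new String(record.key(), StandardCharsets.UTF_8);
        String value = new String(record.value(), StandardCharsets.UTF_8);

        System.out.printf(
            "wrote key=%s value=%s to partition %d at offset %d%n",
            key, value, metadata.partition(), metadata.offset());
      }
    }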