WIP
authorKai Moritz <kai@juplo.de>
Tue, 28 Jan 2025 20:33:08 +0000 (21:33 +0100)
committerKai Moritz <kai@juplo.de>
Tue, 28 Jan 2025 20:33:08 +0000 (21:33 +0100)
src/test/java/de/juplo/kafka/ApplicationTests.java

index 092660b..f4042aa 100644 (file)
@@ -1,13 +1,28 @@
 package de.juplo.kafka;
 
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.test.web.servlet.MockMvc;
 
 import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static de.juplo.kafka.ApplicationTests.PARTITIONS;
 import static de.juplo.kafka.ApplicationTests.TOPIC;
@@ -20,7 +35,10 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
 @SpringBootTest(
     properties = {
         "juplo.bootstrap-server=${spring.embedded.kafka.brokers}",
-        "juplo.consumer.topic=" + TOPIC })
+        "juplo.consumer.topic=" + TOPIC,
+        "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
+        "spring.kafka.producer.properties[spring.json.type.mapping]=ADD:de.juplo.test.MessageAdd,CALC:de.juplo.test.MessageCalc"
+    })
 @AutoConfigureMockMvc
 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
 public class ApplicationTests
@@ -43,4 +61,58 @@ public class ApplicationTests
         .andExpect(status().isOk())
         .andExpect(jsonPath("status").value("UP")));
   }
+
+
+
+  final long[] currentOffsets = new long[PARTITIONS];
+  @Autowired
+  KafkaTemplate<String, ?> kafkaTemplate;
+
+  private void send(String key, byte[] value)
+  {
+    kafkaTemplate
+      .send(TOPIC, key, value)
+      .thenAccept(result ->
+      {
+        RecordMetadata metadata = result.getRecordMetadata();
+        currentOffsets[metadata.partition()] = metadata.offset();
+      });
+  }
+
+  @AfterEach
+  void resetSetup(@Autowired AdminClient adminClient) throws InterruptedException, ExecutionException
+  {
+    adminClient
+      .deleteRecords(recordsToDelete())
+      .all()
+      .get();
+  }
+
+  private Map<TopicPartition, RecordsToDelete> recordsToDelete()
+  {
+    return IntStream
+      .range(0, PARTITIONS)
+      .filter(i -> currentOffsets[i] > 0)
+      .mapToObj(i -> Integer.valueOf(i))
+      .collect(Collectors.toMap(
+        i -> new TopicPartition(TOPIC, i),
+        i -> recordsToDelete(i)));
+  }
+
+  private RecordsToDelete recordsToDelete(int partition)
+  {
+    return RecordsToDelete.beforeOffset(currentOffsets[partition] + 1);
+  }
+
+  @TestConfiguration
+  static class ApplicationTestConfig
+  {
+    @Bean
+    AdminClient adminClient(@Value("${spring.embedded.kafka.brokers}") String kafkaBroker)
+    {
+      Map<String, Object> properties = new HashMap<>();
+      properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker);
+      return AdminClient.create(properties);
+    }
+  }
 }