GREEN: Defined the behavior when fetching an already deleted offset
author     Kai Moritz <kai@juplo.de>
           Mon, 7 Apr 2025 17:27:50 +0000 (19:27 +0200)
committer  Kai Moritz <kai@juplo.de>
           Wed, 21 May 2025 18:14:13 +0000 (20:14 +0200)
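
Adds a test that writes a record, truncates all partitions via
AdminClient.deleteRecords(), and asserts that fetching the now-deleted
offset answers 404 NOT FOUND. To support this, the test class gains an
AdminClient bean, per-partition tracking of the highest written offset,
and an @AfterEach cleanup that truncates the topic between tests.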
src/test/java/de/juplo/kafka/ApplicationTests.java

index ba6c4af..0f78ea7 100644
@@ -1,16 +1,33 @@
 package de.juplo.kafka;
 
 import com.jayway.jsonpath.JsonPath;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
 import org.springframework.boot.test.web.client.TestRestTemplate;
+import org.springframework.context.annotation.Bean;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.HttpStatusCode;
 import org.springframework.http.ResponseEntity;
+import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
 import static de.juplo.kafka.ApplicationTests.NUM_PARTITIONS;
 import static de.juplo.kafka.ApplicationTests.TOPIC;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -43,10 +60,95 @@ public class ApplicationTests
     assertThat(response.getStatusCode()).isEqualTo(HttpStatusCode.valueOf(HttpStatus.NOT_FOUND.value()));
   }
 
+  @DisplayName("Already deleted offset")
+  @Test
+  void testAlreadyDeletedOffset(@Autowired AdminClient adminClient) throws Exception
+  {
+    RecordMetadata recordMetadata = send("Hallo", "Welt!");
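+    // Truncating the topic removes the record just written, so its offset can no longer be fetched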
+    deleteAllRecords(adminClient);
+    ResponseEntity<String> response = restTemplate.getForEntity(
+      "/{partition}/{offset}",
+      String.class,
+      recordMetadata.partition(),
+      recordMetadata.offset());
+    assertThat(response.getStatusCode()).isEqualTo(HttpStatusCode.valueOf(HttpStatus.NOT_FOUND.value()));
+  }
+
 
   static final String TOPIC = "ExampleConsumerTest_TEST";
   static final int NUM_PARTITIONS = 7;
 
+  @Autowired
+  KafkaTemplate<String, String> kafkaTemplate;
   @Autowired
   TestRestTemplate restTemplate;
+
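+  // Highest offset written to each partition by the current test; -1 marks partitions without writes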
+  final long[] currentOffsets = new long[NUM_PARTITIONS];
+
+
+  @BeforeEach
+  void resetCurrentOffsets()
+  {
+    for (int i = 0; i < NUM_PARTITIONS; i++)
+    {
+      currentOffsets[i] = -1;
+    }
+  }
+
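+  // Deletes all records written so far; also called directly by testAlreadyDeletedOffset()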
+  @AfterEach
+  void deleteAllRecords(@Autowired AdminClient adminClient) throws InterruptedException, ExecutionException
+  {
+    adminClient
+      .deleteRecords(recordsToDelete())
+      .all()
+      .get();
+  }
+
+  private Map<TopicPartition, RecordsToDelete> recordsToDelete()
+  {
+    return IntStream
+      .range(0, NUM_PARTITIONS)
+      .filter(i -> currentOffsets[i] > -1)
+      .boxed()
+      .collect(Collectors.toMap(
+        i -> new TopicPartition(TOPIC, i),
+        i -> recordsToDelete(i)));
+  }
+
+  private RecordsToDelete recordsToDelete(int partition)
+  {
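+    // beforeOffset() is exclusive, so +1 also deletes the record at the tracked offset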
+    return RecordsToDelete.beforeOffset(currentOffsets[partition] + 1);
+  }
+
+  private RecordMetadata send(String key, String value) throws Exception
+  {
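+    // Sends the record and remembers the assigned offset, so deleteAllRecords() can truncate past it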
+    return kafkaTemplate
+      .send(TOPIC, key, value)
+      .thenApply(result ->
+      {
+        RecordMetadata metadata = result.getRecordMetadata();
+        currentOffsets[metadata.partition()] = metadata.offset();
+        return metadata;
+      })
+      .get();
+  }
+
+
+  @TestConfiguration
+  static class ConsumerRunnableTestConfig
+  {
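+    // AdminClient that talks to the embedded broker started by @EmbeddedKafka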
+    @Bean
+    AdminClient adminClient(@Value("${spring.embedded.kafka.brokers}") String kafkaBroker)
+    {
+      Map<String, Object> properties = new HashMap<>();
+      properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker);
+      return AdminClient.create(properties);
+    }
+  }
 }
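
The diff pins down only the externally visible contract: a GET for an
offset that lies below the partition's log-start offset must answer 404
NOT FOUND. How the application detects that case is not part of this
commit; the following is a minimal sketch of one possible check, using
the consumer's beginningOffsets() API (class and method names here are
hypothetical, not taken from the project):

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.List;
import java.util.Map;

class DeletedOffsetCheck
{
  // True, if the requested offset lies below the partition's log-start
  // offset, i.e. it was removed by deleteRecords() or by retention and
  // can never be fetched again.
  static boolean isDeleted(
    KafkaConsumer<String, String> consumer,
    String topic,
    int partition,
    long offset)
  {
    TopicPartition tp = new TopicPartition(topic, partition);
    Map<TopicPartition, Long> logStartOffsets =
      consumer.beginningOffsets(List.of(tp));
    return offset < logStartOffsets.get(tp);
  }
}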