From: Kai Moritz Date: Mon, 7 Apr 2025 17:27:50 +0000 (+0200) Subject: GRÜN: Verhalten beim Abrufen eines gelöschten Offsets definiert X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=e182c40b3076e1cd59a30608e8ca72c61fc1ca4a;p=demos%2Fkafka%2Ftraining GRÜN: Verhalten beim Abrufen eines gelöschten Offsets definiert --- diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index ba6c4af..0f78ea7 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -1,16 +1,33 @@ package de.juplo.kafka; import com.jayway.jsonpath.JsonPath; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.RecordsToDelete; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; import org.springframework.boot.test.web.client.TestRestTemplate; +import org.springframework.context.annotation.Bean; import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatusCode; import org.springframework.http.ResponseEntity; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.test.context.EmbeddedKafka; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + import static de.juplo.kafka.ApplicationTests.NUM_PARTITIONS; import static 
de.juplo.kafka.ApplicationTests.TOPIC; import static org.assertj.core.api.Assertions.assertThat; @@ -43,10 +60,89 @@ public class ApplicationTests assertThat(response.getStatusCode()).isEqualTo(HttpStatusCode.valueOf(HttpStatus.NOT_FOUND.value())); } + @DisplayName("Already deleted offset") + @Test + void testAlreadyDeletedOffset(@Autowired AdminClient adminClient) throws Exception + { + RecordMetadata recordMetadata = send("Hallo", "Welt!"); + deleteAllRecords(adminClient); + ResponseEntity<String> response = restTemplate.getForEntity( + "/{partition}/{offset}", + String.class, + recordMetadata.partition(), + recordMetadata.offset()); + assertThat(response.getStatusCode()).isEqualTo(HttpStatusCode.valueOf(HttpStatus.NOT_FOUND.value())); + } + static final String TOPIC = "ExampleConsumerTest_TEST"; static final int NUM_PARTITIONS = 7; + @Autowired + KafkaTemplate<String, String> kafkaTemplate; @Autowired TestRestTemplate restTemplate; + + final long[] currentOffsets = new long[NUM_PARTITIONS]; + + + @BeforeEach + void resetCurrentOffsets() + { + for (int i = 0; i < NUM_PARTITIONS; i++) + { + currentOffsets[i] = -1; + } + } + + @AfterEach + void deleteAllRecords(@Autowired AdminClient adminClient) throws InterruptedException, ExecutionException + { + adminClient + .deleteRecords(recordsToDelete()) + .all() + .get(); + } + + private Map<TopicPartition, RecordsToDelete> recordsToDelete() + { + return IntStream + .range(0, NUM_PARTITIONS) + .filter(i -> currentOffsets[i] > -1) + .mapToObj(i -> Integer.valueOf(i)) + .collect(Collectors.toMap( + i -> new TopicPartition(TOPIC, i), + i -> recordsToDelete(i))); + } + + private RecordsToDelete recordsToDelete(int partition) + { + return RecordsToDelete.beforeOffset(currentOffsets[partition] + 1); + } + + private RecordMetadata send(String key, String value) throws Exception + { + return kafkaTemplate + .send(TOPIC, key, value) + .thenApply(result -> + { + RecordMetadata metadata = result.getRecordMetadata(); + currentOffsets[metadata.partition()] = metadata.offset(); + return 
result.getRecordMetadata(); + }) + .get(); + } + + + @TestConfiguration + static class ConsumerRunnableTestConfig + { + @Bean + AdminClient adminClient(@Value("${spring.embedded.kafka.brokers}") String kafkaBroker) + { + Map<String, Object> properties = new HashMap<>(); + properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker); + return AdminClient.create(properties); + } + } }