From f2c1f0582f32d05d714478e412660eb2dc34cb8f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 28 Jan 2025 21:33:08 +0100 Subject: [PATCH] WIP --- .../java/de/juplo/kafka/ApplicationTests.java | 74 ++++++++++++++++++- 1 file changed, 73 insertions(+), 1 deletion(-) diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 092660b4..f4042aaf 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -1,13 +1,28 @@ package de.juplo.kafka; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.RecordsToDelete; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.web.servlet.MockMvc; import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static de.juplo.kafka.ApplicationTests.PARTITIONS; import static de.juplo.kafka.ApplicationTests.TOPIC; @@ -20,7 +35,10 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. @SpringBootTest( properties = { "juplo.bootstrap-server=${spring.embedded.kafka.brokers}", - "juplo.consumer.topic=" + TOPIC }) + "juplo.consumer.topic=" + TOPIC, + "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", + "spring.kafka.producer.properties[spring.json.type.mapping]=ADD:de.juplo.test.MessageAdd,CALC:de.juplo.test.MessageCalc" + }) @AutoConfigureMockMvc @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS) public class ApplicationTests @@ -43,4 +61,58 @@ public class ApplicationTests .andExpect(status().isOk()) .andExpect(jsonPath("status").value("UP"))); } + + + + final long[] currentOffsets = new long[PARTITIONS]; + @Autowired + KafkaTemplate kafkaTemplate; + + private void send(String key, byte[] value) + { + kafkaTemplate + .send(TOPIC, key, value) + .thenAccept(result -> + { + RecordMetadata metadata = result.getRecordMetadata(); + currentOffsets[metadata.partition()] = metadata.offset(); + }); + } + + @AfterEach + void resetSetup(@Autowired AdminClient adminClient) throws InterruptedException, ExecutionException + { + adminClient + .deleteRecords(recordsToDelete()) + .all() + .get(); + } + + private Map recordsToDelete() + { + return IntStream + .range(0, PARTITIONS) + .filter(i -> currentOffsets[i] > 0) + .mapToObj(i -> Integer.valueOf(i)) + .collect(Collectors.toMap( + i -> new TopicPartition(TOPIC, i), + i -> recordsToDelete(i))); + } + + private RecordsToDelete recordsToDelete(int partition) + { + return RecordsToDelete.beforeOffset(currentOffsets[partition] + 1); + } + + @TestConfiguration + static class ApplicationTestConfig + { + @Bean + AdminClient adminClient(@Value("${spring.embedded.kafka.brokers}") String kafkaBroker) + { + Map properties = new HashMap<>(); + properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker); + return AdminClient.create(properties); + } + } } -- 2.20.1