package de.juplo.kafka;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.web.servlet.MockMvc;
import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import static de.juplo.kafka.ApplicationTests.PARTITIONS;
import static de.juplo.kafka.ApplicationTests.TOPIC;
@SpringBootTest(
properties = {
"juplo.bootstrap-server=${spring.embedded.kafka.brokers}",
- "juplo.consumer.topic=" + TOPIC })
+ "juplo.consumer.topic=" + TOPIC,
+ "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
+ "spring.kafka.producer.properties[spring.json.type.mapping]=ADD:de.juplo.test.MessageAdd,CALC:de.juplo.test.MessageCalc"
+ })
@AutoConfigureMockMvc
@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
public class ApplicationTests
.andExpect(status().isOk())
.andExpect(jsonPath("status").value("UP")));
}
+
+
+
+  // Highest offset observed per partition for records produced by this test
+  // class; read by resetSetup() to delete test data after each test.
+  // NOTE(review): default element value is 0, which is indistinguishable from
+  // a real record at offset 0 — see recordsToDelete() below.
+  final long[] currentOffsets = new long[PARTITIONS];
+  @Autowired
+  KafkaTemplate<String, ?> kafkaTemplate;
+
+  // Sends a record to TOPIC and records the offset it was written to, so the
+  // record can be deleted again in resetSetup().
+  // NOTE(review): the offset is captured asynchronously in thenAccept(); a
+  // test that returns immediately after send() may race with the @AfterEach
+  // cleanup before currentOffsets is updated — confirm callers await delivery.
+  private void send(String key, byte[] value)
+  {
+    kafkaTemplate
+      .send(TOPIC, key, value)
+      .thenAccept(result ->
+      {
+        RecordMetadata metadata = result.getRecordMetadata();
+        currentOffsets[metadata.partition()] = metadata.offset();
+      });
+  }
+
+  // After each test, physically deletes all records produced via send() from
+  // the embedded topic, so each test starts from an empty topic.
+  // Blocks (get()) until the broker confirms the deletion.
+  @AfterEach
+  void resetSetup(@Autowired AdminClient adminClient) throws InterruptedException, ExecutionException
+  {
+    adminClient
+      .deleteRecords(recordsToDelete())
+      .all()
+      .get();
+  }
+
+  // Builds the per-partition deletion map for AdminClient.deleteRecords():
+  // one entry per partition that received at least one record.
+  // NOTE(review): the filter currentOffsets[i] > 0 cannot distinguish
+  // "nothing produced" from "last record at offset 0" (the array's default
+  // value is 0), so a partition whose only record sits at offset 0 is never
+  // cleaned up. Consider initialising currentOffsets to -1 and filtering
+  // with >= 0 instead.
+  private Map<TopicPartition, RecordsToDelete> recordsToDelete()
+  {
+    return IntStream
+      .range(0, PARTITIONS)
+      .filter(i -> currentOffsets[i] > 0)
+      .mapToObj(i -> Integer.valueOf(i))
+      .collect(Collectors.toMap(
+        i -> new TopicPartition(TOPIC, i),
+        i -> recordsToDelete(i)));
+  }
+
+  // Deletion marker for a single partition: removes everything up to and
+  // including the last offset seen there (beforeOffset is exclusive, hence
+  // the + 1).
+  private RecordsToDelete recordsToDelete(int partition)
+  {
+    return RecordsToDelete.beforeOffset(currentOffsets[partition] + 1);
+  }
+
+  // Test-only Spring configuration contributing an AdminClient wired to the
+  // embedded Kafka broker; injected into resetSetup() for record deletion.
+  @TestConfiguration
+  static class ApplicationTestConfig
+  {
+    @Bean
+    AdminClient adminClient(@Value("${spring.embedded.kafka.brokers}") String kafkaBroker)
+    {
+      Map<String, Object> properties = new HashMap<>();
+      properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker);
+      return AdminClient.create(properties);
+    }
+  }
}