import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.springframework.kafka.test.context.EmbeddedKafka;
import java.time.Duration;
-import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
-import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@SpringBootTest(
classes = {
KafkaAutoConfiguration.class,
+ ApplicationProperties.class,
ExampleConsumerTest.ConsumerRunnableTestConfig.class,
},
properties = {
+ "juplo.bootstrap-server=${spring.embedded.kafka.brokers}",
"spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
"logging.level.de.juplo.kafka=TRACE",
})
@BeforeEach
- void createExampleConsumer(@Value("${spring.embedded.kafka.brokers}") String kafkaBroker)
+ void createExampleConsumer(@Autowired ApplicationProperties properties)
{
- Properties props = new Properties();
- props.put("bootstrap.servers", kafkaBroker);
- props.put("client.id", ID);
- props.put("group.id", ID);
- props.put("auto.offset.reset", "earliest");
- props.put("key.deserializer", StringDeserializer.class.getName());
- props.put("value.deserializer", LongDeserializer.class.getName());
-
- Consumer<String, Long> consumer = new KafkaConsumer<>(props);
+ ApplicationConfiguration configuration = new ApplicationConfiguration();
+ Consumer<String, Long> consumer = configuration.kafkaConsumer(properties);
exampleConsumer = new ExampleConsumer(
ID,