import com.mongodb.client.MongoClient;
import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
@Autowired
TestRecordHandler recordHandler;
@Autowired
+ DeadLetterTopicConsumer deadLetterTopicConsumer;
+ @Autowired
EndlessConsumer endlessConsumer;
KafkaProducer<Bytes, Bytes> testRecordProducer;
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(1))
.until(() -> recordHandler.receivedMessages >= numberOfValidMessages);
+ await(recordGenerator.getNumberOfPoisonPills() + " poison-pills received")
+ .atMost(Duration.ofSeconds(30))
+ .pollInterval(Duration.ofSeconds(1))
+ .until(() -> deadLetterTopicConsumer.messages.size() == recordGenerator.getNumberOfPoisonPills());
await("Offsets committed")
.atMost(Duration.ofSeconds(10))
.atMost(Duration.ofSeconds(30))
.pollInterval(Duration.ofSeconds(1))
.until(() -> recordHandler.receivedMessages >= numberOfValidMessages);
+ await(recordGenerator.getNumberOfLogicErrors() + " logic-errors received")
+ .atMost(Duration.ofSeconds(30))
+ .pollInterval(Duration.ofSeconds(1))
+ .until(() -> deadLetterTopicConsumer.messages.size() == recordGenerator.getNumberOfLogicErrors());
await("Offsets committed")
.atMost(Duration.ofSeconds(10))
recordHandler.seenOffsets = new HashMap<>();
recordHandler.receivedMessages = 0;
+ deadLetterTopicConsumer.messages.clear();
+
doForCurrentOffsets((tp, offset) ->
{
oldOffsets.put(tp, offset - 1);
{
return factory.createConsumer();
}
+
+ @Bean
+ public ConcurrentKafkaListenerContainerFactory<String, String> dltContainerFactory(
+ KafkaProperties properties)
+ {
+ Map<String, Object> consumerProperties = new HashMap<>();
+
+ consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
+ consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ DefaultKafkaConsumerFactory dltConsumerFactory =
+ new DefaultKafkaConsumerFactory<>(consumerProperties);
+ ConcurrentKafkaListenerContainerFactory<String, String> factory =
+ new ConcurrentKafkaListenerContainerFactory<>();
+ factory.setConsumerFactory(dltConsumerFactory);
+ return factory;
+ }
+
+ @Bean
+ public DeadLetterTopicConsumer deadLetterTopicConsumer()
+ {
+ return new DeadLetterTopicConsumer();
+ }
}
}