X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fcounter%2FCounterApplicationIT.java;h=0faa2de575731a96fea892f81a75c2a363605a3e;hb=refs%2Fheads%2Fcounter;hp=78d103c282fbe7c4857220d6ca581cdc89264189;hpb=e94a327bebf468e2bcb5b686346a18a1409ec254;p=demos%2Fkafka%2Fwordcount diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java index 78d103c..ab395fd 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java @@ -1,157 +1,165 @@ package de.juplo.kafka.wordcount.counter; -import lombok.RequiredArgsConstructor; +import de.juplo.kafka.wordcount.splitter.TestInputUser; +import de.juplo.kafka.wordcount.splitter.TestInputWord; +import de.juplo.kafka.wordcount.top10.TestOutputWord; +import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.Stores; -import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Primary; import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.core.*; -import org.springframework.kafka.support.serializer.JsonDeserializer; -import org.springframework.kafka.support.serializer.JsonSerializer; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.SendResult; import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; import java.time.Duration; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.*; -import static de.juplo.kafka.wordcount.counter.TestData.convertToMap; -import static de.juplo.kafka.wordcount.counter.TestData.parseHeader; + +import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN; +import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT; +import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME; import static org.awaitility.Awaitility.await; -import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.*; -import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME; @SpringBootTest( properties = { - "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", + "spring.main.allow-bean-definition-overriding=true", + "spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer", + "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", + "spring.kafka.producer.properties.spring.json.add.type.headers=false", + "spring.kafka.consumer.auto-offset-reset=earliest", + "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", + "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", + "spring.kafka.consumer.properties.spring.json.type.mapping=key:de.juplo.kafka.wordcount.top10.TestOutputWord,counter:de.juplo.kafka.wordcount.top10.TestOutputWordCounter", + "logging.level.root=WARN", + "logging.level.de.juplo=DEBUG", "juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}", - "juplo.wordcount.counter.commit-interval=0", - "juplo.wordcount.counter.cacheMaxBytes=0", + "juplo.wordcount.counter.commit-interval=100", + "juplo.wordcount.counter.cache-max-bytes=0", "juplo.wordcount.counter.input-topic=" + TOPIC_IN, "juplo.wordcount.counter.output-topic=" + TOPIC_OUT }) -@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }, partitions = PARTITIONS) +@EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }) @Slf4j public class CounterApplicationIT { - public final static String TOPIC_IN = "in"; - public final static String TOPIC_OUT = "out"; - static final int PARTITIONS = 2; + public static final String TOPIC_IN = "in"; + public static final String TOPIC_OUT = "out"; - @Autowired - KafkaTemplate kafkaTemplate; @Autowired Consumer consumer; + @Autowired + CounterStreamProcessor streamProcessor; - @BeforeEach - public void clear() + @BeforeAll + public static void testSendMessage( + @Autowired KafkaTemplate kafkaTemplate) { - consumer.received.clear(); + TestData + .getInputMessages() + .forEach(kv -> + { + try + { + SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); + log.info( + "Sent: {}={}, partition={}, offset={}", + result.getProducerRecord().key(), + result.getProducerRecord().value(), + result.getRecordMetadata().partition(), + result.getRecordMetadata().offset()); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + }); } + @DisplayName("Await the expected number of messages") + @Test + public void testAwaitExpectedNumberOfMessagesForUsers() + { + await("Expected number of messages") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> consumer.enforceAssertion( + receivedMessages -> TestData.assertExpectedNumberOfMessagesForWord(receivedMessages))); + } + @DisplayName("Await the expected output messages") @Test void testSendMessage() { - TestData.writeInputData((key, value) -> kafkaTemplate.send(TOPIC_IN, key, value)); - - await("Expexted converted data") + await("Expected messages") .atMost(Duration.ofSeconds(10)) - .untilAsserted(() -> TestData.assertExpectedResult(consumer.getReceivedMessages())); + .untilAsserted(() -> consumer.enforceAssertion( + receivedMessages -> TestData.assertExpectedMessages(receivedMessages))); + } + + @DisplayName("Await the expected final output messages") + @Test + public void testAwaitExpectedLastMessagesForUsers() + { + await("Expected final output messages") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> consumer.enforceAssertion( + receivedMessages -> TestData.assertExpectedLastMessagesForWord(receivedMessages))); + } + + @DisplayName("Await the expected state in the state-store") + @Test + public void testAwaitExpectedState() + { + await("Expected state") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore())); } - @RequiredArgsConstructor static class Consumer { - private final List> received = new LinkedList<>(); + private final MultiValueMap received = new LinkedMultiValueMap<>(); @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) - public synchronized void receive(ConsumerRecord record) + public synchronized void receive( + @Header(KafkaHeaders.RECEIVED_KEY) TestOutputWord word, + @Payload TestOutputWordCounter counter) { - log.debug( - "Received message: {} -> {}, key: {}, value: {}", - record.key(), - record.value(), - parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME), - parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME)); - received.add(KeyValue.pair(record.key(),record.value())); + log.debug("Received message: {} -> {}", word, counter); + received.add(word, counter); } - synchronized List> getReceivedMessages() + synchronized void enforceAssertion( + java.util.function.Consumer> assertion) { - return received; + assertion.accept(received); } } @TestConfiguration static class Configuration { - @Bean - ProducerFactory producerFactory(Properties streamProcessorProperties) - { - Map propertyMap = convertToMap(streamProcessorProperties); - - propertyMap.put( - ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, - JsonSerializer.class.getName()); - propertyMap.put( - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - JsonSerializer.class.getName()); - - return new DefaultKafkaProducerFactory<>(propertyMap); - } - - @Bean - ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( - Properties streamProcessorProperties) - { - Map propertyMap = convertToMap(streamProcessorProperties); - - propertyMap.put( - ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - JsonDeserializer.class.getName()); - propertyMap.put( - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - JsonDeserializer.class.getName()); - - ConsumerFactory consumerFactory = - new DefaultKafkaConsumerFactory<>(propertyMap); - - ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory = - new ConcurrentKafkaListenerContainerFactory<>(); - - kafkaListenerContainerFactory.setConsumerFactory(consumerFactory); - - return kafkaListenerContainerFactory; - } - @Bean Consumer consumer() { return new Consumer(); } - @Primary @Bean - KeyValueBytesStoreSupplier inMemoryStoreSupplier() + KeyValueBytesStoreSupplier storeSupplier() { - return Stores.inMemoryKeyValueStore("TEST-STORE"); + return Stores.inMemoryKeyValueStore(STORE_NAME); } } }