1 package de.juplo.kafka.wordcount.counter;
3 import de.juplo.kafka.wordcount.splitter.TestInputWord;
4 import de.juplo.kafka.wordcount.top10.TestOutputWord;
5 import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
8 import org.apache.kafka.streams.state.Stores;
9 import org.junit.jupiter.api.BeforeAll;
10 import org.junit.jupiter.api.DisplayName;
11 import org.junit.jupiter.api.Test;
12 import org.springframework.beans.factory.annotation.Autowired;
13 import org.springframework.boot.test.context.SpringBootTest;
14 import org.springframework.boot.test.context.TestConfiguration;
15 import org.springframework.context.annotation.Bean;
16 import org.springframework.context.annotation.Primary;
17 import org.springframework.kafka.annotation.KafkaListener;
18 import org.springframework.kafka.core.KafkaTemplate;
19 import org.springframework.kafka.support.KafkaHeaders;
20 import org.springframework.kafka.support.SendResult;
21 import org.springframework.kafka.test.context.EmbeddedKafka;
22 import org.springframework.messaging.handler.annotation.Header;
23 import org.springframework.messaging.handler.annotation.Payload;
24 import org.springframework.util.LinkedMultiValueMap;
25 import org.springframework.util.MultiValueMap;
27 import java.time.Duration;
29 import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN;
30 import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT;
31 import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
32 import static org.awaitility.Awaitility.await;
37 "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
38 "spring.kafka.producer.properties.spring.json.add.type.headers=false",
39 "spring.kafka.consumer.auto-offset-reset=earliest",
40 "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
41 "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
42 "spring.kafka.consumer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.top10.TestOutputWord,counter:de.juplo.kafka.wordcount.top10.TestOutputWordCounter",
43 "logging.level.root=WARN",
44 "logging.level.de.juplo=DEBUG",
45 "juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}",
46 "juplo.wordcount.counter.commit-interval=0",
47 "juplo.wordcount.counter.cacheMaxBytes=0",
48 "juplo.wordcount.counter.input-topic=" + TOPIC_IN,
49 "juplo.wordcount.counter.output-topic=" + TOPIC_OUT })
50 @EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT })
52 public class CounterApplicationIT
54 public static final String TOPIC_IN = "in";
55 public static final String TOPIC_OUT = "out";
60 CounterStreamProcessor streamProcessor;
64 public static void testSendMessage(
65 @Autowired KafkaTemplate<String, TestInputWord> kafkaTemplate)
73 SendResult<String, TestInputWord> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
75 "Sent: {}={}, partition={}, offset={}",
76 result.getProducerRecord().key(),
77 result.getProducerRecord().value(),
78 result.getRecordMetadata().partition(),
79 result.getRecordMetadata().offset());
83 throw new RuntimeException(e);
88 @DisplayName("Await the expected number of messages")
90 public void testAwaitExpectedNumberOfMessagesForUsers()
92 await("Expected number of messages")
93 .atMost(Duration.ofSeconds(5))
94 .untilAsserted(() -> consumer.enforceAssertion(
95 receivedMessages -> TestData.assertExpectedNumberOfMessagesForWord(receivedMessages)));
98 @DisplayName("Await the expected output messages")
100 void testSendMessage()
102 await("Expected messages")
103 .atMost(Duration.ofSeconds(10))
104 .untilAsserted(() -> consumer.enforceAssertion(
105 receivedMessages -> TestData.assertExpectedMessages(receivedMessages)));
108 @DisplayName("Await the expected final output messages")
110 public void testAwaitExpectedLastMessagesForUsers()
112 await("Expected final output messages")
113 .atMost(Duration.ofSeconds(5))
114 .untilAsserted(() -> consumer.enforceAssertion(
115 receivedMessages -> TestData.assertExpectedLastMessagesForWord(receivedMessages)));
118 @DisplayName("Await the expected state in the state-store")
120 public void testAwaitExpectedState()
122 await("Expected state")
123 .atMost(Duration.ofSeconds(5))
124 .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
128 static class Consumer
130 private final MultiValueMap<TestOutputWord, TestOutputWordCounter> received = new LinkedMultiValueMap<>();
132 @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
133 public synchronized void receive(
134 @Header(KafkaHeaders.RECEIVED_KEY) TestOutputWord word,
135 @Payload TestOutputWordCounter counter)
137 log.debug("Received message: {} -> {}", word, counter);
138 received.add(word, counter);
141 synchronized void enforceAssertion(
142 java.util.function.Consumer<MultiValueMap<TestOutputWord, TestOutputWordCounter>> assertion)
144 assertion.accept(received);
149 static class Configuration
154 return new Consumer();
159 KeyValueBytesStoreSupplier inMemoryStoreSupplier()
161 return Stores.inMemoryKeyValueStore(STORE_NAME);