1 package de.juplo.kafka.wordcount.counter;
3 import de.juplo.kafka.wordcount.splitter.TestInputWord;
4 import de.juplo.kafka.wordcount.top10.TestOutputWord;
5 import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
8 import org.apache.kafka.streams.state.Stores;
9 import org.junit.jupiter.api.BeforeAll;
10 import org.junit.jupiter.api.DisplayName;
11 import org.junit.jupiter.api.Test;
12 import org.springframework.beans.factory.annotation.Autowired;
13 import org.springframework.boot.test.context.SpringBootTest;
14 import org.springframework.boot.test.context.TestConfiguration;
15 import org.springframework.context.annotation.Bean;
16 import org.springframework.context.annotation.Primary;
17 import org.springframework.kafka.annotation.KafkaListener;
18 import org.springframework.kafka.core.KafkaTemplate;
19 import org.springframework.kafka.support.KafkaHeaders;
20 import org.springframework.kafka.support.SendResult;
21 import org.springframework.kafka.test.context.EmbeddedKafka;
22 import org.springframework.messaging.handler.annotation.Header;
23 import org.springframework.messaging.handler.annotation.Payload;
24 import org.springframework.util.LinkedMultiValueMap;
25 import org.springframework.util.MultiValueMap;
27 import java.time.Duration;
29 import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN;
30 import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT;
31 import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
32 import static org.awaitility.Awaitility.await;
37 "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
38 "spring.kafka.producer.properties.spring.json.add.type.headers=false",
39 "spring.kafka.consumer.auto-offset-reset=earliest",
40 "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
41 "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
42 "spring.kafka.consumer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.top10.TestOutputWord,counter:de.juplo.kafka.wordcount.top10.TestOutputWordCounter",
43 "logging.level.root=WARN",
44 "logging.level.de.juplo=DEBUG",
45 "juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}",
46 "juplo.wordcount.counter.commit-interval=0",
47 "juplo.wordcount.counter.cacheMaxBytes=0",
48 "juplo.wordcount.counter.input-topic=" + TOPIC_IN,
49 "juplo.wordcount.counter.output-topic=" + TOPIC_OUT })
50 @EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT })
52 public class CounterApplicationIT
54 public static final String TOPIC_IN = "in";
55 public static final String TOPIC_OUT = "out";
60 CounterStreamProcessor streamProcessor;
64 public static void testSendMessage(
65 @Autowired KafkaTemplate<String, TestInputWord> kafkaTemplate)
73 SendResult<String, TestInputWord> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
75 "Sent: {}={}, partition={}, offset={}",
76 result.getProducerRecord().key(),
77 result.getProducerRecord().value(),
78 result.getRecordMetadata().partition(),
79 result.getRecordMetadata().offset());
83 throw new RuntimeException(e);
88 @DisplayName("Await the expected number of messages")
90 public void testAwaitExpectedNumberOfMessagesForUsers()
92 await("Expected number of messages")
93 .atMost(Duration.ofSeconds(5))
94 .untilAsserted(() -> consumer.enforceAssertion(TestData.expectedNumberOfMessagesForWordAssertion()));
97 @DisplayName("Await the expected output messages")
99 void testSendMessage()
101 await("Expected messages")
102 .atMost(Duration.ofSeconds(10))
103 .untilAsserted(() -> consumer.enforceAssertion(TestData.expectedMessagesAssertion()));
106 @DisplayName("Await the expected final output messages")
108 public void testAwaitExpectedLastMessagesForUsers()
110 await("Expected final output messages")
111 .atMost(Duration.ofSeconds(5))
112 .untilAsserted(() -> consumer.enforceAssertion(TestData.expectedLastMessagesForWordAssertion()));
115 @DisplayName("Await the expected state in the state-store")
117 public void testAwaitExpectedState()
119 await("Expected state")
120 .atMost(Duration.ofSeconds(5))
121 .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
125 static class Consumer
127 private final MultiValueMap<TestOutputWord, TestOutputWordCounter> received = new LinkedMultiValueMap<>();
129 @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
130 public synchronized void receive(
131 @Header(KafkaHeaders.RECEIVED_KEY) TestOutputWord word,
132 @Payload TestOutputWordCounter counter)
134 log.debug("Received message: {} -> {}", word, counter);
135 received.add(word, counter);
138 synchronized void enforceAssertion(
139 java.util.function.Consumer<MultiValueMap<TestOutputWord, TestOutputWordCounter>> assertion)
141 assertion.accept(received);
146 static class Configuration
151 return new Consumer();
156 KeyValueBytesStoreSupplier inMemoryStoreSupplier()
158 return Stores.inMemoryKeyValueStore(STORE_NAME);