1 package de.juplo.kafka.wordcount.counter;
3 import de.juplo.kafka.wordcount.splitter.TestInputWord;
4 import de.juplo.kafka.wordcount.top10.TestOutputWord;
5 import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
8 import org.apache.kafka.streams.state.Stores;
9 import org.junit.jupiter.api.BeforeAll;
10 import org.junit.jupiter.api.DisplayName;
11 import org.junit.jupiter.api.Test;
12 import org.springframework.beans.factory.annotation.Autowired;
13 import org.springframework.boot.test.context.SpringBootTest;
14 import org.springframework.boot.test.context.TestConfiguration;
15 import org.springframework.context.annotation.Bean;
16 import org.springframework.context.annotation.Primary;
17 import org.springframework.kafka.annotation.KafkaListener;
18 import org.springframework.kafka.core.KafkaTemplate;
19 import org.springframework.kafka.support.KafkaHeaders;
20 import org.springframework.kafka.support.SendResult;
21 import org.springframework.kafka.test.context.EmbeddedKafka;
22 import org.springframework.messaging.handler.annotation.Header;
23 import org.springframework.messaging.handler.annotation.Payload;
24 import org.springframework.util.LinkedMultiValueMap;
25 import org.springframework.util.MultiValueMap;
27 import java.time.Duration;
29 import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN;
30 import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT;
31 import static org.awaitility.Awaitility.await;
// Test properties (the opening of the enclosing annotation is not visible in this view).
// Producer side: values are written as JSON ...
36 "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
// ... without type-information headers.
37 "spring.kafka.producer.properties.spring.json.add.type.headers=false",
// Consumer side: a consumer group without committed offsets starts at the earliest record.
38 "spring.kafka.consumer.auto-offset-reset=earliest",
// Both key and value of consumed records are deserialized from JSON.
39 "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
40 "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
// Maps the type tokens "word" and "counter" to the test-side classes used for key and value.
41 "spring.kafka.consumer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.top10.TestOutputWord,counter:de.juplo.kafka.wordcount.top10.TestOutputWordCounter",
// Logging: quiet by default, verbose for the project's own packages.
42 "logging.level.root=WARN",
43 "logging.level.de.juplo=DEBUG",
// Application settings: point the counter at the embedded broker via the placeholder.
44 "juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}",
// NOTE(review): commit interval and cache size are both set to 0 — presumably so that every
// update is emitted immediately instead of being buffered; confirm against the application.
45 "juplo.wordcount.counter.commit-interval=0",
46 "juplo.wordcount.counter.cacheMaxBytes=0",
// Input and output topic names come from the constants declared in this class.
47 "juplo.wordcount.counter.input-topic=" + TOPIC_IN,
48 "juplo.wordcount.counter.output-topic=" + TOPIC_OUT })
// Embedded Kafka broker for the test, declared with both topics.
49 @EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT })
// Integration test class; the send step and the await step are defined below.
51 public class CounterApplicationIT
// Topic names shared by the test properties, the embedded broker, the sender and the listener.
53 public static final String TOPIC_IN = "in";
54 public static final String TOPIC_OUT = "out";
// Sends test input to TOPIC_IN through the injected KafkaTemplate and logs where each
// record was written. Only part of this method is visible in this view: the origin of
// `kv` and the construct that catches `e` are in lines not shown here.
// NOTE(review): shares the name testSendMessage with the no-arg test method further down;
// the two differ only in their parameter list.
61 public static void testSendMessage(
62 @Autowired KafkaTemplate<String, TestInputWord> kafkaTemplate)
// .get() blocks until the send future completes, so `result` describes a finished send.
70 SendResult<String, TestInputWord> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
// Log format and arguments: key and value of the produced record, then partition and offset.
72 "Sent: {}={}, partition={}, offset={}",
73 result.getProducerRecord().key(),
74 result.getProducerRecord().value(),
75 result.getRecordMetadata().partition(),
76 result.getRecordMetadata().offset());
// Rethrows `e` unchecked with the original exception preserved as the cause.
80 throw new RuntimeException(e);
// Waits for the expected output: the assertion supplied by
// TestData.expectedMessagesAssertion() is handed to consumer.enforceAssertion(...) and
// retried until it passes; the wait gives up after at most 10 seconds.
// (`consumer` is declared in lines not visible in this view.)
85 @DisplayName("Await the expected output messages")
87 void testSendMessage()
// "Expected messages" is the alias Awaitility uses to label this condition.
89 await("Expected messages")
90 .atMost(Duration.ofSeconds(10))
91 .untilAsserted(() -> consumer.enforceAssertion(TestData.expectedMessagesAssertion()));
// Every counter received so far, grouped by the word it was keyed with.
97 private final MultiValueMap<TestOutputWord, TestOutputWordCounter> received = new LinkedMultiValueMap<>();
// Kafka listener on TOPIC_OUT (consumer group "TEST"): takes the record key from the
// RECEIVED_KEY header and the record value as payload, logs the pair and stores it in
// `received`. Declared synchronized, as is enforceAssertion below.
99 @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
100 public synchronized void receive(
101 @Header(KafkaHeaders.RECEIVED_KEY) TestOutputWord word,
102 @Payload TestOutputWordCounter counter)
104 log.debug("Received message: {} -> {}", word, counter);
105 received.add(word, counter);
// Runs the given assertion against the records collected so far. Declared synchronized
// like receive(...) — presumably both lock the same listener instance, so the map is not
// modified while an assertion inspects it (the enclosing class header is not visible here).
108 synchronized void enforceAssertion(
// java.util.function.Consumer is written fully qualified; presumably because the simple
// name Consumer denotes the test's own listener class (see `new Consumer()` in this file).
109 java.util.function.Consumer<MultiValueMap<TestOutputWord, TestOutputWordCounter>> assertion)
111 assertion.accept(received);
// Test-only configuration class. The annotations and the header of the first factory
// method are not visible in this view — presumably both methods are registered as beans
// (the file imports @TestConfiguration, @Bean and @Primary); confirm in the full file.
116 static class Configuration
// Creates the listener that collects the output records.
121 return new Consumer();
// Supplies an in-memory key-value store named "TEST-STORE".
// NOTE(review): presumably replaces the application's default store supplier for the
// test — confirm against the application configuration.
126 KeyValueBytesStoreSupplier inMemoryStoreSupplier()
128 return Stores.inMemoryKeyValueStore("TEST-STORE");