package de.juplo.kafka.wordcount.counter;
+import de.juplo.kafka.wordcount.splitter.TestInputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWord;
+import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
"spring.kafka.consumer.auto-offset-reset=earliest",
"spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
"spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
- "spring.kafka.consumer.properties.spring.json.use.type.headers=false",
- "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.counter.Word",
- "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.counter.WordCounter",
+ "spring.kafka.consumer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.top10.TestOutputWord,counter:de.juplo.kafka.wordcount.top10.TestOutputWordCounter",
"logging.level.root=WARN",
"logging.level.de.juplo=DEBUG",
"juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}",
public static final String TOPIC_OUT = "out";
@Autowired
- KafkaTemplate<String, Word> kafkaTemplate;
+ KafkaTemplate<String, TestInputWord> kafkaTemplate;
@Autowired
Consumer consumer;
@Test
void testSendMessage()
{
- TestData.writeInputData((key, value) -> kafkaTemplate.send(TOPIC_IN, key, value));
+ TestData
+ .getInputMessages()
+ .forEach(word -> kafkaTemplate.send(TOPIC_IN, word.getUser(), word));
- await("Expexted converted data")
+ await("Expected messages")
.atMost(Duration.ofSeconds(10))
- .untilAsserted(() -> TestData.assertExpectedResult(consumer.getReceivedMessages()));
+ .untilAsserted(() -> TestData.assertExpectedMessages(consumer.getReceivedMessages()));
}
static class Consumer
{
- private final MultiValueMap<Word, WordCounter> received = new LinkedMultiValueMap<>();
+ private final MultiValueMap<TestOutputWord, TestOutputWordCounter> received = new LinkedMultiValueMap<>();
@KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
public synchronized void receive(
- @Header(KafkaHeaders.RECEIVED_KEY) Word word,
- @Payload WordCounter counter)
+ @Header(KafkaHeaders.RECEIVED_KEY) TestOutputWord word,
+ @Payload TestOutputWordCounter counter)
{
log.debug("Received message: {} -> {}", word, counter);
received.add(word, counter);
}
- synchronized MultiValueMap<Word, WordCounter> getReceivedMessages()
+ synchronized MultiValueMap<TestOutputWord, TestOutputWordCounter> getReceivedMessages()
{
return received;
}