top10: 1.2.1 - `Top10ApplicationIT` asserts type-mapping for output-data
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / top10 / Top10ApplicationIT.java
index 88d03ba..ea8fe18 100644 (file)
@@ -3,6 +3,7 @@ package de.juplo.kafka.wordcount.top10;
 import de.juplo.kafka.wordcount.counter.TestWord;
 import de.juplo.kafka.wordcount.counter.TestCounter;
 import de.juplo.kafka.wordcount.query.TestRanking;
+import de.juplo.kafka.wordcount.query.TestUser;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
@@ -37,10 +38,7 @@ import static org.awaitility.Awaitility.await;
                                "spring.kafka.consumer.auto-offset-reset=earliest",
                                "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
                                "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
-                               "spring.kafka.consumer.properties.spring.json.use.type.headers=false",
-                               "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.top10.User",
-                               "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.query.TestRanking",
-                               "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.top10   ",
+                               "spring.kafka.consumer.properties.spring.json.type.mapping=user:de.juplo.kafka.wordcount.query.TestUser,ranking:de.juplo.kafka.wordcount.query.TestRanking",
                                "logging.level.root=WARN",
                                "logging.level.de.juplo=DEBUG",
                                "logging.level.org.apache.kafka.clients=INFO",
@@ -128,18 +126,18 @@ public class Top10ApplicationIT
 
        static class Consumer
        {
-               private final MultiValueMap<User, Ranking> received = new LinkedMultiValueMap<>();
+               private final MultiValueMap<TestUser, TestRanking> received = new LinkedMultiValueMap<>();
 
                @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
                public synchronized void receive(
-                               @Header(KafkaHeaders.RECEIVED_KEY) User user,
+                               @Header(KafkaHeaders.RECEIVED_KEY) TestUser user,
                                @Payload TestRanking ranking)
                {
                        log.debug("Received message: {} -> {}", user, ranking);
-                       received.add(user, Ranking.of(ranking.getEntries()));
+                       received.add(user, ranking);
                }
 
-               synchronized MultiValueMap<User, Ranking> getReceivedMessages()
+               synchronized MultiValueMap<TestUser, TestRanking> getReceivedMessages()
                {
                        return received;
                }