top10: 1.2.1 - Introduced `TestUser`
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / top10 / Top10ApplicationIT.java
index 88d03ba..1bec92d 100644 (file)
@@ -3,6 +3,7 @@ package de.juplo.kafka.wordcount.top10;
 import de.juplo.kafka.wordcount.counter.TestWord;
 import de.juplo.kafka.wordcount.counter.TestCounter;
 import de.juplo.kafka.wordcount.query.TestRanking;
+import de.juplo.kafka.wordcount.query.TestUser;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
@@ -38,9 +39,8 @@ import static org.awaitility.Awaitility.await;
                                "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
                                "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
                                "spring.kafka.consumer.properties.spring.json.use.type.headers=false",
-                               "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.top10.User",
+                               "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.query.TestUser",
                                "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.query.TestRanking",
-                               "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.top10   ",
                                "logging.level.root=WARN",
                                "logging.level.de.juplo=DEBUG",
                                "logging.level.org.apache.kafka.clients=INFO",
@@ -132,11 +132,11 @@ public class Top10ApplicationIT
 
                @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
                public synchronized void receive(
-                               @Header(KafkaHeaders.RECEIVED_KEY) User user,
+                               @Header(KafkaHeaders.RECEIVED_KEY) TestUser user,
                                @Payload TestRanking ranking)
                {
                        log.debug("Received message: {} -> {}", user, ranking);
-                       received.add(user, Ranking.of(ranking.getEntries()));
+                       received.add(User.of(user.getUser()), Ranking.of(ranking.getEntries()));
                }
 
                synchronized MultiValueMap<User, Ranking> getReceivedMessages()