top10: 1.2.1 - Introduced `TestUser`
authorKai Moritz <kai@juplo.de>
Sat, 8 Jun 2024 13:04:31 +0000 (15:04 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 8 Jun 2024 13:04:31 +0000 (15:04 +0200)
src/test/java/de/juplo/kafka/wordcount/query/TestUser.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java

diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestUser.java b/src/test/java/de/juplo/kafka/wordcount/query/TestUser.java
new file mode 100644 (file)
index 0000000..53a5992
--- /dev/null
@@ -0,0 +1,14 @@
+package de.juplo.kafka.wordcount.query;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@AllArgsConstructor(staticName = "of")
+@NoArgsConstructor
+@Data
+public class TestUser
+{
+  String user;
+}
index 88d03ba..1bec92d 100644 (file)
@@ -3,6 +3,7 @@ package de.juplo.kafka.wordcount.top10;
 import de.juplo.kafka.wordcount.counter.TestWord;
 import de.juplo.kafka.wordcount.counter.TestCounter;
 import de.juplo.kafka.wordcount.query.TestRanking;
+import de.juplo.kafka.wordcount.query.TestUser;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
@@ -38,9 +39,8 @@ import static org.awaitility.Awaitility.await;
                                "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
                                "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
                                "spring.kafka.consumer.properties.spring.json.use.type.headers=false",
-                               "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.top10.User",
+                               "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.query.TestUser",
                                "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.query.TestRanking",
-                               "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.top10   ",
                                "logging.level.root=WARN",
                                "logging.level.de.juplo=DEBUG",
                                "logging.level.org.apache.kafka.clients=INFO",
@@ -132,11 +132,11 @@ public class Top10ApplicationIT
 
                @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
                public synchronized void receive(
-                               @Header(KafkaHeaders.RECEIVED_KEY) User user,
+                               @Header(KafkaHeaders.RECEIVED_KEY) TestUser user,
                                @Payload TestRanking ranking)
                {
                        log.debug("Received message: {} -> {}", user, ranking);
-                       received.add(user, Ranking.of(ranking.getEntries()));
+                       received.add(User.of(user.getUser()), Ranking.of(ranking.getEntries()));
                }
 
                synchronized MultiValueMap<User, Ranking> getReceivedMessages()