import de.juplo.kafka.wordcount.counter.TestWord;
import de.juplo.kafka.wordcount.counter.TestCounter;
import de.juplo.kafka.wordcount.query.TestRanking;
+import de.juplo.kafka.wordcount.query.TestUser;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
"spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
"spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
"spring.kafka.consumer.properties.spring.json.use.type.headers=false",
- "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.top10.User",
+ "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.query.TestUser",
"spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.query.TestRanking",
- "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.top10 ",
"logging.level.root=WARN",
"logging.level.de.juplo=DEBUG",
"logging.level.org.apache.kafka.clients=INFO",
@KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
public synchronized void receive(
- @Header(KafkaHeaders.RECEIVED_KEY) User user,
+ @Header(KafkaHeaders.RECEIVED_KEY) TestUser user,
@Payload TestRanking ranking)
{
log.debug("Received message: {} -> {}", user, ranking);
- received.add(user, Ranking.of(ranking.getEntries()));
+ received.add(User.of(user.getUser()), Ranking.of(ranking.getEntries()));
}
synchronized MultiValueMap<User, Ranking> getReceivedMessages()