import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.wordcount.top10.TestRanking;
+import de.juplo.kafka.wordcount.top10.TestUser;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
KafkaTemplate top10KafkaTemplate(ProducerFactory producerFactory)
{
Map<String, Object> properties = Map.of(
- ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
- JsonSerializer.TYPE_MAPPINGS, "ranking:" + TestRanking.class.getName());
+ JsonSerializer.TYPE_MAPPINGS, "stats:" + TestUser.class.getName() + ",ranking:" + TestRanking.class.getName());
return new KafkaTemplate(producerFactory, properties);
}