import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.wordcount.top10.TestRanking;
-import de.juplo.kafka.wordcount.users.TestUserData;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
Map<String, Object> properties = Map.of(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(),
- JsonSerializer.TYPE_MAPPINGS, "userdata:" + TestUserData.class.getName());
+ JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
return new KafkaTemplate(producerFactory, properties);
}
userIn = testDriver.createInputTopic(
USERS_IN,
new StringSerializer(),
- jsonSerializer(TestUserData.class));
+ jsonSerializer(TestUserData.class).noTypeInfo());
}
jsonSerializer.configure(
Map.of(
JsonSerializer.TYPE_MAPPINGS,
- "userdata:" + TestUserData.class.getName() + "," +
"ranking:" + TestRanking.class.getName()),
false);
return jsonSerializer;