import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.Properties;
@Bean
public RestProducer restProducer(
ApplicationProperties properties,
- KafkaProducer<String, String> kafkaProducer)
+ KafkaProducer<String, Object> kafkaProducer)
{
return
new RestProducer(
}
@Bean(destroyMethod = "close")
- public KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
+ public KafkaProducer<String, Object> kafkaProducer(ApplicationProperties properties)
{
Properties props = new Properties();
props.put("bootstrap.servers", properties.getBootstrapServer());
props.put("linger.ms", properties.getLingerMs());
props.put("compression.type", properties.getCompressionType());
props.put("key.serializer", StringSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
+ props.put("value.serializer", JsonSerializer.class.getName());
+ props.put(JsonSerializer.TYPE_MAPPINGS,
+ "ADD:" + AddNumberMessage.class.getName() + "," +
+ "CALC:" + CalculateSumMessage.class.getName());
return new KafkaProducer<>(props);
}