<optional>true</optional>
</dependency>
<dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-
+import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
{
@Bean
public ApplicationRecordHandler recordHandler(
- KafkaProducer<String, String> kafkaProducer,
+ KafkaProducer<String, Object> kafkaProducer,
ApplicationProperties properties)
{
return new ApplicationRecordHandler(
}
@Bean(destroyMethod = "close")
- public KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
+ public KafkaProducer<String, Object> kafkaProducer(ApplicationProperties properties)
{
Properties props = new Properties();
props.put("bootstrap.servers", properties.getBootstrapServer());
props.put("linger.ms", properties.getLingerMs());
props.put("compression.type", properties.getCompressionType());
props.put("key.serializer", StringSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
+ props.put("value.serializer", JsonSerializer.class.getName());
+ props.put(JsonSerializer.TYPE_MAPPINGS,
+ "ADD:" + AddNumberMessage.class.getName() + "," +
+ "CALC:" + CalculateSumMessage.class.getName());
+
return new KafkaProducer<>(props);
}
@Slf4j
public class ApplicationRecordHandler implements RecordHandler<String, Integer>
{
- private final Producer<String, String> producer;
+ private final Producer<String, Object> producer;
private final String id;
private final String topic;
for (int i = 1; i <= number; i++)
{
- send(key, Integer.toString(i));
+ send(key, new AddNumberMessage(number, i));
}
- send(key, "CALCULATE");
+ send(key, new CalculateSumMessage(number));
}
- private void send(String key, String value)
+ private void send(String key, Object value)
{
final long time = System.currentTimeMillis();
- final ProducerRecord<String, String> record = new ProducerRecord<>(
+ final ProducerRecord<String, Object> record = new ProducerRecord<>(
topic, // Topic
key, // Key
value // Value