package de.juplo.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
public class ApplicationConfiguration
{
@Bean
- public RestProducer restProducer(
+ public RestGateway restGateway(
ApplicationProperties properties,
- KafkaProducer<String, String> kafkaProducer)
+ KafkaProducer<String, Integer> kafkaProducer)
{
return
- new RestProducer(
+ new RestGateway(
properties.getClientId(),
properties.getTopic(),
properties.getPartition(),
}
@Bean(destroyMethod = "close")
- public KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
+ public KafkaProducer<String, Integer> kafkaProducer(ApplicationProperties properties)
{
Properties props = new Properties();
props.put("bootstrap.servers", properties.getBootstrapServer());
props.put("linger.ms", properties.getLingerMs());
props.put("compression.type", properties.getCompressionType());
props.put("key.serializer", StringSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
+ props.put("value.serializer", IntegerSerializer.class.getName());
return new KafkaProducer<>(props);
}