import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-
+import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ApplicationConfiguration
{
@Bean
- public SumUpRecordHandler sumUpRecordHandler(
- KafkaProducer<String, String> kafkaProducer,
+ public ApplicationRecordHandler recordHandler(
+ KafkaProducer<String, Object> kafkaProducer,
ApplicationProperties properties)
{
- return new SumUpRecordHandler(
+ return new ApplicationRecordHandler(
kafkaProducer,
properties.getClientId(),
properties.getTopicOut());
public EndlessConsumer<String, Integer> endlessConsumer(
KafkaConsumer<String, Integer> kafkaConsumer,
ExecutorService executor,
- SumUpRecordHandler sumUpRecordHandler,
+ ApplicationRecordHandler recordHandler,
ApplicationProperties properties)
{
return
properties.getClientId(),
properties.getTopicIn(),
kafkaConsumer,
- sumUpRecordHandler);
+ recordHandler);
}
@Bean
}
@Bean(destroyMethod = "close")
- public KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
+ public KafkaProducer<String, Object> kafkaProducer(ApplicationProperties properties)
{
Properties props = new Properties();
props.put("bootstrap.servers", properties.getBootstrapServer());
props.put("linger.ms", properties.getLingerMs());
props.put("compression.type", properties.getCompressionType());
props.put("key.serializer", StringSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
+ props.put("value.serializer", "TODO: JsonSerializer konfigurieren");
return new KafkaProducer<>(props);
}