- properties.getClientId(),
- properties.getTopic(),
- kafkaProducer);
- }
-
- @Bean(destroyMethod = "close")
- public KafkaProducer<String, Integer> kafkaProducer(ApplicationProperties properties)
- {
- Properties props = new Properties();
- props.put("bootstrap.servers", properties.getBootstrapServer());
- props.put("client.id", properties.getClientId());
- props.put("acks", properties.getAcks());
- props.put("batch.size", properties.getBatchSize());
- props.put("delivery.timeout.ms", 20000); // 20 Sekunden
- props.put("request.timeout.ms", 10000); // 10 Sekunden
- props.put("linger.ms", properties.getLingerMs());
- props.put("compression.type", properties.getCompressionType());
- props.put("key.serializer", StringSerializer.class.getName());
- props.put("value.serializer", IntegerSerializer.class.getName());
-
- return new KafkaProducer<>(props);
+ kafkaProperties.getClientId(),
+ applicationProperties.getPartition(),
+ kafkaTemplate);