import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@EnableConfigurationProperties(SplitterApplicationProperties.class)
public class SplitterApplication
{
- @Bean(destroyMethod = "close")
+ @Bean
KafkaConsumer<String, String> consumer(SplitterApplicationProperties properties)
{
Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
- props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new KafkaConsumer<>(props);
}
- @Bean(destroyMethod = "close")
+ @Bean
KafkaProducer<String, String> producer(SplitterApplicationProperties properties)
{
Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");
records.forEach(inputRecord ->
{
+ log.debug(
+ "Received a recording of {}, partition={}, offset={}, epoch={}",
+ inputRecord.key(),
+ inputRecord.partition(),
+ inputRecord.offset(),
+ inputRecord.leaderEpoch());
+
offsets[inputRecord.partition()] = inputRecord.offset();
leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch();
if (exception == null)
{
// HANDLE SUCCESS
- log.info(
- "sent {}={}, partition={}, offset={}",
+ log.debug(
+ "Sent {}={}, partition={}, offset={}",
outputRecord.key(),
outputRecord.value(),
metadata.partition(),
{
// HANDLE ERROR
log.error(
- "could not send {}={}: {}",
+ "Could not send {}={}: {}",
outputRecord.key(),
outputRecord.value(),
exception.toString());
}
});
}
- });
- long delta = lastCommit - clock.millis();
- if (delta > commitInterval)
- {
- log.info("Elapsed time since last commit: {}ms", delta);
- commitTransaction();
- beginTransaction();
- }
+ long delta = clock.millis() - lastCommit;
+ if (delta > commitInterval)
+ {
+ log.info("Elapsed time since last commit: {}ms", delta);
+ commitTransaction();
+ beginTransaction();
+ }
+ });
}
}
catch (WakeupException e)
{
- log.info("Waking up from exception!", e);
- commitTransaction();
+ log.info("Waking up from exception!");
+ // Nicht nötig, da consumer.close() onPartitionsRevoked() auslöst!
+ // commitTransaction();
}
catch (Exception e)
{