From cf11c54b9750d68cdb869eeb4bc705cc0364d4a9 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 22 Oct 2021 23:57:56 +0200 Subject: [PATCH] WIP --- .../splitter/SplitterApplication.java | 9 ++--- .../splitter/SplitterStreamProcessor.java | 34 ++++++++++++------- src/main/resources/application.properties | 1 + 3 files changed, 27 insertions(+), 17 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java index b7a94dd..0459e3c 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java @@ -4,6 +4,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -19,7 +20,7 @@ import java.util.Properties; @EnableConfigurationProperties(SplitterApplicationProperties.class) public class SplitterApplication { - @Bean(destroyMethod = "close") + @Bean KafkaConsumer consumer(SplitterApplicationProperties properties) { Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set"); @@ -28,13 +29,13 @@ public class SplitterApplication props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId()); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new KafkaConsumer<>(props); } - @Bean(destroyMethod = "close") + @Bean KafkaProducer producer(SplitterApplicationProperties properties) { Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set"); diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java index dab15f8..07dfb5e 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java @@ -74,6 +74,13 @@ public class SplitterStreamProcessor implements ApplicationRunner records.forEach(inputRecord -> { + log.debug( + "Received a recording of {}, partition={}, offset={}, epoch={}", + inputRecord.key(), + inputRecord.partition(), + inputRecord.offset(), + inputRecord.leaderEpoch()); + offsets[inputRecord.partition()] = inputRecord.offset(); leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch(); @@ -91,8 +98,8 @@ public class SplitterStreamProcessor implements ApplicationRunner if (exception == null) { // HANDLE SUCCESS - log.info( - "sent {}={}, partition={}, offset={}", + log.debug( + "Sent {}={}, partition={}, offset={}", outputRecord.key(), outputRecord.value(), metadata.partition(), @@ -102,28 +109,29 @@ public class SplitterStreamProcessor implements ApplicationRunner { // HANDLE ERROR log.error( - "could not send {}={}: {}", + "Could not send {}={}: {}", outputRecord.key(), outputRecord.value(), exception.toString()); } }); } - }); - long delta = lastCommit - clock.millis(); - if (delta > commitInterval) - { - log.info("Elapsed time since last commit: {}ms", delta); - commitTransaction(); - beginTransaction(); - } + long delta = clock.millis() - lastCommit; + if (delta > commitInterval) + { + log.info("Elapsed time since last commit: {}ms", delta); + commitTransaction(); + beginTransaction(); + } + }); } } catch (WakeupException e) { - log.info("Waking up from exception!", e); - commitTransaction(); + log.info("Waking up from exception!"); + // Nicht nötig, da consumer.close() onPartitionsRevoked() auslöst! + // commitTransaction(); } catch (Exception e) { diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 3046fc3..0d2624f 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,2 +1,3 @@ server.port=8086 management.endpoints.web.exposure.include=* +logging.level.de.juplo.kafka.wordcount.splitter=DEBUG -- 2.20.1