From 606c6a95d184de78677b41d058c2d73b4e692a7f Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 23 Oct 2021 00:23:33 +0200 Subject: [PATCH] WIP --- .../splitter/SplitterApplication.java | 30 -------- .../splitter/SplitterStreamProcessor.java | 69 ++++++++++++++----- 2 files changed, 51 insertions(+), 48 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java index 0459e3c..b9e3c9c 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java @@ -20,36 +20,6 @@ import java.util.Properties; @EnableConfigurationProperties(SplitterApplicationProperties.class) public class SplitterApplication { - @Bean - KafkaConsumer consumer(SplitterApplicationProperties properties) - { - Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set"); - - Properties props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId()); - props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - - return new KafkaConsumer<>(props); - } - - @Bean - KafkaProducer producer(SplitterApplicationProperties properties) - { - Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set"); - - Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); - props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-ID"); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - - return new KafkaProducer<>(props); - } - @Bean Clock clock() { diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java index 07dfb5e..51d22b3 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java @@ -1,22 +1,27 @@ package de.juplo.kafka.wordcount.splitter; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; +import org.springframework.util.Assert; import javax.annotation.PreDestroy; import java.time.Clock; import java.time.Duration; import java.util.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -33,6 +38,7 @@ public class SplitterStreamProcessor implements ApplicationRunner private final KafkaProducer producer; private final Clock clock; private final int commitInterval; + private final Lock running = new ReentrantLock(); private boolean stopped = false; private long[] offsets; @@ -41,28 +47,45 @@ public class SplitterStreamProcessor implements ApplicationRunner public SplitterStreamProcessor( SplitterApplicationProperties properties, - KafkaConsumer consumer, - KafkaProducer producer, Clock clock) { this.inputTopic = properties.getInputTopic(); this.outputTopic = properties.getOutputTopic(); - this.consumer = consumer; - this.producer = producer; - this.clock = clock; this.commitInterval = properties.getCommitInterval(); + + Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set"); + + Properties props; + + props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId()); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumer = new KafkaConsumer<>(props); + + props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); + props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); + props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-ID"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + producer = new KafkaProducer<>(props); } @Override public void run(ApplicationArguments args) { - log.info("Initializing transaction"); - producer.initTransactions(); + running.lock(); try { + log.info("Initializing transaction"); + producer.initTransactions(); + log.info("Subscribing to topic {}", inputTopic); consumer.subscribe( Arrays.asList(inputTopic), @@ -140,11 +163,18 @@ public class SplitterStreamProcessor implements ApplicationRunner } finally { - log.info("Closing consumer"); - consumer.close(); - log.info("Closing producer"); - producer.close(); - log.info("Exiting!"); + try + { + log.info("Closing consumer"); + consumer.close(); + log.info("Closing producer"); + producer.close(); + log.info("Exiting!"); + } + finally + { + running.unlock(); + } } } @@ -169,7 +199,8 @@ public class SplitterStreamProcessor implements ApplicationRunner } producer.sendOffsetsToTransaction( offsetsToSend, - consumer.groupMetadata());log.info("Committing transaction"); + consumer.groupMetadata()); + log.info("Committing transaction"); producer.commitTransaction(); } @@ -222,8 +253,10 @@ public class SplitterStreamProcessor implements ApplicationRunner @PreDestroy public void stop() { - log.info("Stopping Consumer"); + log.info("Shutdown requested..."); stopped = true; consumer.wakeup(); + running.lock(); + log.info("Shutdown completed!"); } } -- 2.20.1