X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FSplitterStreamProcessor.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FSplitterStreamProcessor.java;h=0000000000000000000000000000000000000000;hb=9c9ffbe3316ed295533c576e823794aa6de99665;hp=0eafbda49e0a44c577362d4caf8523bf925eee6b;hpb=0e63376a3cfc8ecabdc4699e9307f6a51415cb09;p=demos%2Fkafka%2Fwordcount

diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
deleted file mode 100644
index 0eafbda..0000000
--- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
+++ /dev/null
@@ -1,267 +0,0 @@
-package de.juplo.kafka.wordcount.splitter;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.*;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.core.task.TaskExecutor;
-import org.springframework.stereotype.Component;
-import org.springframework.util.Assert;
-
-import javax.annotation.PreDestroy;
-import java.time.Clock;
-import java.time.Duration;
-import java.util.*;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
-
-
-@Component
-@Slf4j
-public class SplitterStreamProcessor implements Runnable
-{
-  private final MessageSplitter splitter;
-  private final String inputTopic;
-  private final String outputTopic;
-  private final KafkaConsumer<String, String> consumer;
-  private final KafkaProducer<String, String> producer;
-  private final Clock clock;
-  private final int commitInterval;
-  private final Lock running = new ReentrantLock();
-
-  // volatile: written by the shutdown thread in stop(), read by the polling thread
-  private volatile boolean stopped = false;
-  private long[] offsets;
-  private Optional<Integer>[] leaderEpochs;
-  private long lastCommit;
-
-  public SplitterStreamProcessor(
-      MessageSplitter splitter,
-      SplitterApplicationProperties properties,
-      Clock clock,
-      TaskExecutor executor)
-  {
-    this.splitter = splitter;
-
-    this.inputTopic = properties.getInputTopic();
-    this.outputTopic = properties.getOutputTopic();
-
-    this.clock = clock;
-    this.commitInterval = properties.getCommitInterval();
-
-    Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");
-
-    Properties props;
-
-    props = new Properties();
-    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
-    props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
-    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-    consumer = new KafkaConsumer<>(props);
-
-    props = new Properties();
-    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
-    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
-    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-ID");
-    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-    producer = new KafkaProducer<>(props);
-
-    executor.execute(this);
-  }
-
-  public void run()
-  {
-    running.lock();
-
-    try
-    {
-      log.info("Initializing transaction");
-      producer.initTransactions();
-
-      log.info("Subscribing to topic {}", inputTopic);
-      consumer.subscribe(
-          Arrays.asList(inputTopic),
-          new TransactionalConsumerRebalanceListener());
-
-      while (!stopped)
-      {
-        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
-
-        records.forEach(inputRecord ->
-        {
-          log.debug(
-              "Received a recording of {}, partition={}, offset={}, epoch={}",
-              inputRecord.key(),
-              inputRecord.partition(),
-              inputRecord.offset(),
-              inputRecord.leaderEpoch());
-
-          // The offset to commit is the offset of the NEXT record to be
-          // consumed, hence offset() + 1
-          offsets[inputRecord.partition()] = inputRecord.offset() + 1;
-          leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch();
-
-          String[] words = splitter.split(inputRecord.value());
-          for (int i = 0; i < words.length; i++)
-          {
-            ProducerRecord<String, String> outputRecord =
-                new ProducerRecord<>(
-                    outputTopic,
-                    inputRecord.key(),
-                    words[i].trim());
-
-            producer.send(outputRecord, (metadata, exception) ->
-            {
-              if (exception == null)
-              {
-                // HANDLE SUCCESS
-                log.debug(
-                    "Sent {}={}, partition={}, offset={}",
-                    outputRecord.key(),
-                    outputRecord.value(),
-                    metadata.partition(),
-                    metadata.offset());
-              }
-              else
-              {
-                // HANDLE ERROR
-                log.error(
-                    "Could not send {}={}: {}",
-                    outputRecord.key(),
-                    outputRecord.value(),
-                    exception.toString());
-              }
-            });
-          }
-
-          long delta = clock.millis() - lastCommit;
-          if (delta > commitInterval)
-          {
-            log.info("Elapsed time since last commit: {}ms", delta);
-            commitTransaction();
-            beginTransaction();
-          }
-        });
-      }
-    }
-    catch (WakeupException e)
-    {
-      log.info("Waking up from exception!");
-      // Not necessary, because consumer.close() triggers onPartitionsRevoked()!
-      // commitTransaction();
-    }
-    catch (Exception e)
-    {
-      log.error("Unexpected exception!", e);
-      producer.abortTransaction();
-    }
-    finally
-    {
-      try
-      {
-        log.info("Closing consumer");
-        consumer.close();
-        log.info("Closing producer");
-        producer.close();
-        log.info("Exiting!");
-      }
-      finally
-      {
-        running.unlock();
-      }
-    }
-  }
-
-  private void beginTransaction()
-  {
-    log.info("Beginning new transaction");
-    lastCommit = clock.millis();
-    producer.beginTransaction();
-  }
-
-  private void commitTransaction()
-  {
-    Map<TopicPartition, OffsetAndMetadata> offsetsToSend = new HashMap<>();
-    for (int i = 0; i < offsets.length; i++)
-    {
-      // Only partitions that have actually yielded records are committed
-      if (offsets[i] > 0)
-      {
-        offsetsToSend.put(
-            new TopicPartition(inputTopic, i),
-            new OffsetAndMetadata(offsets[i], leaderEpochs[i], ""));
-      }
-    }
-    producer.sendOffsetsToTransaction(
-        offsetsToSend,
-        consumer.groupMetadata());
-    log.info("Committing transaction");
-    producer.commitTransaction();
-  }
-
-  class TransactionalConsumerRebalanceListener implements ConsumerRebalanceListener
-  {
-    @Override
-    public void onPartitionsAssigned(Collection<TopicPartition> partitions)
-    {
-      log.info("Assigned partitions: {}", toString(partitions));
-
-      // Compute the length of an array that can be used to store the offsets
-      // (We can ignore the topic, since we only read from a single one!)
-      int length =
-          partitions
-              .stream()
-              .mapToInt(TopicPartition::partition)
-              .max()
-              .orElse(0) + 1;
-      offsets = new long[length];
-      leaderEpochs = new Optional[length];
-
-      beginTransaction();
-    }
-
-    @Override
-    public void onPartitionsRevoked(Collection<TopicPartition> partitions)
-    {
-      log.info("Revoked partitions: {}", toString(partitions));
-      commitTransaction();
-    }
-
-    @Override
-    public void onPartitionsLost(Collection<TopicPartition> partitions)
-    {
-      log.info("Lost partitions: {}", toString(partitions));
-      producer.abortTransaction();
-    }
-
-    String toString(Collection<TopicPartition> partitions)
-    {
-      return
-          partitions
-              .stream()
-              .map(tp -> tp.topic() + "-" + tp.partition())
-              .collect(Collectors.joining(", "));
-    }
-  }
-
-  @PreDestroy
-  public void stop()
-  {
-    log.info("Shutdown requested...");
-    if (stopped)
-    {
-      log.warn("Ignoring request: already stopped!");
-      return;
-    }
-    stopped = true;
-    consumer.wakeup();
-    running.lock();
-    log.info("Shutdown completed!");
-  }
-}
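
Note: the deleted class depends on two collaborators that do not appear in this
diff, MessageSplitter and SplitterApplicationProperties. The sketch below is a
hypothetical reconstruction, NOT part of the commit: the getter names and the
split(String) signature are inferred from the calls above, while the property
prefix (taken from the Assert message), the default values, and the Lombok
annotations are assumptions. It is included only so the deleted code can be
read on its own.

package de.juplo.kafka.wordcount.splitter;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

// Assumed configuration holder; the prefix is inferred from the message
// "juplo.wordcount.splitter.bootstrap-server must be set" above.
@ConfigurationProperties("juplo.wordcount.splitter")
@Getter
@Setter
class SplitterApplicationProperties
{
  private String bootstrapServer = "localhost:9092"; // assumption
  private String groupId = "splitter";               // assumption
  private String inputTopic = "recordings";          // assumption
  private String outputTopic = "words";              // assumption
  private int commitInterval = 1000;                 // assumption: milliseconds
}

// Assumed splitting strategy: the processor only requires split(String).
@Component
class MessageSplitter
{
  String[] split(String message)
  {
    // Minimal word splitting on non-word characters
    return message.split("\\W+");
  }
}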