X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FSplitterStreamProcessor.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FSplitterStreamProcessor.java;h=791e164d6fe3ddfdf8600dec4430e082462b2733;hb=d5f54354b2b44d125493c830bf0475f7992ee395;hp=0000000000000000000000000000000000000000;hpb=c5700c2117f6c445278f272572c8b5732bf53bbf;p=demos%2Fkafka%2Fwordcount

diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
new file mode 100644
index 0000000..791e164
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
@@ -0,0 +1,260 @@
+package de.juplo.kafka.wordcount.splitter;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.stereotype.Component;
+import org.springframework.util.Assert;
+
+import javax.annotation.PreDestroy;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+
+@Component
+@Slf4j
+public class SplitterStreamProcessor implements ApplicationRunner
+{
+  final static Pattern PATTERN = Pattern.compile("\\W+");
+
+  private final String inputTopic;
+  private final String outputTopic;
+  private final KafkaConsumer<String, String> consumer;
+  private final KafkaProducer<String, String> producer;
+  private final Clock clock;
+  private final int commitInterval;
+  private final Lock running = new ReentrantLock();
+
+  // volatile: written by the container thread in stop(), read by the runner thread
+  private volatile boolean stopped = false;
+  private long[] offsets;
+  private Optional<Integer>[] leaderEpochs;
+  private long lastCommit;
+
+  public SplitterStreamProcessor(
+      SplitterApplicationProperties properties,
+      Clock clock)
+  {
+    this.inputTopic = properties.getInputTopic();
+    this.outputTopic = properties.getOutputTopic();
+
+    this.clock = clock;
+    this.commitInterval = properties.getCommitInterval();
+
+    Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");
+
+    Properties props;
+
+    props = new Properties();
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
+    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+    consumer = new KafkaConsumer<>(props);
+
+    props = new Properties();
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-ID");
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+    producer = new KafkaProducer<>(props);
+  }
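+
+  // Main processing loop: consume from the input topic, split every value
+  // into words, and forward each word to the output topic. All sends happen
+  // inside a producer transaction, together with the consumed offsets, so
+  // that reading, processing and writing are committed atomically.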
+
+  @Override
+  public void run(ApplicationArguments args)
+  {
+    running.lock();
+
+    try
+    {
+      log.info("Initializing transaction");
+      producer.initTransactions();
+
+      log.info("Subscribing to topic {}", inputTopic);
+      consumer.subscribe(
+          Arrays.asList(inputTopic),
+          new TransactionalConsumerRebalanceListener());
+
+      while (!stopped)
+      {
+        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
+
+        records.forEach(inputRecord ->
+        {
+          log.debug(
+              "Received a record with key {}, partition={}, offset={}, epoch={}",
+              inputRecord.key(),
+              inputRecord.partition(),
+              inputRecord.offset(),
+              inputRecord.leaderEpoch());
+
+          offsets[inputRecord.partition()] = inputRecord.offset();
+          leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch();
+
+          String[] words = PATTERN.split(inputRecord.value());
+          for (int i = 0; i < words.length; i++)
+          {
+            ProducerRecord<String, String> outputRecord =
+                new ProducerRecord<>(
+                    outputTopic,
+                    inputRecord.key(),
+                    words[i].trim());
+
+            producer.send(outputRecord, (metadata, exception) ->
+            {
+              if (exception == null)
+              {
+                // HANDLE SUCCESS
+                log.debug(
+                    "Sent {}={}, partition={}, offset={}",
+                    outputRecord.key(),
+                    outputRecord.value(),
+                    metadata.partition(),
+                    metadata.offset());
+              }
+              else
+              {
+                // HANDLE ERROR
+                log.error(
+                    "Could not send {}={}: {}",
+                    outputRecord.key(),
+                    outputRecord.value(),
+                    exception.toString());
+              }
+            });
+          }
+
+          long delta = clock.millis() - lastCommit;
+          if (delta > commitInterval)
+          {
+            log.info("Elapsed time since last commit: {}ms", delta);
+            commitTransaction();
+            beginTransaction();
+          }
+        });
+      }
+    }
+    catch (WakeupException e)
+    {
+      log.info("Waking up from exception!");
+      // Not necessary, because consumer.close() triggers onPartitionsRevoked()!
+      // commitTransaction();
+    }
+    catch (Exception e)
+    {
+      log.error("Unexpected exception!", e);
+      producer.abortTransaction();
+    }
+    finally
+    {
+      try
+      {
+        log.info("Closing consumer");
+        consumer.close();
+        log.info("Closing producer");
+        producer.close();
+        log.info("Exiting!");
+      }
+      finally
+      {
+        running.unlock();
+      }
+    }
+  }
+
+  private void beginTransaction()
+  {
+    log.info("Beginning new transaction");
+    lastCommit = clock.millis();
+    producer.beginTransaction();
+  }
+
+  private void commitTransaction()
+  {
+    Map<TopicPartition, OffsetAndMetadata> offsetsToSend = new HashMap<>();
+    for (int i = 0; i < offsets.length; i++)
+    {
+      if (offsets[i] >= 0)
+      {
+        // The committed offset must be the offset of the next record to be
+        // read, hence: last processed offset + 1
+        offsetsToSend.put(
+            new TopicPartition(inputTopic, i),
+            new OffsetAndMetadata(offsets[i] + 1, leaderEpochs[i], ""));
+      }
+    }
+    producer.sendOffsetsToTransaction(
+        offsetsToSend,
+        consumer.groupMetadata());
+    log.info("Committing transaction");
+    producer.commitTransaction();
+  }
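+
+  // In addition to the periodic commit in the main loop, the transaction
+  // lifecycle follows the consumer group's rebalances: a new transaction is
+  // begun when partitions are assigned, the running transaction is committed
+  // when they are revoked, and it is aborted when they are lost.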
+
+  class TransactionalConsumerRebalanceListener implements ConsumerRebalanceListener
+  {
+    @Override
+    public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+    {
+      log.info("Assigned partitions: {}", toString(partitions));
+
+      // Compute the length of an array that can be used to store the offsets
+      // (we can ignore the topic, since we only read from a single one!)
+      int length =
+          partitions
+              .stream()
+              .reduce(
+                  0,
+                  (i, v) -> i < v.partition() ? v.partition() : i,
+                  (l, r) -> l < r ? r : l) + 1;
+      offsets = new long[length];
+      // -1 marks partitions for which no record has been processed yet
+      Arrays.fill(offsets, -1);
+      leaderEpochs = new Optional[length];
+
+      beginTransaction();
+    }
+
+    @Override
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+    {
+      log.info("Revoked partitions: {}", toString(partitions));
+      commitTransaction();
+    }
+
+    @Override
+    public void onPartitionsLost(Collection<TopicPartition> partitions)
+    {
+      log.info("Lost partitions: {}", toString(partitions));
+      producer.abortTransaction();
+    }
+
+    String toString(Collection<TopicPartition> partitions)
+    {
+      return
+          partitions
+              .stream()
+              .map(tp -> tp.topic() + "-" + tp.partition())
+              .collect(Collectors.joining(", "));
+    }
+  }
+
+  @PreDestroy
+  public void stop()
+  {
+    log.info("Shutdown requested...");
+    stopped = true;
+    consumer.wakeup();
+    running.lock();
+    log.info("Shutdown completed!");
+  }
+}