From adca400cbb3b66f0c01999ecbb7bc700357bff42 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 22 Oct 2021 13:55:23 +0200 Subject: [PATCH] WIP --- pom.xml | 8 +- .../recorder/RecorderApplication.java | 36 ---- .../recorder/RecorderController.java | 80 ------- .../wordcount/recorder/RecordingResult.java | 19 -- .../recorder/SplitterApplication.java | 63 ++++++ ...ava => SplitterApplicationProperties.java} | 9 +- .../recorder/SplitterStreamProcessor.java | 200 ++++++++++++++++++ 7 files changed, 273 insertions(+), 142 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java delete mode 100644 src/main/java/de/juplo/kafka/wordcount/recorder/RecorderController.java delete mode 100644 src/main/java/de/juplo/kafka/wordcount/recorder/RecordingResult.java create mode 100644 src/main/java/de/juplo/kafka/wordcount/recorder/SplitterApplication.java rename src/main/java/de/juplo/kafka/wordcount/recorder/{RecorderApplicationProperties.java => SplitterApplicationProperties.java} (51%) create mode 100644 src/main/java/de/juplo/kafka/wordcount/recorder/SplitterStreamProcessor.java diff --git a/pom.xml b/pom.xml index edcedf5..16475de 100644 --- a/pom.xml +++ b/pom.xml @@ -9,10 +9,10 @@ de.juplo.kafka.wordcount - recorder - 1.0.1 - Wordcount-Recorder - Recorder-service of the multi-user wordcount-example + kafka-splitter + 1.0.0 + Wordcount-Splitter + Stream-processor of the multi-user wordcount-example, that splits the sentences up into single words 0.33.0 11 diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java b/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java deleted file mode 100644 index abe0685..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java +++ /dev/null @@ -1,36 +0,0 @@ -package de.juplo.kafka.wordcount.recorder; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.util.Assert; - -import java.util.Properties; - - -@SpringBootApplication -@EnableConfigurationProperties(RecorderApplicationProperties.class) -public class RecorderApplication -{ - @Bean(destroyMethod = "close") - KafkaProducer producer(RecorderApplicationProperties properties) - { - Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.recorder.bootstrap-server must be set"); - - Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - - return new KafkaProducer<>(props); - } - - public static void main(String[] args) - { - SpringApplication.run(RecorderApplication.class, args); - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderController.java b/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderController.java deleted file mode 100644 index 5fe69ad..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderController.java +++ /dev/null @@ -1,80 +0,0 @@ -package de.juplo.kafka.wordcount.recorder; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; -import org.springframework.util.MimeTypeUtils; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RestController; -import org.springframework.web.context.request.async.DeferredResult; - -import javax.validation.constraints.NotEmpty; - - -@RestController -public class RecorderController -{ - private final String topic; - private final KafkaProducer producer; - - - public RecorderController(RecorderApplicationProperties properties, KafkaProducer producer) - { - this.topic = properties.getTopic(); - this.producer = producer; - } - - @PostMapping( - path = "/{username}", - consumes = { - MimeTypeUtils.TEXT_PLAIN_VALUE, - MimeTypeUtils.APPLICATION_JSON_VALUE - }, - produces = MimeTypeUtils.APPLICATION_JSON_VALUE) - DeferredResult> speak( - @PathVariable - @NotEmpty(message = "A username must be provided") - String username, - @RequestBody - @NotEmpty(message = "The spoken sentence must not be empty!") - String sentence) - { - DeferredResult> result = new DeferredResult<>(); - - ProducerRecord record = new ProducerRecord<>(topic, username, sentence); - producer.send(record, (metadata, exception) -> - { - if (metadata != null) - { - result.setResult( - ResponseEntity.ok(RecordingResult.of( - username, - sentence, - topic, - metadata.partition(), - metadata.offset(), - null, - null))); - } - else - { - result.setErrorResult( - ResponseEntity - .internalServerError() - .body(RecordingResult.of( - username, - sentence, - topic, - null, - null, - HttpStatus.INTERNAL_SERVER_ERROR.value(), - exception.toString()))); - } - }); - - return result; - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/RecordingResult.java b/src/main/java/de/juplo/kafka/wordcount/recorder/RecordingResult.java deleted file mode 100644 index 939b1d4..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/recorder/RecordingResult.java +++ /dev/null @@ -1,19 +0,0 @@ -package de.juplo.kafka.wordcount.recorder; - -import com.fasterxml.jackson.annotation.JsonInclude; -import lombok.Value; - -import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL; - - -@Value(staticConstructor = "of") -public class RecordingResult -{ - @JsonInclude(NON_NULL) private final String username; - @JsonInclude(NON_NULL) private final String sentence; - @JsonInclude(NON_NULL) private final String topic; - @JsonInclude(NON_NULL) private final Integer partition; - @JsonInclude(NON_NULL) private final Long offset; - @JsonInclude(NON_NULL) private final Integer status; - @JsonInclude(NON_NULL) private final String error; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/recorder/SplitterApplication.java new file mode 100644 index 0000000..da17c3d --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/recorder/SplitterApplication.java @@ -0,0 +1,63 @@ +package de.juplo.kafka.wordcount.recorder; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.util.Assert; + +import java.time.Clock; +import java.util.Properties; + + +@SpringBootApplication +@EnableConfigurationProperties(SplitterApplicationProperties.class) +public class SplitterApplication +{ + @Bean(destroyMethod = "close") + KafkaConsumer consumer(SplitterApplicationProperties properties) + { + Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set"); + + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId()); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSerializer.class); + + return new KafkaConsumer<>(props); + } + + @Bean(destroyMethod = "close") + KafkaProducer producer(SplitterApplicationProperties properties) + { + Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set"); + + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); + props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); + props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-ID"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + return new KafkaProducer<>(props); + } + + @Bean + Clock clock() + { + return Clock.systemDefaultZone(); + } + + + public static void main(String[] args) + { + SpringApplication.run(SplitterApplication.class, args); + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/recorder/SplitterApplicationProperties.java similarity index 51% rename from src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationProperties.java rename to src/main/java/de/juplo/kafka/wordcount/recorder/SplitterApplicationProperties.java index 552ebaf..4983a9d 100644 --- a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/wordcount/recorder/SplitterApplicationProperties.java @@ -7,12 +7,15 @@ import lombok.ToString; import org.springframework.boot.context.properties.ConfigurationProperties; -@ConfigurationProperties("juplo.wordcount.recorder") +@ConfigurationProperties("juplo.wordcount.splitter") @Getter @Setter @ToString -public class RecorderApplicationProperties +public class SplitterApplicationProperties { private String bootstrapServer = "localhost:9092"; - private String topic = "recordings"; + private String groupId = "splitter"; + private String inputTopic = "recordings"; + private String outputTopic = "words"; + private int commitInterval = 1000; } diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/SplitterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/recorder/SplitterStreamProcessor.java new file mode 100644 index 0000000..fca26d5 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/recorder/SplitterStreamProcessor.java @@ -0,0 +1,200 @@ +package de.juplo.kafka.wordcount.recorder; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.stereotype.Component; + +import javax.annotation.PreDestroy; +import java.time.Clock; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + + +@Component +@Slf4j +public class SplitterStreamProcessor implements ApplicationRunner +{ + final static Pattern PATTERN = Pattern.compile("\\W+"); + + private final String inputTopic; + private final String outputTopic; + private final KafkaConsumer consumer; + private final KafkaProducer producer; + private final Clock clock; + private final int commitInterval; + + private boolean stopped = false; + private long lastCommit; + + public SplitterStreamProcessor( + SplitterApplicationProperties properties, + KafkaConsumer consumer, + KafkaProducer producer, + Clock clock) + { + this.inputTopic = properties.getInputTopic(); + this.outputTopic = properties.getOutputTopic(); + + this.consumer = consumer; + this.producer = producer; + + this.clock = clock; + this.commitInterval = properties.getCommitInterval(); + } + + @Override + public void run(ApplicationArguments args) + { + log.info("Initializing transaction"); + producer.initTransactions(); + + try + { + log.info("C - Subscribing to topic test"); + consumer.subscribe( + Arrays.asList(inputTopic), + new TransactionalConsumerRebalanceListener()); + + while (!stopped) + { + ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); + + records.forEach(inputRecord -> + { + String[] words = PATTERN.split(inputRecord.value()); + for (int i = 0; i < words.length; i++) + { + ProducerRecord outputRecord = + new ProducerRecord<>( + outputTopic, + inputRecord.key(), + words[i].trim()); + + producer.send(outputRecord, (metadata, exception) -> + { + if (exception == null) + { + // HANDLE SUCCESS + log.info( + "sent {}={}, partition={}, offset={}", + outputRecord.key(), + outputRecord.value(), + metadata.partition(), + metadata.offset()); + } + else + { + // HANDLE ERROR + log.error( + "could not send {}={}: {}", + outputRecord.key(), + outputRecord.value(), + exception.toString()); + } + }); + } + }); + + long delta = lastCommit - clock.millis(); + if (delta > commitInterval) + { + log.info("Elapsed time since last commit: {}ms", delta); + commitTransaction(); + beginTransaction(); + } + } + } + catch (WakeupException e) + { + log.info("Waking up from exception!", e); + commitTransaction(); + } + catch (Exception e) + { + log.error("Unexpected exception!", e); + producer.abortTransaction(); + } + finally + { + log.info("Closing consumer"); + consumer.close(); + log.info("C - DONE!"); + } + } + + private void beginTransaction() + { + log.info("Beginning new transaction"); + lastCommit = clock.millis(); + producer.beginTransaction(); + } + + private void commitTransaction() + { + log.info("Committing transaction"); + producer.commitTransaction(); + } + + class TransactionalConsumerRebalanceListener implements ConsumerRebalanceListener + { + @Override + public void onPartitionsAssigned(Collection partitions) + { + log.info( + "Assigned partitions: {}", + partitions + .stream() + .map(tp -> tp.topic() + "-" + tp.partition()) + .collect(Collectors.joining(", "))); + + commitTransaction(); + } + + @Override + public void onPartitionsRevoked(Collection partitions) + { + log.info( + "Revoked partitions: {}", + partitions + .stream() + .map(tp -> tp.topic() + "-" + tp.partition()) + .collect(Collectors.joining(", "))); + + commitTransaction(); + } + + @Override + public void onPartitionsLost(Collection partitions) + { + log.info( + "Lost partitions: {}", + partitions + .stream() + .map(tp -> tp.topic() + "-" + tp.partition()) + .collect(Collectors.joining(", "))); + + producer.abortTransaction(); + } + } + + @PreDestroy + public void stop() + { + log.info("Stopping Consumer"); + stopped = true; + consumer.wakeup(); + } +} -- 2.20.1