From: Kai Moritz
Date: Fri, 22 Oct 2021 21:02:26 +0000 (+0200)
Subject: WIP
X-Git-Tag: notnagel~2
X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=570184a5d53d80dae39c3f701e747c65e7967165;p=demos%2Fkafka%2Fwordcount

WIP
---

diff --git a/Dockerfile b/Dockerfile
index 9c0b843..803477f 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,5 +1,5 @@
 FROM openjdk:11-jre-slim
 COPY target/*.jar /opt/app.jar
-EXPOSE 8081
+EXPOSE 8086
 ENTRYPOINT ["java", "-jar", "/opt/app.jar"]
 CMD []
diff --git a/pom.xml b/pom.xml
index 16475de..fee0245 100644
--- a/pom.xml
+++ b/pom.xml
@@ -9,7 +9,7 @@
   <groupId>de.juplo.kafka.wordcount</groupId>
-  <artifactId>kafka-splitter</artifactId>
-  <version>1.0.0</version>
+  <artifactId>splitter</artifactId>
+  <version>1.0.0-vanilla-kafka</version>
   <name>Wordcount-Splitter</name>
-  <description>Stream-processor of the multi-user wordcount-example that splits the sentences up into single words</description>
+  <description>A version of the stream-processor for the multi-user wordcount-example that splits the sentences up into single words, implemented in vanilla Kafka (without Kafka Streams)</description>
   0.33.0
   11
diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/recorder/SplitterApplication.java
deleted file mode 100644
index da17c3d..0000000
--- a/src/main/java/de/juplo/kafka/wordcount/recorder/SplitterApplication.java
+++ /dev/null
@@ -1,64 +0,0 @@
-package de.juplo.kafka.wordcount.recorder;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.util.Assert;
-
-import java.time.Clock;
-import java.util.Properties;
-
-
-@SpringBootApplication
-@EnableConfigurationProperties(SplitterApplicationProperties.class)
-public class SplitterApplication
-{
-  @Bean(destroyMethod = "close")
-  KafkaConsumer<String, String> consumer(SplitterApplicationProperties properties)
-  {
-    Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");
-
-    Properties props = new Properties();
-    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
-    props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
-    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-
-    return new KafkaConsumer<>(props);
-  }
-
-  @Bean(destroyMethod = "close")
-  KafkaProducer<String, String> producer(SplitterApplicationProperties properties)
-  {
-    Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");
-
-    Properties props = new Properties();
-    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
-    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
-    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-ID");
-    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-
-    return new KafkaProducer<>(props);
-  }
-
-  @Bean
-  Clock clock()
-  {
-    return Clock.systemDefaultZone();
-  }
-
-
-  public static void main(String[] args)
-  {
-    SpringApplication.run(SplitterApplication.class, args);
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/SplitterApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/recorder/SplitterApplicationProperties.java
deleted file mode 100644
index 4983a9d..0000000
--- a/src/main/java/de/juplo/kafka/wordcount/recorder/SplitterApplicationProperties.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package de.juplo.kafka.wordcount.recorder;
-
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-
-
-@ConfigurationProperties("juplo.wordcount.splitter")
-@Getter
-@Setter
-@ToString
-public class SplitterApplicationProperties
-{
-  private String bootstrapServer = "localhost:9092";
-  private String groupId = "splitter";
-  private String inputTopic = "recordings";
-  private String outputTopic = "words";
-  private int commitInterval = 1000;
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/SplitterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/recorder/SplitterStreamProcessor.java
deleted file mode 100644
index 92a544b..0000000
--- a/src/main/java/de/juplo/kafka/wordcount/recorder/SplitterStreamProcessor.java
+++ /dev/null
@@ -1,224 +0,0 @@
-package de.juplo.kafka.wordcount.recorder;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.PreDestroy;
-import java.time.Clock;
-import java.time.Duration;
-import java.util.*;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
-
-@Component
-@Slf4j
-public class SplitterStreamProcessor implements ApplicationRunner
-{
-  final static Pattern PATTERN = Pattern.compile("\\W+");
-
-  private final String inputTopic;
-  private final String outputTopic;
-  private final KafkaConsumer<String, String> consumer;
-  private final KafkaProducer<String, String> producer;
-  private final Clock clock;
-  private final int commitInterval;
-
-  private boolean stopped = false;
-  private long[] offsets;
-  private Optional<Integer>[] leaderEpochs;
-  private long lastCommit;
-
-  public SplitterStreamProcessor(
-      SplitterApplicationProperties properties,
-      KafkaConsumer<String, String> consumer,
-      KafkaProducer<String, String> producer,
-      Clock clock)
-  {
-    this.inputTopic = properties.getInputTopic();
-    this.outputTopic = properties.getOutputTopic();
-
-    this.consumer = consumer;
-    this.producer = producer;
-
-    this.clock = clock;
-    this.commitInterval = properties.getCommitInterval();
-  }
-
-  @Override
-  public void run(ApplicationArguments args)
-  {
-    log.info("Initializing transaction");
-    producer.initTransactions();
-
-    try
-    {
-      log.info("Subscribing to topic {}", inputTopic);
-      consumer.subscribe(
-          Arrays.asList(inputTopic),
-          new TransactionalConsumerRebalanceListener());
-
-      while (!stopped)
-      {
-        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
-
-        records.forEach(inputRecord ->
-        {
-          offsets[inputRecord.partition()] = inputRecord.offset();
-          leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch();
-
-          String[] words = PATTERN.split(inputRecord.value());
-          for (int i = 0; i < words.length; i++)
-          {
-            ProducerRecord<String, String> outputRecord =
-                new ProducerRecord<>(
-                    outputTopic,
-                    inputRecord.key(),
-                    words[i].trim());
-
-            producer.send(outputRecord, (metadata, exception) ->
-            {
-              if (exception == null)
-              {
-                // HANDLE SUCCESS
-                log.info(
-                    "sent {}={}, partition={}, offset={}",
-                    outputRecord.key(),
-                    outputRecord.value(),
-                    metadata.partition(),
-                    metadata.offset());
-              }
-              else
-              {
-                // HANDLE ERROR
-                log.error(
-                    "could not send {}={}: {}",
-                    outputRecord.key(),
-                    outputRecord.value(),
-                    exception.toString());
-              }
-            });
-          }
-        });
-
-        long delta = clock.millis() - lastCommit;
-        if (delta > commitInterval)
-        {
-          log.info("Elapsed time since last commit: {}ms", delta);
-          commitTransaction();
-          beginTransaction();
-        }
-      }
-    }
-    catch (WakeupException e)
-    {
-      log.info("Waking up from exception!", e);
-      commitTransaction();
-    }
-    catch (Exception e)
-    {
-      log.error("Unexpected exception!", e);
-      producer.abortTransaction();
-    }
-    finally
-    {
-      log.info("Closing consumer");
-      consumer.close();
-      log.info("Closing producer");
-      producer.close();
-      log.info("Exiting!");
-    }
-  }
-
-  private void beginTransaction()
-  {
-    log.info("Beginning new transaction");
-    lastCommit = clock.millis();
-    producer.beginTransaction();
-  }
-
-  private void commitTransaction()
-  {
-    Map<TopicPartition, OffsetAndMetadata> offsetsToSend = new HashMap<>();
-    for (int i = 0; i < offsets.length; i++)
-    {
-      if (offsets[i] > 0)
-      {
-        offsetsToSend.put(
-            new TopicPartition(inputTopic, i),
-            new OffsetAndMetadata(offsets[i], leaderEpochs[i], ""));
-      }
-    }
-    producer.sendOffsetsToTransaction(
-        offsetsToSend,
-        consumer.groupMetadata());
-    log.info("Committing transaction");
-    producer.commitTransaction();
-  }
-
-  class TransactionalConsumerRebalanceListener implements ConsumerRebalanceListener
-  {
-    @Override
-    public void onPartitionsAssigned(Collection<TopicPartition> partitions)
-    {
-      log.info("Assigned partitions: {}", toString(partitions));
-
-      // Compute the length of an array that can be used to store the offsets
-      // (We can ignore the topic, since we only read from a single one!)
-      int length =
-          partitions
-              .stream()
-              .reduce(
-                  0,
-                  (i, v) -> i < v.partition() ? v.partition() : i,
-                  (l, r) -> l < r ? r : l) + 1;
-      offsets = new long[length];
-      leaderEpochs = new Optional[length];
-
-      beginTransaction();
-    }
-
-    @Override
-    public void onPartitionsRevoked(Collection<TopicPartition> partitions)
-    {
-      log.info("Revoked partitions: {}", toString(partitions));
-      commitTransaction();
-    }
-
-    @Override
-    public void onPartitionsLost(Collection<TopicPartition> partitions)
-    {
-      log.info("Lost partitions: {}", toString(partitions));
-      producer.abortTransaction();
-    }
-
-    String toString(Collection<TopicPartition> partitions)
-    {
-      return
-          partitions
-              .stream()
-              .map(tp -> tp.topic() + "-" + tp.partition())
-              .collect(Collectors.joining(", "));
-    }
-  }
-
-  @PreDestroy
-  public void stop()
-  {
-    log.info("Stopping Consumer");
-    stopped = true;
-    consumer.wakeup();
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java
new file mode 100644
index 0000000..b7a94dd
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java
@@ -0,0 +1,64 @@
+package de.juplo.kafka.wordcount.splitter;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.util.Assert;
+
+import java.time.Clock;
+import java.util.Properties;
+
+
+@SpringBootApplication
+@EnableConfigurationProperties(SplitterApplicationProperties.class)
+public class SplitterApplication
+{
+  @Bean(destroyMethod = "close")
+  KafkaConsumer<String, String> consumer(SplitterApplicationProperties properties)
+  {
+    Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");
+
+    Properties props = new Properties();
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId());
+    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+    return new KafkaConsumer<>(props);
+  }
+
+  @Bean(destroyMethod = "close")
+  KafkaProducer<String, String> producer(SplitterApplicationProperties properties)
+  {
+    Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set");
+
+    Properties props = new Properties();
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-ID");
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+    return new KafkaProducer<>(props);
+  }
+
+  @Bean
+  Clock clock()
+  {
+    return Clock.systemDefaultZone();
+  }
+
+
+  public static void main(String[] args)
+  {
+    SpringApplication.run(SplitterApplication.class, args);
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationProperties.java
new file mode 100644
index 0000000..f699f32
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationProperties.java
@@ -0,0 +1,21 @@
+package de.juplo.kafka.wordcount.splitter;
+
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+
+@ConfigurationProperties("juplo.wordcount.splitter")
+@Getter
+@Setter
+@ToString
+public class SplitterApplicationProperties
+{
+  private String bootstrapServer = "localhost:9092";
+  private String groupId = "splitter";
+  private String inputTopic = "recordings";
+  private String outputTopic = "words";
+  private int commitInterval = 1000;
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
new file mode 100644
index 0000000..dab15f8
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java
@@ -0,0 +1,222 @@
+package de.juplo.kafka.wordcount.splitter;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PreDestroy;
+import java.time.Clock;
+import java.time.Duration;
+import java.util.*;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+
+@Component
+@Slf4j
+public class SplitterStreamProcessor implements ApplicationRunner
+{
+  final static Pattern PATTERN = Pattern.compile("\\W+");
+
+  private final String inputTopic;
+  private final String outputTopic;
+  private final KafkaConsumer<String, String> consumer;
+  private final KafkaProducer<String, String> producer;
+  private final Clock clock;
+  private final int commitInterval;
+
+  private boolean stopped = false;
+  private long[] offsets;
+  private Optional<Integer>[] leaderEpochs;
+  private long lastCommit;
+
+  public SplitterStreamProcessor(
+      SplitterApplicationProperties properties,
+      KafkaConsumer<String, String> consumer,
+      KafkaProducer<String, String> producer,
+      Clock clock)
+  {
+    this.inputTopic = properties.getInputTopic();
+    this.outputTopic = properties.getOutputTopic();
+
+    this.consumer = consumer;
+    this.producer = producer;
+
+    this.clock = clock;
+    this.commitInterval = properties.getCommitInterval();
+  }
+
+  @Override
+  public void run(ApplicationArguments args)
+  {
+    log.info("Initializing transaction");
+    producer.initTransactions();
+
+    try
+    {
+      log.info("Subscribing to topic {}", inputTopic);
+      consumer.subscribe(
+          Arrays.asList(inputTopic),
+          new TransactionalConsumerRebalanceListener());
+
+      while (!stopped)
+      {
+        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
+
+        records.forEach(inputRecord ->
+        {
+          offsets[inputRecord.partition()] = inputRecord.offset();
+          leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch();
+
+          String[] words = PATTERN.split(inputRecord.value());
+          for (int i = 0; i < words.length; i++)
+          {
+            ProducerRecord<String, String> outputRecord =
+                new ProducerRecord<>(
+                    outputTopic,
+                    inputRecord.key(),
+                    words[i].trim());
+
+            producer.send(outputRecord, (metadata, exception) ->
+            {
+              if (exception == null)
+              {
+                // HANDLE SUCCESS
+                log.info(
+                    "sent {}={}, partition={}, offset={}",
+                    outputRecord.key(),
+                    outputRecord.value(),
+                    metadata.partition(),
+                    metadata.offset());
+              }
+              else
+              {
+                // HANDLE ERROR
+                log.error(
+                    "could not send {}={}: {}",
+                    outputRecord.key(),
+                    outputRecord.value(),
+                    exception.toString());
+              }
+            });
+          }
+        });
+
+        long delta = clock.millis() - lastCommit;
+        if (delta > commitInterval)
+        {
+          log.info("Elapsed time since last commit: {}ms", delta);
+          commitTransaction();
+          beginTransaction();
+        }
+      }
+    }
+    catch (WakeupException e)
+    {
+      log.info("Waking up from exception!", e);
+      commitTransaction();
+    }
+    catch (Exception e)
+    {
+      log.error("Unexpected exception!", e);
+      producer.abortTransaction();
+    }
+    finally
+    {
+      log.info("Closing consumer");
+      consumer.close();
+      log.info("Closing producer");
+      producer.close();
+      log.info("Exiting!");
+    }
+  }
+
+  private void beginTransaction()
+  {
+    log.info("Beginning new transaction");
+    lastCommit = clock.millis();
+    producer.beginTransaction();
+  }
+
+  private void commitTransaction()
+  {
+    Map<TopicPartition, OffsetAndMetadata> offsetsToSend = new HashMap<>();
+    for (int i = 0; i < offsets.length; i++)
+    {
+      if (offsets[i] > 0)
+      {
+        offsetsToSend.put(
+            new TopicPartition(inputTopic, i),
+            new OffsetAndMetadata(offsets[i], leaderEpochs[i], ""));
+      }
+    }
+    producer.sendOffsetsToTransaction(
+        offsetsToSend,
+        consumer.groupMetadata());
+    log.info("Committing transaction");
+    producer.commitTransaction();
+  }
+
+  class TransactionalConsumerRebalanceListener implements ConsumerRebalanceListener
+  {
+    @Override
+    public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+    {
+      log.info("Assigned partitions: {}", toString(partitions));
+
+      // Compute the length of an array that can be used to store the offsets
+      // (We can ignore the topic, since we only read from a single one!)
+      int length =
+          partitions
+              .stream()
+              .reduce(
+                  0,
+                  (i, v) -> i < v.partition() ? v.partition() : i,
+                  (l, r) -> l < r ? r : l) + 1;
+      offsets = new long[length];
+      leaderEpochs = new Optional[length];
+
+      beginTransaction();
+    }
+
+    @Override
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+    {
+      log.info("Revoked partitions: {}", toString(partitions));
+      commitTransaction();
+    }
+
+    @Override
+    public void onPartitionsLost(Collection<TopicPartition> partitions)
+    {
+      log.info("Lost partitions: {}", toString(partitions));
+      producer.abortTransaction();
+    }
+
+    String toString(Collection<TopicPartition> partitions)
+    {
+      return
+          partitions
+              .stream()
+              .map(tp -> tp.topic() + "-" + tp.partition())
+              .collect(Collectors.joining(", "));
+    }
+  }
+
+  @PreDestroy
+  public void stop()
+  {
+    log.info("Stopping Consumer");
+    stopped = true;
+    consumer.wakeup();
+  }
+}
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 28d43c9..3046fc3 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -1,2 +1,2 @@
-server.port=8081
+server.port=8086
 management.endpoints.web.exposure.include=*
diff --git a/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java b/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java
deleted file mode 100644
index 885a408..0000000
--- a/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package de.juplo.kafka.wordcount.recorder;
-
-import org.junit.jupiter.api.Test;
-import org.springframework.boot.test.context.SpringBootTest;
-
-@SpringBootTest
-class ApplicationTests
-{
-  @Test
-  void contextLoads()
-  {
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/ApplicationTests.java b/src/test/java/de/juplo/kafka/wordcount/splitter/ApplicationTests.java
new file mode 100644
index 0000000..77266a9
--- /dev/null
+++ b/src/test/java/de/juplo/kafka/wordcount/splitter/ApplicationTests.java
@@ -0,0 +1,13 @@
+package de.juplo.kafka.wordcount.splitter;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest
+class ApplicationTests
+{
+  @Test
+  void contextLoads()
+  {
+  }
+}