From 9c9ffbe3316ed295533c576e823794aa6de99665 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 26 Jun 2022 14:05:11 +0200 Subject: [PATCH] splitter: 1.0.0-spring-integration - Inital implementation (incomplete) * "Implementing" the splitter with Spring Integration mainly is a configuration task. * This version does not yet send the messages with the correct key. * Managment of transactions is not yet considered. * The test fails, because the key is missing. --- pom.xml | 13 +- .../wordcount/splitter/MessageSplitter.java | 2 + .../splitter/SplitterApplication.java | 31 +- .../SplitterApplicationProperties.java | 3 - .../splitter/SplitterStreamProcessor.java | 267 ------------------ src/main/resources/application.properties | 7 + .../wordcount/splitter/ApplicationTests.java | 3 - 7 files changed, 40 insertions(+), 286 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java diff --git a/pom.xml b/pom.xml index 1938fd9..d201639 100644 --- a/pom.xml +++ b/pom.xml @@ -10,9 +10,9 @@ de.juplo.kafka.wordcount splitter - 1.0.0-vanilla-kafka + 1.0.0-spring-integration Wordcount-Splitter - A version of the stream-processor for the multi-user wordcount-example, that splits the sentences up into single words, that is implemented in Vanilla-Kafka (without Kafka Streams) + A version of the stream-processor for the multi-user wordcount-example, that splits the sentences up into single words and is implemented based on Spring Integration 0.33.0 11 @@ -27,8 +27,8 @@ spring-boot-starter-web - org.apache.kafka - kafka-clients + org.springframework.integration + spring-integration-kafka org.hibernate.validator @@ -56,11 +56,6 @@ spring-boot-starter-test test - - org.springframework.kafka - spring-kafka - test - org.springframework.kafka spring-kafka-test diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/MessageSplitter.java b/src/main/java/de/juplo/kafka/wordcount/splitter/MessageSplitter.java index 0665f3e..6128f0a 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/MessageSplitter.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/MessageSplitter.java @@ -1,5 +1,6 @@ package de.juplo.kafka.wordcount.splitter; +import org.springframework.integration.annotation.Splitter; import org.springframework.stereotype.Component; import java.util.regex.Pattern; @@ -10,6 +11,7 @@ public class MessageSplitter { final static Pattern PATTERN = Pattern.compile("\\W+"); + @Splitter(inputChannel = "recordings", outputChannel = "words") String[] split(String message) { return PATTERN.split(message); diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java index d46a7cd..412f429 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplication.java @@ -4,20 +4,43 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; - -import java.time.Clock; +import org.springframework.expression.common.LiteralExpression; +import org.springframework.integration.annotation.InboundChannelAdapter; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.kafka.inbound.KafkaMessageSource; +import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.listener.ConsumerProperties; +import org.springframework.messaging.MessageHandler; @SpringBootApplication @EnableConfigurationProperties(SplitterApplicationProperties.class) +@EnableIntegration public class SplitterApplication { + @InboundChannelAdapter(channel = "recordings") @Bean - Clock clock() + KafkaMessageSource source( + ConsumerFactory cf, + SplitterApplicationProperties properties) { - return Clock.systemDefaultZone(); + return new KafkaMessageSource<>(cf, new ConsumerProperties(properties.getInputTopic())); } + @Bean + @ServiceActivator(inputChannel = "words") + MessageHandler handler( + KafkaTemplate kafkaTemplate, + SplitterApplicationProperties properties) + { + KafkaProducerMessageHandler handler = + new KafkaProducerMessageHandler<>(kafkaTemplate); + handler.setTopicExpression(new LiteralExpression(properties.getOutputTopic())); + return handler; + } public static void main(String[] args) { diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationProperties.java index f699f32..4881ea5 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterApplicationProperties.java @@ -13,9 +13,6 @@ import org.springframework.boot.context.properties.ConfigurationProperties; @ToString public class SplitterApplicationProperties { - private String bootstrapServer = "localhost:9092"; - private String groupId = "splitter"; private String inputTopic = "recordings"; private String outputTopic = "words"; - private int commitInterval = 1000; } diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java deleted file mode 100644 index 0eafbda..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java +++ /dev/null @@ -1,267 +0,0 @@ -package de.juplo.kafka.wordcount.splitter; - -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.*; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.core.task.TaskExecutor; -import org.springframework.stereotype.Component; -import org.springframework.util.Assert; - -import javax.annotation.PreDestroy; -import java.time.Clock; -import java.time.Duration; -import java.util.*; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; - - -@Component -@Slf4j -public class SplitterStreamProcessor implements Runnable -{ - private final MessageSplitter splitter; - private final String inputTopic; - private final String outputTopic; - private final KafkaConsumer consumer; - private final KafkaProducer producer; - private final Clock clock; - private final int commitInterval; - private final Lock running = new ReentrantLock(); - - private boolean stopped = false; - private long[] offsets; - private Optional[] leaderEpochs; - private long lastCommit; - - public SplitterStreamProcessor( - MessageSplitter splitter, - SplitterApplicationProperties properties, - Clock clock, - TaskExecutor executor) - { - this.splitter = splitter; - - this.inputTopic = properties.getInputTopic(); - this.outputTopic = properties.getOutputTopic(); - - this.clock = clock; - this.commitInterval = properties.getCommitInterval(); - - Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.splitter.bootstrap-server must be set"); - - Properties props; - - props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.getGroupId()); - props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - consumer = new KafkaConsumer<>(props); - - props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); - props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-ID"); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - producer = new KafkaProducer<>(props); - - executor.execute(this); - } - - public void run() - { - running.lock(); - - try - { - log.info("Initializing transaction"); - producer.initTransactions(); - - log.info("Subscribing to topic {}", inputTopic); - consumer.subscribe( - Arrays.asList(inputTopic), - new TransactionalConsumerRebalanceListener()); - - while (!stopped) - { - ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); - - records.forEach(inputRecord -> - { - log.debug( - "Received a recording of {}, partition={}, offset={}, epoch={}", - inputRecord.key(), - inputRecord.partition(), - inputRecord.offset(), - inputRecord.leaderEpoch()); - - offsets[inputRecord.partition()] = inputRecord.offset(); - leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch(); - - String[] words = splitter.split(inputRecord.value()); - for (int i = 0; i < words.length; i++) - { - ProducerRecord outputRecord = - new ProducerRecord<>( - outputTopic, - inputRecord.key(), - words[i].trim()); - - producer.send(outputRecord, (metadata, exception) -> - { - if (exception == null) - { - // HANDLE SUCCESS - log.debug( - "Sent {}={}, partition={}, offset={}", - outputRecord.key(), - outputRecord.value(), - metadata.partition(), - metadata.offset()); - } - else - { - // HANDLE ERROR - log.error( - "Could not send {}={}: {}", - outputRecord.key(), - outputRecord.value(), - exception.toString()); - } - }); - } - - long delta = clock.millis() - lastCommit; - if (delta > commitInterval) - { - log.info("Elapsed time since last commit: {}ms", delta); - commitTransaction(); - beginTransaction(); - } - }); - } - } - catch (WakeupException e) - { - log.info("Waking up from exception!"); - // Nicht nötig, da consumer.close() onPartitionsRevoked() auslöst! - // commitTransaction(); - } - catch (Exception e) - { - log.error("Unexpected exception!", e); - producer.abortTransaction(); - } - finally - { - try - { - log.info("Closing consumer"); - consumer.close(); - log.info("Closing producer"); - producer.close(); - log.info("Exiting!"); - } - finally - { - running.unlock(); - } - } - } - - private void beginTransaction() - { - log.info("Beginning new transaction"); - lastCommit = clock.millis(); - producer.beginTransaction(); - } - - private void commitTransaction() - { - Map offsetsToSend = new HashMap<>(); - for (int i = 0; i < offsets.length; i++) - { - if (offsets[i] > 0) - { - offsetsToSend.put( - new TopicPartition(inputTopic, i), - new OffsetAndMetadata(offsets[i], leaderEpochs[i], "")); - } - } - producer.sendOffsetsToTransaction( - offsetsToSend, - consumer.groupMetadata()); - log.info("Committing transaction"); - producer.commitTransaction(); - } - - class TransactionalConsumerRebalanceListener implements ConsumerRebalanceListener - { - @Override - public void onPartitionsAssigned(Collection partitions) - { - log.info("Assigned partitions: {}", toString(partitions)); - - // Compote the length of an array, that can be used to store the offsets - // (We can ignore the topic, since we only read from a single one!) - int length = - partitions - .stream() - .reduce( - 0, - (i, v) -> i < v.partition() ? v.partition() : i, - (l, r) -> l < r ? r : l) + 1; - offsets = new long[length]; - leaderEpochs = new Optional[length]; - - beginTransaction(); - } - - @Override - public void onPartitionsRevoked(Collection partitions) - { - log.info("Revoked partitions: {}", toString(partitions)); - commitTransaction(); - } - - @Override - public void onPartitionsLost(Collection partitions) - { - log.info("Lost partitions: {}", toString(partitions)); - producer.abortTransaction(); - } - - String toString(Collection partitions) - { - return - partitions - .stream() - .map(tp -> tp.topic() + "-" + tp.partition()) - .collect(Collectors.joining(", ")); - } - } - - @PreDestroy - public void stop() - { - log.info("Shutdown requested..."); - if (stopped) - { - log.warn("Ignoring request: already stopped!"); - return; - } - stopped = true; - consumer.wakeup(); - running.lock(); - log.info("Shutdown completed!"); - } -} diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 0d2624f..94e7492 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,3 +1,10 @@ server.port=8086 management.endpoints.web.exposure.include=* logging.level.de.juplo.kafka.wordcount.splitter=DEBUG +spring.kafka.consumer.auto-offset-reset. earliest +spring.kafka.consumer.group-id=splitter +spring.kafka.bootstrap-servers=localhost:9092 +spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer +spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer +spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer +spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/ApplicationTests.java b/src/test/java/de/juplo/kafka/wordcount/splitter/ApplicationTests.java index 775d3bd..113501f 100644 --- a/src/test/java/de/juplo/kafka/wordcount/splitter/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/ApplicationTests.java @@ -39,8 +39,6 @@ class ApplicationTests final static String TOPIC_IN = "in"; final static String TOPIC_OUT = "out"; - @Autowired - SplitterStreamProcessor splitter; @Autowired KafkaTemplate kafkaTemplate; @Autowired @@ -57,7 +55,6 @@ class ApplicationTests @Test void contextLoads() { - splitter.stop(); } @Test -- 2.20.1