X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FSplitterStreamProcessor.java;h=0eafbda49e0a44c577362d4caf8523bf925eee6b;hb=0e63376a3cfc8ecabdc4699e9307f6a51415cb09;hp=791e164d6fe3ddfdf8600dec4430e082462b2733;hpb=d5f54354b2b44d125493c830bf0475f7992ee395;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java index 791e164..0eafbda 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java @@ -9,8 +9,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; +import org.springframework.core.task.TaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.Assert; @@ -20,16 +19,14 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import java.util.regex.Pattern; import java.util.stream.Collectors; @Component @Slf4j -public class SplitterStreamProcessor implements ApplicationRunner +public class SplitterStreamProcessor implements Runnable { - final static Pattern PATTERN = Pattern.compile("\\W+"); - + private final MessageSplitter splitter; private final String inputTopic; private final String outputTopic; private final KafkaConsumer consumer; @@ -44,9 +41,13 @@ public class SplitterStreamProcessor implements ApplicationRunner private long lastCommit; public SplitterStreamProcessor( + MessageSplitter splitter, SplitterApplicationProperties properties, - Clock clock) + Clock clock, + TaskExecutor executor) { + this.splitter = splitter; + this.inputTopic = properties.getInputTopic(); this.outputTopic = properties.getOutputTopic(); @@ -72,10 +73,11 @@ public class SplitterStreamProcessor implements ApplicationRunner props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); producer = new KafkaProducer<>(props); + + executor.execute(this); } - @Override - public void run(ApplicationArguments args) + public void run() { running.lock(); @@ -105,7 +107,7 @@ public class SplitterStreamProcessor implements ApplicationRunner offsets[inputRecord.partition()] = inputRecord.offset(); leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch(); - String[] words = PATTERN.split(inputRecord.value()); + String[] words = splitter.split(inputRecord.value()); for (int i = 0; i < words.length; i++) { ProducerRecord outputRecord = @@ -252,6 +254,11 @@ public class SplitterStreamProcessor implements ApplicationRunner public void stop() { log.info("Shutdown requested..."); + if (stopped) + { + log.warn("Ignoring request: already stopped!"); + return; + } stopped = true; consumer.wakeup(); running.lock();