X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FSplitterStreamProcessor.java;h=191428cf9b77cc4dbac6e85271cef07a6dbb6db9;hb=723ec6bbc03aaa4f552fd3a27986c120ea46fccf;hp=791e164d6fe3ddfdf8600dec4430e082462b2733;hpb=89d429ad1ad583fb9d52d51ed37cd5a885a9c8c3;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java index 791e164..191428c 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java @@ -9,8 +9,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; +import org.springframework.core.task.TaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.Assert; @@ -26,7 +25,7 @@ import java.util.stream.Collectors; @Component @Slf4j -public class SplitterStreamProcessor implements ApplicationRunner +public class SplitterStreamProcessor implements Runnable { final static Pattern PATTERN = Pattern.compile("\\W+"); @@ -45,7 +44,8 @@ public class SplitterStreamProcessor implements ApplicationRunner public SplitterStreamProcessor( SplitterApplicationProperties properties, - Clock clock) + Clock clock, + TaskExecutor executor) { this.inputTopic = properties.getInputTopic(); this.outputTopic = properties.getOutputTopic(); @@ -72,10 +72,11 @@ public class SplitterStreamProcessor implements ApplicationRunner props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); producer = new KafkaProducer<>(props); + + executor.execute(this); } - @Override - public void run(ApplicationArguments args) + public void run() { running.lock();