import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
+import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
@Component
@Slf4j
-public class SplitterStreamProcessor implements ApplicationRunner
+public class SplitterStreamProcessor implements Runnable
{
final static Pattern PATTERN = Pattern.compile("\\W+");
public SplitterStreamProcessor(
SplitterApplicationProperties properties,
- Clock clock)
+ Clock clock,
+ TaskExecutor executor)
{
this.inputTopic = properties.getInputTopic();
this.outputTopic = properties.getOutputTopic();
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producer = new KafkaProducer<>(props);
+
+ executor.execute(this);
}
- @Override
- public void run(ApplicationArguments args)
+ public void run()
{
running.lock();