import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
+import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.regex.Pattern;
import java.util.stream.Collectors;
@Component
@Slf4j
-public class SplitterStreamProcessor implements ApplicationRunner
+public class SplitterStreamProcessor implements Runnable
{
- final static Pattern PATTERN = Pattern.compile("\\W+");
-
+ private final MessageSplitter splitter;
private final String inputTopic;
private final String outputTopic;
private final KafkaConsumer<String, String> consumer;
private long lastCommit;
public SplitterStreamProcessor(
+ MessageSplitter splitter,
SplitterApplicationProperties properties,
- Clock clock)
+ Clock clock,
+ TaskExecutor executor)
{
+ this.splitter = splitter;
+
this.inputTopic = properties.getInputTopic();
this.outputTopic = properties.getOutputTopic();
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producer = new KafkaProducer<>(props);
+
+ executor.execute(this);
}
- @Override
- public void run(ApplicationArguments args)
+ public void run()
{
running.lock();
offsets[inputRecord.partition()] = inputRecord.offset();
leaderEpochs[inputRecord.partition()] = inputRecord.leaderEpoch();
- String[] words = PATTERN.split(inputRecord.value());
+ String[] words = splitter.split(inputRecord.value());
for (int i = 0; i < words.length; i++)
{
ProducerRecord<String, String> outputRecord =
public void stop()
{
log.info("Shutdown requested...");
+ if (stopped)
+ {
+ log.warn("Ignoring request: already stopped!");
+ return;
+ }
stopped = true;
consumer.wakeup();
running.lock();