import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
+import org.springframework.core.task.TaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import java.time.Clock;
import java.time.Duration;
import java.util.*;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
@Component
@Slf4j
-public class SplitterStreamProcessor implements ApplicationRunner
+public class SplitterStreamProcessor implements Runnable
{
final static Pattern PATTERN = Pattern.compile("\\W+");
public SplitterStreamProcessor(
SplitterApplicationProperties properties,
- Clock clock)
+ Clock clock,
+ TaskExecutor executor)
{
this.inputTopic = properties.getInputTopic();
this.outputTopic = properties.getOutputTopic();
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
producer = new KafkaProducer<>(props);
+
+ executor.execute(this);
}
- @Override
- public void run(ApplicationArguments args)
+ public void run()
{
running.lock();
package de.juplo.kafka.wordcount.splitter;
import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.kafka.test.context.EmbeddedKafka;
-@SpringBootTest
+import static de.juplo.kafka.wordcount.splitter.ApplicationTests.TOPIC_IN;
+import static de.juplo.kafka.wordcount.splitter.ApplicationTests.TOPIC_OUT;
+
+
+@SpringBootTest(
+ properties = {
+ "juplo.wordcount.splitter.input-topic=" + TOPIC_IN,
+ "juplo.wordcount.splitter.outtput-topic=" + TOPIC_OUT,
+ })
+@EmbeddedKafka(
+ bootstrapServersProperty = "juplo.wordcount.splitter.bootstrap-server",
+ topics = { TOPIC_IN, TOPIC_OUT },
+ brokerProperties = {
+ "transaction.state.log.replication.factor=1",
+ "transaction.state.log.min.isr=1",
+ })
class ApplicationTests
{
+ final static String TOPIC_IN = "in";
+ final static String TOPIC_OUT = "out";
+
+ @Autowired
+ SplitterStreamProcessor splitter;
+
@Test
void contextLoads()
{
+ splitter.stop();
}
}