X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;ds=sidebyside;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FSimpleConsumer.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FSimpleConsumer.java;h=64d5176a3fff1428540a182b548d6a34b995e218;hb=ae796c9e9e4ec5c86853aa9b100d0cfe4deeda3c;hp=dac22a2d143c7cda8ed5a2f831eae692455327d8;hpb=ea0b9e7cdc4a32e317b4de44bc53ec46ea03c39f;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java index dac22a2..64d5176 100644 --- a/src/main/java/de/juplo/kafka/SimpleConsumer.java +++ b/src/main/java/de/juplo/kafka/SimpleConsumer.java @@ -6,7 +6,7 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.errors.WakeupException; -import org.springframework.scheduling.annotation.Async; +import org.springframework.core.task.TaskExecutor; import java.time.Duration; import java.util.Arrays; @@ -14,8 +14,9 @@ import java.util.Arrays; @Slf4j @RequiredArgsConstructor -public class SimpleConsumer +public class SimpleConsumer implements Runnable { + private final TaskExecutor taskExecutor; private final String id; private final String topic; private final Consumer consumer; @@ -23,7 +24,12 @@ public class SimpleConsumer private long consumed = 0; - @Async + public void start() + { + taskExecutor.execute(this); + } + + @Override public void run() { try