X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FSimpleConsumer.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FSimpleConsumer.java;h=53bd11205692d125acfbbde85613150cb65411bf;hb=28861eab2d4da8a0594a115de989ffeb90b05cd4;hp=4459a7932755fd5968aead19ec33e313fe97fa22;hpb=1e858c1835d47259583cf3cce18921c550682e2b;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/SimpleConsumer.java b/src/main/java/de/juplo/kafka/SimpleConsumer.java index 4459a79..53bd112 100644 --- a/src/main/java/de/juplo/kafka/SimpleConsumer.java +++ b/src/main/java/de/juplo/kafka/SimpleConsumer.java @@ -6,7 +6,6 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.errors.WakeupException; -import org.springframework.scheduling.annotation.Async; import java.time.Duration; import java.util.Arrays; @@ -14,24 +13,22 @@ import java.util.Arrays; @Slf4j @RequiredArgsConstructor -public class SimpleConsumer +public class SimpleConsumer implements Runnable { private final String id; private final String topic; private final Consumer consumer; - private volatile boolean running = false; private long consumed = 0; - @Async + @Override public void run() { try { log.info("{} - Subscribing to topic test", id); consumer.subscribe(Arrays.asList(topic)); - running = true; while (true) { @@ -65,15 +62,9 @@ public class SimpleConsumer } finally { - running = false; log.info("{} - Closing the KafkaConsumer", id); consumer.close(); log.info("{}: Consumed {} messages in total, exiting!", id, consumed); } } - - public boolean isRunning() - { - return running; - } }