projects
/
demos
/
kafka
/
training
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (parent:
ae796c9
)
WIP:sleep
author
Kai Moritz
<kai@juplo.de>
Fri, 4 Nov 2022 09:53:36 +0000
(10:53 +0100)
committer
Kai Moritz
<kai@juplo.de>
Fri, 4 Nov 2022 09:53:36 +0000
(10:53 +0100)
src/main/java/de/juplo/kafka/Application.java
patch
|
blob
|
history
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
patch
|
blob
|
history
src/main/java/de/juplo/kafka/SimpleConsumer.java
patch
|
blob
|
history
diff --git
a/src/main/java/de/juplo/kafka/Application.java
b/src/main/java/de/juplo/kafka/Application.java
index
3157ef6
..
718676b
100644
(file)
--- a/
src/main/java/de/juplo/kafka/Application.java
+++ b/
src/main/java/de/juplo/kafka/Application.java
@@
-19,7
+19,7
@@
import java.util.concurrent.ExecutionException;
public class Application implements ApplicationRunner
{
@Autowired
public class Application implements ApplicationRunner
{
@Autowired
- Consumer<?, ?>
c
onsumer;
+ Consumer<?, ?>
kafkaC
onsumer;
@Autowired
SimpleConsumer simpleConsumer;
@Autowired
SimpleConsumer simpleConsumer;
@@
-27,14
+27,25
@@
public class Application implements ApplicationRunner
public void run(ApplicationArguments args) throws Exception
{
log.info("Starting SimpleConsumer");
public void run(ApplicationArguments args) throws Exception
{
log.info("Starting SimpleConsumer");
- simpleConsumer.
start
();
+ simpleConsumer.
run
();
}
@PreDestroy
public void stop() throws ExecutionException, InterruptedException
{
log.info("Signaling SimpleConsumer to quit its work");
}
@PreDestroy
public void stop() throws ExecutionException, InterruptedException
{
log.info("Signaling SimpleConsumer to quit its work");
- consumer.wakeup();
+ kafkaConsumer.wakeup();
+
+ while (simpleConsumer.isRunning())
+ {
+ log.info("Waiting for SimpleConsumer to finish its work");
+ try
+ {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e) {}
+ }
+ log.info("SimpleConsumer finished its work");
}
}
diff --git
a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index
648bb9d
..
de77c60
100644
(file)
--- a/
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@
-6,7
+6,6
@@
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import org.springframework.core.task.TaskExecutor;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.ConsumerFactory;
@@
-16,14
+15,12
@@
public class ApplicationConfiguration
{
@Bean
public SimpleConsumer endlessConsumer(
{
@Bean
public SimpleConsumer endlessConsumer(
- TaskExecutor taskExecutor,
Consumer<String, String> kafkaConsumer,
KafkaProperties kafkaProperties,
ApplicationProperties applicationProperties)
{
return
new SimpleConsumer(
Consumer<String, String> kafkaConsumer,
KafkaProperties kafkaProperties,
ApplicationProperties applicationProperties)
{
return
new SimpleConsumer(
- taskExecutor,
kafkaProperties.getClientId(),
applicationProperties.getTopic(),
kafkaConsumer);
kafkaProperties.getClientId(),
applicationProperties.getTopic(),
kafkaConsumer);
diff --git
a/src/main/java/de/juplo/kafka/SimpleConsumer.java
b/src/main/java/de/juplo/kafka/SimpleConsumer.java
index
64d5176
..
4459a79
100644
(file)
--- a/
src/main/java/de/juplo/kafka/SimpleConsumer.java
+++ b/
src/main/java/de/juplo/kafka/SimpleConsumer.java
@@
-6,7
+6,7
@@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.WakeupException;
-import org.springframework.
core.task.TaskExecutor
;
+import org.springframework.
scheduling.annotation.Async
;
import java.time.Duration;
import java.util.Arrays;
import java.time.Duration;
import java.util.Arrays;
@@
-14,28
+14,24
@@
import java.util.Arrays;
@Slf4j
@RequiredArgsConstructor
@Slf4j
@RequiredArgsConstructor
-public class SimpleConsumer
implements Runnable
+public class SimpleConsumer
{
{
- private final TaskExecutor taskExecutor;
private final String id;
private final String topic;
private final Consumer<String, String> consumer;
private final String id;
private final String topic;
private final Consumer<String, String> consumer;
+ private volatile boolean running = false;
private long consumed = 0;
private long consumed = 0;
- public void start()
- {
- taskExecutor.execute(this);
- }
-
- @Override
+ @Async
public void run()
{
try
{
log.info("{} - Subscribing to topic test", id);
consumer.subscribe(Arrays.asList(topic));
public void run()
{
try
{
log.info("{} - Subscribing to topic test", id);
consumer.subscribe(Arrays.asList(topic));
+ running = true;
while (true)
{
while (true)
{
@@
-69,9
+65,15
@@
public class SimpleConsumer implements Runnable
}
finally
{
}
finally
{
+ running = false;
log.info("{} - Closing the KafkaConsumer", id);
consumer.close();
log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
}
}
log.info("{} - Closing the KafkaConsumer", id);
consumer.close();
log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
}
}
+
+ public boolean isRunning()
+ {
+ return running;
+ }
}
}