projects
/
demos
/
kafka
/
chat
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (from parent 1:
f5374eb
)
fix: `ConsumerTaskRunner` waits until the data-loading is finished
author
Kai Moritz
<kai@juplo.de>
Fri, 22 Sep 2023 16:20:31 +0000
(18:20 +0200)
committer
Kai Moritz
<kai@juplo.de>
Sat, 27 Jan 2024 14:19:30 +0000
(15:19 +0100)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java
patch
|
blob
|
history
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java
patch
|
blob
|
history
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
patch
|
blob
|
history
src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java
patch
|
blob
|
history
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java
patch
|
blob
|
history
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java
patch
|
blob
|
history
diff --git
a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java
b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java
index
c860003
..
983ebd3
100644
(file)
--- a/
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java
+++ b/
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskRunner.java
@@
-10,6
+10,7
@@
public class ConsumerTaskRunner
{
private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor;
private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor;
{
private final ConsumerTaskExecutor infoChannelConsumerTaskExecutor;
private final ConsumerTaskExecutor dataChannelConsumerTaskExecutor;
+ private final InfoChannel infoChannel;
public void executeConsumerTasks()
{
public void executeConsumerTasks()
{
@@
-17,9
+18,14
@@
public class ConsumerTaskRunner
dataChannelConsumerTaskExecutor.executeConsumerTask();
}
dataChannelConsumerTaskExecutor.executeConsumerTask();
}
- public void joinConsumerTasks()
+ public void joinConsumerTasks()
throws InterruptedException
{
dataChannelConsumerTaskExecutor.joinConsumerTaskJob();
{
dataChannelConsumerTaskExecutor.joinConsumerTaskJob();
+ while (infoChannel.loadInProgress())
+ {
+ log.info("Waiting for {} to finish loading...", infoChannel);
+ Thread.sleep(1000);
+ }
infoChannelConsumerTaskExecutor.joinConsumerTaskJob();
}
}
infoChannelConsumerTaskExecutor.joinConsumerTaskJob();
}
}
diff --git
a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java
b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java
index
722508b
..
44f411f
100644
(file)
--- a/
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java
+++ b/
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesApplicationRunner.java
@@
-28,7
+28,7
@@
public class KafkaServicesApplicationRunner implements ApplicationRunner
}
@PreDestroy
}
@PreDestroy
- public void joinConsumerTasks()
+ public void joinConsumerTasks()
throws InterruptedException
{
consumerTaskRunner.joinConsumerTasks();
}
{
consumerTaskRunner.joinConsumerTasks();
}
diff --git
a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
index
7795516
..
cafc775
100644
(file)
--- a/
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
+++ b/
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
@@
-39,11
+39,13
@@
public class KafkaServicesConfiguration
@Bean
ConsumerTaskRunner consumerTaskRunner(
ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
@Bean
ConsumerTaskRunner consumerTaskRunner(
ConsumerTaskExecutor infoChannelConsumerTaskExecutor,
- ConsumerTaskExecutor dataChannelConsumerTaskExecutor)
+ ConsumerTaskExecutor dataChannelConsumerTaskExecutor,
+ InfoChannel infoChannel)
{
return new ConsumerTaskRunner(
infoChannelConsumerTaskExecutor,
{
return new ConsumerTaskRunner(
infoChannelConsumerTaskExecutor,
- dataChannelConsumerTaskExecutor);
+ dataChannelConsumerTaskExecutor,
+ infoChannel);
}
@Bean
}
@Bean
diff --git
a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java
b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java
index
d9ed8eb
..
e01e012
100644
(file)
--- a/
src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java
+++ b/
src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java
@@
-53,7
+53,9
@@
class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
}
@AfterAll
}
@AfterAll
- static void joinConsumerTasks(@Autowired ConsumerTaskRunner consumerTaskRunner)
+ static void joinConsumerTasks(
+ @Autowired ConsumerTaskRunner consumerTaskRunner)
+ throws InterruptedException
{
KafkaTestUtils.joinConsumerTasks(consumerTaskRunner);
}
{
KafkaTestUtils.joinConsumerTasks(consumerTaskRunner);
}
diff --git
a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java
b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java
index
394ba1b
..
e345a75
100644
(file)
--- a/
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java
+++ b/
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java
@@
-55,7
+55,9
@@
public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
}
@AfterAll
}
@AfterAll
- static void joinConsumerTasks(@Autowired ConsumerTaskRunner consumerTaskRunner)
+ static void joinConsumerTasks(
+ @Autowired ConsumerTaskRunner consumerTaskRunner)
+ throws InterruptedException
{
KafkaTestUtils.joinConsumerTasks(consumerTaskRunner);
}
{
KafkaTestUtils.joinConsumerTasks(consumerTaskRunner);
}
diff --git
a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java
b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java
index
c616310
..
956d7ce
100644
(file)
--- a/
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java
+++ b/
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaTestUtils.java
@@
-77,7
+77,7
@@
public class KafkaTestUtils
new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
}
new TopicPartition(result.getRecordMetadata().topic(), result.getRecordMetadata().partition()));
}
- public static void joinConsumerTasks(ConsumerTaskRunner consumerTaskRunner)
+ public static void joinConsumerTasks(ConsumerTaskRunner consumerTaskRunner)
throws InterruptedException
{
consumerTaskRunner.joinConsumerTasks();
}
{
consumerTaskRunner.joinConsumerTasks();
}