projects
/
demos
/
kafka
/
chat
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (parent:
f3d5588
)
refactor: Extracted interface `WorkAssignor` into separate file
author
Kai Moritz
<kai@juplo.de>
Fri, 15 Sep 2023 08:48:28 +0000
(10:48 +0200)
committer
Kai Moritz
<kai@juplo.de>
Sat, 27 Jan 2024 14:16:23 +0000
(15:16 +0100)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java
patch
|
blob
|
history
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
patch
|
blob
|
history
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/WorkAssignor.java
[new file with mode: 0644]
patch
|
blob
src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java
patch
|
blob
|
history
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java
patch
|
blob
|
history
diff --git
a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java
b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java
index
881dd29
..
9425bdf
100644
(file)
--- a/
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java
+++ b/
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ConsumerTaskExecutor.java
@@
-44,10
+44,4
@@
public class ConsumerTaskExecutor
consumerTaskJob.join();
log.info("Joined the consumer-task for {}", consumerTask);
}
consumerTaskJob.join();
log.info("Joined the consumer-task for {}", consumerTask);
}
-
-
- public interface WorkAssignor
- {
- void assignWork(Consumer<?, ?> consumer);
- }
}
}
diff --git
a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
index
5bde07c
..
b5bac47
100644
(file)
--- a/
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
+++ b/
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java
@@
-51,7
+51,7
@@
public class KafkaServicesConfiguration
ThreadPoolTaskExecutor taskExecutor,
InfoChannel infoChannel,
Consumer<String, AbstractMessageTo> infoChannelConsumer,
ThreadPoolTaskExecutor taskExecutor,
InfoChannel infoChannel,
Consumer<String, AbstractMessageTo> infoChannelConsumer,
-
ConsumerTaskExecutor.
WorkAssignor infoChannelWorkAssignor)
+ WorkAssignor infoChannelWorkAssignor)
{
return new ConsumerTaskExecutor(
taskExecutor,
{
return new ConsumerTaskExecutor(
taskExecutor,
@@
-61,8
+61,7
@@
public class KafkaServicesConfiguration
}
@Bean
}
@Bean
- ConsumerTaskExecutor.WorkAssignor infoChannelWorkAssignor(
- ChatBackendProperties properties)
+ WorkAssignor infoChannelWorkAssignor(ChatBackendProperties properties)
{
return consumer ->
{
{
return consumer ->
{
@@
-82,7
+81,7
@@
public class KafkaServicesConfiguration
ThreadPoolTaskExecutor taskExecutor,
DataChannel dataChannel,
Consumer<String, AbstractMessageTo> dataChannelConsumer,
ThreadPoolTaskExecutor taskExecutor,
DataChannel dataChannel,
Consumer<String, AbstractMessageTo> dataChannelConsumer,
-
ConsumerTaskExecutor.
WorkAssignor dataChannelWorkAssignor)
+ WorkAssignor dataChannelWorkAssignor)
{
return new ConsumerTaskExecutor(
taskExecutor,
{
return new ConsumerTaskExecutor(
taskExecutor,
@@
-92,7
+91,7
@@
public class KafkaServicesConfiguration
}
@Bean
}
@Bean
-
ConsumerTaskExecutor.
WorkAssignor dataChannelWorkAssignor(
+ WorkAssignor dataChannelWorkAssignor(
ChatBackendProperties properties,
DataChannel dataChannel)
{
ChatBackendProperties properties,
DataChannel dataChannel)
{
diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/WorkAssignor.java
b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/WorkAssignor.java
new file mode 100644
(file)
index 0000000..
2335e53
--- /dev/null
+++ b/
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/WorkAssignor.java
@@ -0,0
+1,9
@@
+package de.juplo.kafka.chat.backend.implementation.kafka;
+
+import org.apache.kafka.clients.consumer.Consumer;
+
+
+public interface WorkAssignor
+{
+ void assignWork(Consumer<?, ?> consumer);
+}
diff --git
a/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java
b/src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java
index
8fbc557
..
a3d6075
100644
(file)
--- a/
src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java
+++ b/
src/test/java/de/juplo/kafka/chat/backend/KafkaConfigurationIT.java
@@
-90,7
+90,7
@@
class KafkaConfigurationIT extends AbstractConfigurationWithShardingIT
static class KafkaConfigurationITConfiguration
{
@Bean
static class KafkaConfigurationITConfiguration
{
@Bean
-
ConsumerTaskExecutor.
WorkAssignor dataChannelWorkAssignor(
+ WorkAssignor dataChannelWorkAssignor(
DataChannel dataChannel)
{
return consumer ->
DataChannel dataChannel)
{
return consumer ->
diff --git
a/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java
b/src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java
index
23c1751
..
434d997
100644
(file)
--- a/
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java
+++ b/
src/test/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaChatHomeServiceTest.java
@@
-59,8
+59,7
@@
public class KafkaChatHomeServiceTest extends ChatHomeServiceWithShardsTest
static class KafkaChatHomeTestConfiguration
{
@Bean
static class KafkaChatHomeTestConfiguration
{
@Bean
- ConsumerTaskExecutor.WorkAssignor dataChannelWorkAssignor(
- DataChannel dataChannel)
+ WorkAssignor dataChannelWorkAssignor(DataChannel dataChannel)
{
return consumer ->
{
{
return consumer ->
{