projects
/
demos
/
kafka
/
chat
/ commitdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
| commitdiff |
tree
raw
|
patch
|
inline
| side by side (parent:
2aeb616
)
WIP
author
Kai Moritz
<kai@juplo.de>
Tue, 28 Feb 2023 17:54:37 +0000
(18:54 +0100)
committer
Kai Moritz
<kai@juplo.de>
Tue, 28 Feb 2023 18:23:01 +0000
(19:23 +0100)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeConsumer.java
[new file with mode: 0644]
patch
|
blob
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java
patch
|
blob
|
history
diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeConsumer.java
b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeConsumer.java
new file mode 100644
(file)
index 0000000..
b6f5e42
--- /dev/null
+++ b/
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeConsumer.java
@@ -0,0
+1,15
@@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.function.Consumer;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class ChatHomeConsumer extends Run
+{
+ private final Consumer<String, MessageTo> consumer;
+
+}
diff --git
a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java
b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java
index
eadd762
..
a95df54
100644
(file)
--- a/
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java
+++ b/
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java
@@
-12,11
+12,13
@@
import reactor.core.publisher.Mono;
import java.time.ZoneId;
import java.util.*;
import java.time.ZoneId;
import java.util.*;
+import java.util.concurrent.ExecutorService;
@Slf4j
public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
{
@Slf4j
public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
{
+ private final ExecutorService executorService;
private final Consumer<String, MessageTo> consumer;
private final Producer<String, MessageTo> producer;
private final String topic;
private final Consumer<String, MessageTo> consumer;
private final Producer<String, MessageTo> producer;
private final String topic;
@@
-27,6
+29,7
@@
public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
public KafkaChatHomeService(
public KafkaChatHomeService(
+ ExecutorService executorService,
Consumer<String, MessageTo> consumer,
Producer<String, MessageTo> producer,
String topic,
Consumer<String, MessageTo> consumer,
Producer<String, MessageTo> producer,
String topic,
@@
-34,6
+37,7
@@
public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
int numShards)
{
log.debug("Creating KafkaChatHomeService");
int numShards)
{
log.debug("Creating KafkaChatHomeService");
+ this.executorService = executorService;
this.consumer = consumer;
this.producer = producer;
this.topic = topic;
this.consumer = consumer;
this.producer = producer;
this.topic = topic;