WIP
authorKai Moritz <kai@juplo.de>
Tue, 28 Feb 2023 17:54:37 +0000 (18:54 +0100)
committerKai Moritz <kai@juplo.de>
Tue, 28 Feb 2023 18:23:01 +0000 (19:23 +0100)
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeConsumer.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java

diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeConsumer.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeConsumer.java
new file mode 100644 (file)
index 0000000..b6f5e42
--- /dev/null
@@ -0,0 +1,15 @@
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.function.Consumer;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class ChatHomeConsumer extends Run
+{
+  private final Consumer<String, MessageTo> consumer;
+
+}
index eadd762..a95df54 100644 (file)
@@ -12,11 +12,13 @@ import reactor.core.publisher.Mono;
 
 import java.time.ZoneId;
 import java.util.*;
+import java.util.concurrent.ExecutorService;
 
 
 @Slf4j
 public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
 {
+  private final ExecutorService executorService;
   private final Consumer<String, MessageTo> consumer;
   private final Producer<String, MessageTo> producer;
   private final String topic;
@@ -27,6 +29,7 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
 
 
   public KafkaChatHomeService(
+    ExecutorService executorService,
     Consumer<String, MessageTo> consumer,
     Producer<String, MessageTo> producer,
     String topic,
@@ -34,6 +37,7 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
     int numShards)
   {
     log.debug("Creating KafkaChatHomeService");
+    this.executorService = executorService;
     this.consumer = consumer;
     this.producer = producer;
     this.topic = topic;