WIP
[demos/kafka/chat] / src / main / java / de / juplo / kafka / chat / backend / persistence / kafka / KafkaChatHomeService.java
index eadd762..a95df54 100644 (file)
@@ -12,11 +12,13 @@ import reactor.core.publisher.Mono;
 
 import java.time.ZoneId;
 import java.util.*;
+import java.util.concurrent.ExecutorService;
 
 
 @Slf4j
 public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
 {
+  private final ExecutorService executorService;
   private final Consumer<String, MessageTo> consumer;
   private final Producer<String, MessageTo> producer;
   private final String topic;
@@ -27,6 +29,7 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
 
 
   public KafkaChatHomeService(
+    ExecutorService executorService,
     Consumer<String, MessageTo> consumer,
     Producer<String, MessageTo> producer,
     String topic,
@@ -34,6 +37,7 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL
     int numShards)
   {
     log.debug("Creating KafkaChatHomeService");
+    this.executorService = executorService;
     this.consumer = consumer;
     this.producer = producer;
     this.topic = topic;