X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FKafkaChatHomeService.java;h=a95df543813f3223895b631d2b40e334309c9c1c;hb=220a778c91468046054fac0400ba89825c46b3f5;hp=eadd7629407438224af062f393a95a37a8e0defe;hpb=2aeb6169a2f362dd342850c638cccb15bf353965;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java index eadd7629..a95df543 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java @@ -12,11 +12,13 @@ import reactor.core.publisher.Mono; import java.time.ZoneId; import java.util.*; +import java.util.concurrent.ExecutorService; @Slf4j public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener { + private final ExecutorService executorService; private final Consumer consumer; private final Producer producer; private final String topic; @@ -27,6 +29,7 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL public KafkaChatHomeService( + ExecutorService executorService, Consumer consumer, Producer producer, String topic, @@ -34,6 +37,7 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL int numShards) { log.debug("Creating KafkaChatHomeService"); + this.executorService = executorService; this.consumer = consumer; this.producer = producer; this.topic = topic;