From: Kai Moritz Date: Tue, 28 Feb 2023 17:54:37 +0000 (+0100) Subject: WIP X-Git-Tag: kafkadata~31 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=220a778c91468046054fac0400ba89825c46b3f5;p=demos%2Fkafka%2Fchat WIP --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeConsumer.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeConsumer.java new file mode 100644 index 00000000..b6f5e426 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeConsumer.java @@ -0,0 +1,15 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.util.function.Consumer; + + +@RequiredArgsConstructor +@Slf4j +public class ChatHomeConsumer extends Run +{ + private final Consumer consumer; + +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java index eadd7629..a95df543 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java @@ -12,11 +12,13 @@ import reactor.core.publisher.Mono; import java.time.ZoneId; import java.util.*; +import java.util.concurrent.ExecutorService; @Slf4j public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener { + private final ExecutorService executorService; private final Consumer consumer; private final Producer producer; private final String topic; @@ -27,6 +29,7 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL public KafkaChatHomeService( + ExecutorService executorService, Consumer consumer, Producer producer, String topic, @@ -34,6 +37,7 @@ public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceL int numShards) { log.debug("Creating KafkaChatHomeService"); + this.executorService = executorService; this.consumer = consumer; this.producer = producer; this.topic = topic;