--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.function.Consumer;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class ChatHomeConsumer extends Run
+{
+ private final Consumer<String, MessageTo> consumer;
+
+}
import java.time.ZoneId;
import java.util.*;
+import java.util.concurrent.ExecutorService;
@Slf4j
public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
{
+ private final ExecutorService executorService;
private final Consumer<String, MessageTo> consumer;
private final Producer<String, MessageTo> producer;
private final String topic;
public KafkaChatHomeService(
+ ExecutorService executorService,
Consumer<String, MessageTo> consumer,
Producer<String, MessageTo> producer,
String topic,
int numShards)
{
log.debug("Creating KafkaChatHomeService");
+ this.executorService = executorService;
this.consumer = consumer;
this.producer = producer;
this.topic = topic;