import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import java.time.ZoneId;
import java.util.*;
public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
{
private final Consumer<String, MessageTo> consumer;
+ private final Producer<String, MessageTo> producer;
private final String topic;
+ private final ZoneId zoneId;
// private final long[] offsets; Erst mal immer alles neu einlesen
- private final Map<UUID, KafkaChatRoomService>[] kafkaChatRoomServiceMaps;
+ private final ChatHomeLoader[] chatHomeLoaders;
private final Map<UUID, ChatRoom>[] chatRoomMaps;
public KafkaChatHomeService(
Consumer<String, MessageTo> consumer,
+ Producer<String, MessageTo> producer,
String topic,
+ ZoneId zoneId,
int numShards)
{
log.debug("Creating KafkaChatHomeService");
this.consumer = consumer;
+ this.producer = producer;
this.topic = topic;
+ this.zoneId = zoneId;
// this.offsets = new long[numShards];
// for (int i=0; i< numShards; i++)
// {
// this.offsets[i] = 0l;
// }
- this.kafkaChatRoomServiceMaps = new Map[numShards];
+ this.chatHomeLoaders = new ChatHomeLoader[numShards];
this.chatRoomMaps = new Map[numShards];
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions)
{
- consumer.endOffsets(partitions).forEach((tp, currentOffset) ->
+ consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
{
- if (!tp.topic().equals(topic))
+ if (!topicPartition.topic().equals(topic))
{
- log.warn("Ignoring partition from unwanted topic: {}", tp);
+ log.warn("Ignoring partition from unwanted topic: {}", topicPartition);
return;
}
- int partition = tp.partition();
- kafkaChatRoomServiceMaps[partition] = new HashMap<>(); // TODO: reuse! Nicht immer alles neu laden
+ int partition = topicPartition.partition();
long unseenOffset = 0; // offsets[partition];
log.info(
unseenOffset,
currentOffset);
- consumer.seek(tp, unseenOffset);
- chatRoomMaps[partition]
- .values()
- .stream()
- handlers[partition] = new ChatRoomLoadingMessageHandlingStrategy(tp, currentOffset, unseenOffset);
+ // TODO: reuse! Nicht immer alles neu laden, sondern erst ab offsets[partition]!
+ consumer.seek(topicPartition, unseenOffset);
+ chatHomeLoaders[partition] = new ChatHomeLoader(
+ producer,
+ currentOffset,
+ zoneId);
});
}