import java.time.*;
import java.util.*;
-import java.util.concurrent.ExecutorService;
import java.util.stream.IntStream;
@Slf4j
public class KafkaChatHomeService implements ChatHomeService, Runnable, ConsumerRebalanceListener
{
- private final ExecutorService executorService;
- private final Consumer<String, MessageTo> consumer;
- private final Producer<String, MessageTo> producer;
- private final String topic;
+ private final String chatRoomsTopic;
+ private final Consumer<Integer, ChatRoomTo> chatRoomsConsumer;
+ private final Producer<Integer, ChatRoomTo> chatRoomsProducer;
+ private final String chatMessagesTopic;
+ private final Consumer<String, MessageTo> chatMessagesConsumer;
+ private final Producer<String, MessageTo> chatMessagesProducer;
private final ZoneId zoneId;
private final int numShards;
private final boolean[] isShardOwned;
private final long[] currentOffset;
private final long[] nextOffset;
- private final Map<UUID, ChatRoom>[] chatRoomMaps;
+ private final Map<UUID, ChatRoom>[] chatrooms;
private final KafkaLikeShardingStrategy shardingStrategy;
private boolean running;
public KafkaChatHomeService(
- ExecutorService executorService,
- Consumer<String, MessageTo> consumer,
- Producer<String, MessageTo> producer,
- String topic,
+ String chatRoomsTopic,
+ Consumer<Integer, ChatRoomTo> chatRoomsConsumer,
+ Producer<Integer, ChatRoomTo> chatRoomsProducer,
+ String chatMessagesTopic,
+ Consumer<String, MessageTo> chatMessagesConsumer,
+ Producer<String, MessageTo> chatMessagesProducer,
ZoneId zoneId,
int numShards)
{
log.debug("Creating KafkaChatHomeService");
- this.executorService = executorService;
- this.consumer = consumer;
- this.producer = producer;
- this.topic = topic;
+ this.chatRoomsTopic = chatRoomsTopic;
+ this.chatRoomsConsumer = chatRoomsConsumer;
+ this.chatRoomsProducer = chatRoomsProducer;
+ this.chatMessagesTopic = chatMessagesTopic;
+ this.chatMessagesConsumer = chatMessagesConsumer;
+ this.chatMessagesProducer = chatMessagesProducer;
this.zoneId = zoneId;
this.numShards = numShards;
this.isShardOwned = new boolean[numShards];
this.currentOffset = new long[numShards];
this.nextOffset = new long[numShards];
- this.chatRoomMaps = new Map[numShards];
+ this.chatrooms = new Map[numShards];
this.shardingStrategy = new KafkaLikeShardingStrategy(numShards);
}
log.info("Newly assigned partitions! Pausing normal operations...");
loadInProgress = true;
- consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
+ chatMessagesConsumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
{
int partition = topicPartition.partition();
isShardOwned[partition] = true;
nextOffset[partition],
currentOffset);
- consumer.seek(topicPartition, nextOffset[partition]);
+ chatMessagesConsumer.seek(topicPartition, nextOffset[partition]);
});
- consumer.resume(partitions);
+ chatMessagesConsumer.resume(partitions);
}
@Override
@Override
public void run()
{
- consumer.subscribe(List.of(topic));
+ chatMessagesConsumer.subscribe(List.of(chatMessagesTopic));
running = true;
{
try
{
- ConsumerRecords<String, MessageTo> records = consumer.poll(Duration.ofMinutes(5));
+ ConsumerRecords<String, MessageTo> records = chatMessagesConsumer.poll(Duration.ofMinutes(5));
log.info("Fetched {} messages", records.count());
if (loadInProgress)
Message message = new Message(key, record.offset(), timestamp, messageTo.getText());
- ChatRoom chatRoom = chatRoomMaps[record.partition()].get(chatRoomId);
+ ChatRoom chatRoom = chatrooms[record.partition()].get(chatRoomId);
KafkaChatRoomService kafkaChatRoomService =
(KafkaChatRoomService) chatRoom.getChatRoomService();
void pauseAllOwnedPartions()
{
- consumer.pause(IntStream
+ chatMessagesConsumer.pause(IntStream
.range(0, numShards)
.filter(shard -> isShardOwned[shard])
- .mapToObj(shard -> new TopicPartition(topic, shard))
+ .mapToObj(shard -> new TopicPartition(chatMessagesTopic, shard))
.toList());
}
String text)
{
int shard = this.shardingStrategy.selectShard(chatRoomId);
- TopicPartition tp = new TopicPartition(topic, shard);
+ TopicPartition tp = new TopicPartition(chatMessagesTopic, shard);
ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
return Mono.create(sink ->
{
chatRoomId.toString(),
MessageTo.of(key.getUsername(), key.getMessageId(), text));
- producer.send(record, ((metadata, exception) ->
+ chatMessagesProducer.send(record, ((metadata, exception) ->
{
if (metadata != null)
{
}
+ public void putChatRoom(ChatRoom chatRoom)
+ {
+
+ ProducerRecord<Integer, ChatRoomTo> record = new ProducerRecord<>(chatRoom.getShard(), );
+ // TODO: Nachricht senden!
+ chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
+ }
+
@Override
public Mono<ChatRoom> getChatRoom(int shard, UUID id)
{
}
else
{
- return Mono.justOrEmpty(chatRoomMaps[shard].get(id));
+ return Mono.justOrEmpty(chatrooms[shard].get(id));
}
}
}
else
{
- return Flux.fromStream(chatRoomMaps[shard].values().stream());
+ return Flux.fromStream(chatrooms[shard].values().stream());
}
}
}