package de.juplo.kafka.chat.backend.persistence.kafka;
import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomInfo;
import de.juplo.kafka.chat.backend.domain.Message;
-import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener
{
private final String topic;
- private final Producer<String, MessageTo> producer;
- private final Consumer<String, MessageTo> consumer;
+ private final Producer<String, AbstractTo> producer;
+ private final Consumer<String, AbstractTo> consumer;
private final ZoneId zoneId;
private final int numShards;
+ private final int bufferSize;
+ private final Clock clock;
private final boolean[] isShardOwned;
private final long[] currentOffset;
private final long[] nextOffset;
private final Map<UUID, ChatRoom>[] chatrooms;
- private final KafkaLikeShardingStrategy shardingStrategy;
private boolean running;
@Getter
public ChatMessageChannel(
String topic,
- Producer<String, MessageTo> producer,
- Consumer<String, MessageTo> consumer,
+ Producer<String, AbstractTo> producer,
+ Consumer<String, AbstractTo> consumer,
ZoneId zoneId,
- int numShards)
+ int numShards,
+ int bufferSize,
+ Clock clock)
{
log.debug(
"Creating ChatMessageChannel for topic {} with {} partitions",
this.producer = producer;
this.zoneId = zoneId;
this.numShards = numShards;
+ this.bufferSize = bufferSize;
+ this.clock = clock;
this.isShardOwned = new boolean[numShards];
this.currentOffset = new long[numShards];
this.nextOffset = new long[numShards];
IntStream
.range(0, numShards)
.forEach(shard -> this.chatrooms[shard] = new HashMap<>());
- this.shardingStrategy = new KafkaLikeShardingStrategy(numShards);
}
- Mono<Message> sendMessage(
+
+ Mono<ChatRoomInfo> sendCreateChatRoomRequest(
+ UUID chatRoomId,
+ String name)
+ {
+ CreateChatRoomRequestTo createChatRoomRequestTo = CreateChatRoomRequestTo.of(name);
+ return Mono.create(sink ->
+ {
+ ProducerRecord<String, CreateChatRoomRequestTo> record =
+ new ProducerRecord<>(
+ topic,
+ chatRoomId.toString(),
+ createChatRoomRequestTo);
+
+ producer.send(record, ((metadata, exception) ->
+ {
+ if (metadata != null)
+ {
+ log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo);
+ ChatRoomInfo chatRoomInfo = ChatRoomInfo.of(chatRoomId, name, record.partition());
+ createChatRoom(chatRoomInfo);
+ sink.success(chatRoomInfo);
+ }
+ else
+ {
+ // On send-failure
+ log.error(
+ "Could not send create-request for chat room (id={}, name={}): {}",
+ chatRoomId,
+ name,
+ exception);
+ sink.error(exception);
+ }
+ }));
+ });
+ }
+
+ Mono<Message> sendChatMessage(
UUID chatRoomId,
Message.MessageKey key,
LocalDateTime timestamp,
String text)
{
- int shard = this.shardingStrategy.selectShard(chatRoomId);
- TopicPartition tp = new TopicPartition(topic, shard);
ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId);
return Mono.create(sink ->
{
- ProducerRecord<String, MessageTo> record =
+ ProducerRecord<String, AbstractTo> record =
new ProducerRecord<>(
- tp.topic(),
- tp.partition(),
+ topic,
+ null,
zdt.toEpochSecond(),
chatRoomId.toString(),
- MessageTo.of(key.getUsername(), key.getMessageId(), text));
+ ChatMessageTo.of(key.getUsername(), key.getMessageId(), text));
producer.send(record, ((metadata, exception) ->
{
{
try
{
- ConsumerRecords<String, MessageTo> records = consumer.poll(Duration.ofMinutes(5));
+ ConsumerRecords<String, AbstractTo> records = consumer.poll(Duration.ofMinutes(5));
log.info("Fetched {} messages", records.count());
if (loadInProgress)
{
- loadMessages(records);
+ loadChatRoom(records);
if (isLoadingCompleted())
{
log.info("Exiting normally");
}
- void loadMessages(ConsumerRecords<String, MessageTo> records)
+ void loadChatRoom(ConsumerRecords<String, AbstractTo> records)
{
- for (ConsumerRecord<String, MessageTo> record : records)
+ for (ConsumerRecord<String, AbstractTo> record : records)
{
- nextOffset[record.partition()] = record.offset() + 1;
UUID chatRoomId = UUID.fromString(record.key());
- MessageTo messageTo = record.value();
-
- Message.MessageKey key = Message.MessageKey.of(messageTo.getUser(), messageTo.getId());
- Instant instant = Instant.ofEpochSecond(record.timestamp());
- LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
-
- Message message = new Message(key, record.offset(), timestamp, messageTo.getText());
-
- ChatRoom chatRoom = chatrooms[record.partition()].get(chatRoomId);
- if (chatRoom == null)
+ switch (record.value().getType())
{
- // TODO: Alles pausieren und erst von putChatRoom wieder resumen lassen!
+ case CREATE_CHATROOM_REQUEST:
+ createChatRoom(
+ chatRoomId,
+ (CreateChatRoomRequestTo) record.value(),
+ record.partition());
+ break;
+
+ case MESSAGE_SENT:
+ Instant instant = Instant.ofEpochSecond(record.timestamp());
+ LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
+ loadChatMessage(
+ chatRoomId,
+ timestamp,
+ record.offset(),
+ (ChatMessageTo) record.value(),
+ record.partition());
+ break;
+
+ default:
+ log.debug(
+ "Ignoring message for chat-room {} with offset {}: {}",
+ chatRoomId,
+ record.offset(),
+ record.value());
}
- KafkaChatRoomService kafkaChatRoomService =
- (KafkaChatRoomService) chatRoom.getChatRoomService();
- kafkaChatRoomService.persistMessage(message);
+ nextOffset[record.partition()] = record.offset() + 1;
}
}
+ void createChatRoom(
+ UUID chatRoomId,
+ CreateChatRoomRequestTo createChatRoomRequestTo,
+ int partition)
+ {
+ putChatRoom(ChatRoomInfo.of(
+ chatRoomId,
+ createChatRoomRequestTo.getName(),
+ partition));
+ }
+
+
+ void createChatRoom(ChatRoomInfo chatRoomInfo)
+ {
+ UUID id = chatRoomInfo.getId();
+ String name = chatRoomInfo.getName();
+ int shard = chatRoomInfo.getShard();
+ log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
+ KafkaChatRoomService service = new KafkaChatRoomService(this, id);
+ ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize);
+ putChatRoom(chatRoom);
+ }
+
+ void loadChatMessage(
+ UUID chatRoomId,
+ LocalDateTime timestamp,
+ long offset,
+ ChatMessageTo chatMessageTo,
+ int partition)
+ {
+ Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId());
+ Message message = new Message(key, offset, timestamp, chatMessageTo.getText());
+
+ ChatRoom chatRoom = chatrooms[partition].get(chatRoomId);
+ KafkaChatRoomService kafkaChatRoomService =
+ (KafkaChatRoomService) chatRoom.getChatRoomService();
+
+ kafkaChatRoomService.persistMessage(message);
+ }
+
boolean isLoadingCompleted()
{
return IntStream
}
- void putChatRoom(ChatRoom chatRoom)
+ private void putChatRoom(ChatRoom chatRoom)
{
Integer partition = chatRoom.getShard();
UUID chatRoomId = chatRoom.getId();
{
return Mono.justOrEmpty(chatrooms[shard].get(id));
}
-
- Flux<ChatRoom> getChatRooms()
- {
- return Flux.fromStream(IntStream
- .range(0, numShards)
- .filter(shard -> isShardOwned[shard])
- .mapToObj(shard -> Integer.valueOf(shard))
- .flatMap(shard -> chatrooms[shard].values().stream()));
- }
}