switch (record.value().getType())
{
- case COMMAND_CREATE_CHATROOM:
- createChatRoom(
- chatRoomId,
- (CommandCreateChatRoomTo) record.value(),
- record.partition());
- break;
-
case EVENT_CHATMESSAGE_RECEIVED:
Instant instant = Instant.ofEpochSecond(record.timestamp());
LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
}
}
- private void createChatRoom(
- UUID chatRoomId,
- CommandCreateChatRoomTo createChatRoomRequestTo,
- Integer partition)
+ void createChatRoom(ChatRoomInfo chatRoomInfo)
{
- log.info(
- "Loading ChatRoom {} for shard {} with buffer-size {}",
- chatRoomId,
- partition,
- bufferSize);
- KafkaChatMessageService service = new KafkaChatMessageService(this, chatRoomId);
- ChatRoomData chatRoomData = new ChatRoomData(
- clock,
- service,
- bufferSize);
- putChatRoom(
- chatRoomId,
- createChatRoomRequestTo.getName(),
- partition,
- chatRoomData);
- }
-
+ if (!isShardOwned[chatRoomInfo.getShard()])
+ {
+ log.debug("Ignoring not owned chat-room {}", chatRoomInfo);
+ return;
+ }
- private void createChatRoom(ChatRoomInfo chatRoomInfo)
- {
UUID id = chatRoomInfo.getId();
log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize);
KafkaChatMessageService service = new KafkaChatMessageService(this, id);
import de.juplo.kafka.chat.backend.implementation.kafka.messages.AbstractMessageTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.CommandCreateChatRoomTo;
import de.juplo.kafka.chat.backend.implementation.kafka.messages.data.EventChatMessageReceivedTo;
+import de.juplo.kafka.chat.backend.implementation.kafka.messages.info.EventChatRoomCreated;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@Slf4j
public class InfoChannel implements Runnable
{
- private final String infoTopic;
- private final String dataTopic;
+ private final String topic;
private final Producer<String, AbstractMessageTo> producer;
private final Consumer<String, AbstractMessageTo> consumer;
private final Map<UUID, ChatRoomInfo> chatRoomInfo;
public InfoChannel(
- String infoTopic,
- String dataTopic,
+ String topic,
Producer<String, AbstractMessageTo> producer,
Consumer<String, AbstractMessageTo> infoChannelConsumer)
{
log.debug(
"Creating InfoChannel for topic {}",
- infoTopic);
- this.infoTopic = infoTopic;
- this.dataTopic = dataTopic;
+ topic);
+ this.topic = topic;
this.consumer = infoChannelConsumer;
this.producer = producer;
this.chatRoomInfo = new HashMap<>();
- Mono<ChatRoomInfo> sendCreateChatRoomRequest(
+ Mono<ChatRoomInfo> sendChatRoomCreatedEvent(
UUID chatRoomId,
String name)
{
- CommandCreateChatRoomTo to = CommandCreateChatRoomTo.of(name);
+ EventChatRoomCreated to = EventChatRoomCreated.of(name);
return Mono.create(sink ->
{
ProducerRecord<String, AbstractMessageTo> record =
new ProducerRecord<>(
- dataTopic,
+ topic<,
chatRoomId.toString(),
to);
log.info("Exiting normally");
}
- private void loadChatRoom(ConsumerRecords<String, AbstractInfoMessageTo> records)
+ private void loadChatRoom(ConsumerRecords<String, AbstractMessageTo> records)
{
- for (ConsumerRecord<String, AbstractInfoMessageTo> record : records)
+ for (ConsumerRecord<String, AbstractMessageTo> record : records)
{
UUID chatRoomId = UUID.fromString(record.key());
switch (record.value().getType())
{
- case COMMAND_CREATE_CHATROOM:
+ case EVENT_CHATROOM_CREATED:
createChatRoom(
chatRoomId,
(CommandCreateChatRoomTo) record.value(),
consumer.pause(IntStream
.range(0, numShards)
.filter(shard -> isShardOwned[shard])
- .mapToObj(shard -> new TopicPartition(infoTopic, shard))
+ .mapToObj(shard -> new TopicPartition(topic, shard))
.toList());
}