X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FChatMessageChannel.java;h=8a53d3c9fd0ef62781bc745cf706bad9f76b79b3;hb=28a98bfdbed0cf56697bece5efbe6eb52f331611;hp=8294316f497157fd0067054e45bfb9a9bb620595;hpb=1416ccc8a9eae999201dbf7c77c4d4906fc9fc24;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java index 8294316f..8a53d3c9 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java @@ -1,6 +1,7 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.domain.Message; import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; import lombok.Getter; @@ -67,7 +68,44 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener } - Mono sendMessage( + + Mono sendCreateChatRoomRequest( + UUID chatRoomId, + String name) + { + CreateChatRoomRequestTo createChatRoomRequestTo = CreateChatRoomRequestTo.of(name); + return Mono.create(sink -> + { + ProducerRecord record = + new ProducerRecord<>( + topic, + chatRoomId.toString(), + createChatRoomRequestTo); + + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) + { + log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo); + ChatRoomInfo chatRoomInfo = ChatRoomInfo.of(chatRoomId, name, record.partition()); + createChatRoom(chatRoomInfo); + sink.success(chatRoomInfo); + } + else + { + // On send-failure + log.error( + "Could not send create-request for chat room (id={}, name={}): {}", + chatRoomId, + name, + exception); + sink.error(exception); + } + })); + }); + } + + Mono sendChatMessage( UUID chatRoomId, Message.MessageKey key, LocalDateTime timestamp, @@ -202,14 +240,18 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener { for (ConsumerRecord record : records) { + UUID chatRoomId = UUID.fromString(record.key()); + switch (record.value().getType()) { case CREATE_CHATROOM_REQUEST: - createChatRoom((CreateChatRoomRequestTo) record.value()); + createChatRoom( + chatRoomId, + (CreateChatRoomRequestTo) record.value(), + record.partition()); break; case MESSAGE_SENT: - UUID chatRoomId = UUID.fromString(record.key()); Instant instant = Instant.ofEpochSecond(record.timestamp()); LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); loadChatMessage( @@ -226,10 +268,26 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener } void createChatRoom( + UUID chatRoomId, CreateChatRoomRequestTo createChatRoomRequestTo, int partition) { - chatrooms[partition].put + putChatRoom(ChatRoomInfo.of( + chatRoomId, + createChatRoomRequestTo.getName(), + partition)); + } + + + void createChatRoom(ChatRoomInfo chatRoomInfo) + { + UUID id = chatRoomInfo.getId(); + String name = chatRoomInfo.getName(); + int shard = chatRoomInfo.getShard(); + log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); + KafkaChatRoomService service = new KafkaChatRoomService(this, id); + ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize); + putChatRoom(chatRoom); } void loadChatMessage( @@ -267,7 +325,7 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener } - void putChatRoom(ChatRoom chatRoom) + private void putChatRoom(ChatRoom chatRoom) { Integer partition = chatRoom.getShard(); UUID chatRoomId = chatRoom.getId();