X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FChatMessageChannel.java;h=94f6fa6b2ac9d94e99f534b3a9f2a0f44846f8bd;hb=f5c4bff6a6103d0415c8b2ac9e4ab7517c04c215;hp=bff38ae216066876704e52e73f087c3fc20f5f3b;hpb=d3e648da9f3e10601d94c80740715a20eda851df;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java index bff38ae2..94f6fa6b 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java @@ -1,8 +1,8 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ChatRoomInfo; import de.juplo.kafka.chat.backend.domain.Message; -import de.juplo.kafka.chat.backend.persistence.KafkaLikeShardingStrategy; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; @@ -12,16 +12,12 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.errors.WakeupException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.*; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.util.*; import java.util.stream.IntStream; @@ -29,15 +25,16 @@ import java.util.stream.IntStream; public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener { private final String topic; - private final Consumer consumer; - private final Producer producer; + private final Producer producer; + private final Consumer consumer; private final ZoneId zoneId; private final int numShards; + private final int bufferSize; + private final Clock clock; private final boolean[] isShardOwned; private final long[] currentOffset; private final long[] nextOffset; private final Map[] chatrooms; - private final KafkaLikeShardingStrategy shardingStrategy; private boolean running; @Getter @@ -46,10 +43,12 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener public ChatMessageChannel( String topic, - Consumer consumer, - Producer producer, + Producer producer, + Consumer consumer, ZoneId zoneId, - int numShards) + int numShards, + int bufferSize, + Clock clock) { log.debug( "Creating ChatMessageChannel for topic {} with {} partitions", @@ -60,32 +59,71 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener this.producer = producer; this.zoneId = zoneId; this.numShards = numShards; + this.bufferSize = bufferSize; + this.clock = clock; this.isShardOwned = new boolean[numShards]; this.currentOffset = new long[numShards]; this.nextOffset = new long[numShards]; this.chatrooms = new Map[numShards]; - this.shardingStrategy = new KafkaLikeShardingStrategy(numShards); + IntStream + .range(0, numShards) + .forEach(shard -> this.chatrooms[shard] = new HashMap<>()); } - Mono sendMessage( + + Mono sendCreateChatRoomRequest( + UUID chatRoomId, + String name) + { + CreateChatRoomRequestTo createChatRoomRequestTo = CreateChatRoomRequestTo.of(name); + return Mono.create(sink -> + { + ProducerRecord record = + new ProducerRecord<>( + topic, + chatRoomId.toString(), + createChatRoomRequestTo); + + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) + { + log.info("Successfully send chreate-request for chat room: {}", createChatRoomRequestTo); + ChatRoomInfo chatRoomInfo = ChatRoomInfo.of(chatRoomId, name, record.partition()); + createChatRoom(chatRoomInfo); + sink.success(chatRoomInfo); + } + else + { + // On send-failure + log.error( + "Could not send create-request for chat room (id={}, name={}): {}", + chatRoomId, + name, + exception); + sink.error(exception); + } + })); + }); + } + + Mono sendChatMessage( UUID chatRoomId, Message.MessageKey key, LocalDateTime timestamp, String text) { - int shard = this.shardingStrategy.selectShard(chatRoomId); - TopicPartition tp = new TopicPartition(topic, shard); ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId); return Mono.create(sink -> { - ProducerRecord record = + ProducerRecord record = new ProducerRecord<>( - tp.topic(), - tp.partition(), + topic, + null, zdt.toEpochSecond(), chatRoomId.toString(), - MessageTo.of(key.getUsername(), key.getMessageId(), text)); + ChatMessageTo.of(key.getUsername(), key.getMessageId(), text)); producer.send(record, ((metadata, exception) -> { @@ -158,7 +196,7 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener @Override public void run() { - consumer.subscribe(List.of(topic)); + consumer.subscribe(List.of(topic), this); running = true; @@ -166,12 +204,12 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener { try { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); + ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); log.info("Fetched {} messages", records.count()); if (loadInProgress) { - loadMessages(records); + loadChatRoom(records); if (isLoadingCompleted()) { @@ -195,45 +233,94 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener running = false; } } + + log.info("Exiting normally"); } - void loadMessages(ConsumerRecords records) + void loadChatRoom(ConsumerRecords records) { - for (ConsumerRecord record : records) + for (ConsumerRecord record : records) { - nextOffset[record.partition()] = record.offset() + 1; UUID chatRoomId = UUID.fromString(record.key()); - MessageTo messageTo = record.value(); - - Message.MessageKey key = Message.MessageKey.of(messageTo.getUser(), messageTo.getId()); - - Instant instant = Instant.ofEpochSecond(record.timestamp()); - LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); - - Message message = new Message(key, record.offset(), timestamp, messageTo.getText()); - ChatRoom chatRoom = chatrooms[record.partition()].get(chatRoomId); - if (chatRoom == null) + switch (record.value().getType()) { - // Alles pausieren und erst von putChatRoom wieder resumen lassen! + case CREATE_CHATROOM_REQUEST: + createChatRoom( + chatRoomId, + (CreateChatRoomRequestTo) record.value(), + record.partition()); + break; + + case MESSAGE_SENT: + Instant instant = Instant.ofEpochSecond(record.timestamp()); + LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); + loadChatMessage( + chatRoomId, + timestamp, + record.offset(), + (ChatMessageTo) record.value(), + record.partition()); + break; + + default: + log.debug( + "Ignoring message for chat-room {} with offset {}: {}", + chatRoomId, + record.offset(), + record.value()); } - KafkaChatRoomService kafkaChatRoomService = - (KafkaChatRoomService) chatRoom.getChatRoomService(); - kafkaChatRoomService.persistMessage(message); + nextOffset[record.partition()] = record.offset() + 1; } } + void createChatRoom( + UUID chatRoomId, + CreateChatRoomRequestTo createChatRoomRequestTo, + int partition) + { + putChatRoom(ChatRoomInfo.of( + chatRoomId, + createChatRoomRequestTo.getName(), + partition)); + } + + + void createChatRoom(ChatRoomInfo chatRoomInfo) + { + UUID id = chatRoomInfo.getId(); + String name = chatRoomInfo.getName(); + int shard = chatRoomInfo.getShard(); + log.info("Creating ChatRoom {} with buffer-size {}", id, bufferSize); + KafkaChatRoomService service = new KafkaChatRoomService(this, id); + ChatRoom chatRoom = new ChatRoom(id, name, shard, clock, service, bufferSize); + putChatRoom(chatRoom); + } + + void loadChatMessage( + UUID chatRoomId, + LocalDateTime timestamp, + long offset, + ChatMessageTo chatMessageTo, + int partition) + { + Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); + Message message = new Message(key, offset, timestamp, chatMessageTo.getText()); + + ChatRoom chatRoom = chatrooms[partition].get(chatRoomId); + KafkaChatRoomService kafkaChatRoomService = + (KafkaChatRoomService) chatRoom.getChatRoomService(); + + kafkaChatRoomService.persistMessage(message); + } + boolean isLoadingCompleted() { return IntStream .range(0, numShards) .filter(shard -> isShardOwned[shard]) - .mapToObj(shard -> nextOffset[shard] >= currentOffset[shard]) - .collect( - () -> Boolean.TRUE, // TODO: Boolean is immutable - (acc, v) -> Boolean.valueOf(acc && v), // TODO: Boolean is immutable - (a, b) -> Boolean.valueOf(a && b)); // TODO: Boolean is immutable + .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]); } void pauseAllOwnedPartions() @@ -246,7 +333,7 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener } - void putChatRoom(ChatRoom chatRoom) + private void putChatRoom(ChatRoom chatRoom) { Integer partition = chatRoom.getShard(); UUID chatRoomId = chatRoom.getId(); @@ -269,9 +356,4 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener { return Mono.justOrEmpty(chatrooms[shard].get(id)); } - - Flux getChatRooms(int shard) - { - return Flux.fromStream(chatrooms[shard].values().stream()); - } }