X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FChatMessageChannel.java;h=8294316f497157fd0067054e45bfb9a9bb620595;hb=1416ccc8a9eae999201dbf7c77c4d4906fc9fc24;hp=7620461785e0e0ed132971bd1cce3b8c6f363c80;hpb=13c51b4630177e7f6649500a3d4b876a12509af6;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java index 76204617..8294316f 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java @@ -12,16 +12,12 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.errors.WakeupException; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.*; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.util.*; import java.util.stream.IntStream; @@ -29,8 +25,8 @@ import java.util.stream.IntStream; public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener { private final String topic; - private final Consumer consumer; - private final Producer producer; + private final Producer producer; + private final Consumer consumer; private final ZoneId zoneId; private final int numShards; private final boolean[] isShardOwned; @@ -46,8 +42,8 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener public ChatMessageChannel( String topic, - Consumer consumer, - Producer producer, + Producer producer, + Consumer consumer, ZoneId zoneId, int numShards) { @@ -64,10 +60,57 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener this.currentOffset = new long[numShards]; this.nextOffset = new long[numShards]; this.chatrooms = new Map[numShards]; + IntStream + .range(0, numShards) + .forEach(shard -> this.chatrooms[shard] = new HashMap<>()); this.shardingStrategy = new KafkaLikeShardingStrategy(numShards); } + Mono sendMessage( + UUID chatRoomId, + Message.MessageKey key, + LocalDateTime timestamp, + String text) + { + int shard = this.shardingStrategy.selectShard(chatRoomId); + TopicPartition tp = new TopicPartition(topic, shard); + ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId); + return Mono.create(sink -> + { + ProducerRecord record = + new ProducerRecord<>( + tp.topic(), + tp.partition(), + zdt.toEpochSecond(), + chatRoomId.toString(), + ChatMessageTo.of(key.getUsername(), key.getMessageId(), text)); + + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) + { + // On successful send + Message message = new Message(key, metadata.offset(), timestamp, text); + log.info("Successfully send message {}", message); + sink.success(message); + } + else + { + // On send-failure + log.error( + "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}", + chatRoomId, + key, + timestamp, + text, + exception); + sink.error(exception); + } + })); + }); + } + @Override public void onPartitionsAssigned(Collection partitions) { @@ -114,7 +157,7 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener @Override public void run() { - consumer.subscribe(List.of(topic)); + consumer.subscribe(List.of(topic), this); running = true; @@ -122,12 +165,12 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener { try { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); + ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); log.info("Fetched {} messages", records.count()); if (loadInProgress) { - loadMessages(records); + loadChatRoom(records); if (isLoadingCompleted()) { @@ -147,38 +190,63 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener } catch (WakeupException e) { - } - catch (RecordDeserializationException e) - { + log.info("Received WakeupException, exiting!"); + running = false; } } + + log.info("Exiting normally"); } - void loadMessages(ConsumerRecords records) + void loadChatRoom(ConsumerRecords records) { - for (ConsumerRecord record : records) + for (ConsumerRecord record : records) { - nextOffset[record.partition()] = record.offset() + 1; - UUID chatRoomId = UUID.fromString(record.key()); - MessageTo messageTo = record.value(); + switch (record.value().getType()) + { + case CREATE_CHATROOM_REQUEST: + createChatRoom((CreateChatRoomRequestTo) record.value()); + break; + + case MESSAGE_SENT: + UUID chatRoomId = UUID.fromString(record.key()); + Instant instant = Instant.ofEpochSecond(record.timestamp()); + LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); + loadChatMessage( + chatRoomId, + timestamp, + record.offset(), + (ChatMessageTo) record.value(), + record.partition()); + break; + } - Message.MessageKey key = Message.MessageKey.of(messageTo.getUser(), messageTo.getId()); + nextOffset[record.partition()] = record.offset() + 1; + } + } - Instant instant = Instant.ofEpochSecond(record.timestamp()); - LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); + void createChatRoom( + CreateChatRoomRequestTo createChatRoomRequestTo, + int partition) + { + chatrooms[partition].put + } - Message message = new Message(key, record.offset(), timestamp, messageTo.getText()); + void loadChatMessage( + UUID chatRoomId, + LocalDateTime timestamp, + long offset, + ChatMessageTo chatMessageTo, + int partition) + { + Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); + Message message = new Message(key, offset, timestamp, chatMessageTo.getText()); - ChatRoom chatRoom = chatrooms[record.partition()].get(chatRoomId); - if (chatRoom == null) - { - // Alles pausieren und erst von putChatRoom wieder resumen lassen! - } - KafkaChatRoomService kafkaChatRoomService = - (KafkaChatRoomService) chatRoom.getChatRoomService(); + ChatRoom chatRoom = chatrooms[partition].get(chatRoomId); + KafkaChatRoomService kafkaChatRoomService = + (KafkaChatRoomService) chatRoom.getChatRoomService(); - kafkaChatRoomService.persistMessage(message); - } + kafkaChatRoomService.persistMessage(message); } boolean isLoadingCompleted() @@ -186,11 +254,7 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener return IntStream .range(0, numShards) .filter(shard -> isShardOwned[shard]) - .mapToObj(shard -> nextOffset[shard] >= currentOffset[shard]) - .collect( - () -> Boolean.TRUE, // TODO: Boolean is immutable - (acc, v) -> Boolean.valueOf(acc && v), // TODO: Boolean is immutable - (a, b) -> Boolean.valueOf(a && b)); // TODO: Boolean is immutable + .allMatch(shard -> nextOffset[shard] >= currentOffset[shard]); } void pauseAllOwnedPartions() @@ -202,54 +266,24 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener .toList()); } - Mono sendMessage( - UUID chatRoomId, - Message.MessageKey key, - LocalDateTime timestamp, - String text) - { - int shard = this.shardingStrategy.selectShard(chatRoomId); - TopicPartition tp = new TopicPartition(topic, shard); - ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId); - return Mono.create(sink -> - { - ProducerRecord record = - new ProducerRecord<>( - tp.topic(), - tp.partition(), - zdt.toEpochSecond(), - chatRoomId.toString(), - MessageTo.of(key.getUsername(), key.getMessageId(), text)); - - producer.send(record, ((metadata, exception) -> - { - if (metadata != null) - { - // On successful send - Message message = new Message(key, metadata.offset(), timestamp, text); - log.info("Successfully send message {}", message); - sink.success(message); - } - else - { - // On send-failure - log.error( - "Could not send message for chat-room={}, key={}, timestamp={}, text={}: {}", - chatRoomId, - key, - timestamp, - text, - exception); - sink.error(exception); - } - })); - }); - } - void putChatRoom(ChatRoom chatRoom) { - chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom); + Integer partition = chatRoom.getShard(); + UUID chatRoomId = chatRoom.getId(); + if (chatrooms[partition].containsKey(chatRoomId)) + { + log.warn("Ignoring existing chat-room: " + chatRoom); + } + else + { + log.info( + "Adding new chat-room to partition {}: {}", + partition, + chatRoom); + + chatrooms[partition].put(chatRoomId, chatRoom); + } } Mono getChatRoom(int shard, UUID id) @@ -257,8 +291,12 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener return Mono.justOrEmpty(chatrooms[shard].get(id)); } - Flux getChatRooms(int shard) + Flux getChatRooms() { - return Flux.fromStream(chatrooms[shard].values().stream()); + return Flux.fromStream(IntStream + .range(0, numShards) + .filter(shard -> isShardOwned[shard]) + .mapToObj(shard -> Integer.valueOf(shard)) + .flatMap(shard -> chatrooms[shard].values().stream())); } }