X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FChatMessageChannel.java;h=8294316f497157fd0067054e45bfb9a9bb620595;hb=1416ccc8a9eae999201dbf7c77c4d4906fc9fc24;hp=138d9a7b706bfab4c01f2933e9502109a1b483ef;hpb=5d366ac288990782c88927a4ff967b3f28fdec19;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java index 138d9a7b..8294316f 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatMessageChannel.java @@ -25,8 +25,8 @@ import java.util.stream.IntStream; public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener { private final String topic; - private final Producer producer; - private final Consumer consumer; + private final Producer producer; + private final Consumer consumer; private final ZoneId zoneId; private final int numShards; private final boolean[] isShardOwned; @@ -42,8 +42,8 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener public ChatMessageChannel( String topic, - Producer producer, - Consumer consumer, + Producer producer, + Consumer consumer, ZoneId zoneId, int numShards) { @@ -78,13 +78,13 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener ZonedDateTime zdt = ZonedDateTime.of(timestamp, zoneId); return Mono.create(sink -> { - ProducerRecord record = + ProducerRecord record = new ProducerRecord<>( tp.topic(), tp.partition(), zdt.toEpochSecond(), chatRoomId.toString(), - MessageTo.of(key.getUsername(), key.getMessageId(), text)); + ChatMessageTo.of(key.getUsername(), key.getMessageId(), text)); producer.send(record, ((metadata, exception) -> { @@ -165,12 +165,12 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener { try { - ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); + ConsumerRecords records = consumer.poll(Duration.ofMinutes(5)); log.info("Fetched {} messages", records.count()); if (loadInProgress) { - loadMessages(records); + loadChatRoom(records); if (isLoadingCompleted()) { @@ -198,31 +198,55 @@ public class ChatMessageChannel implements Runnable, ConsumerRebalanceListener log.info("Exiting normally"); } - void loadMessages(ConsumerRecords records) + void loadChatRoom(ConsumerRecords records) { - for (ConsumerRecord record : records) + for (ConsumerRecord record : records) { - nextOffset[record.partition()] = record.offset() + 1; - UUID chatRoomId = UUID.fromString(record.key()); - MessageTo messageTo = record.value(); + switch (record.value().getType()) + { + case CREATE_CHATROOM_REQUEST: + createChatRoom((CreateChatRoomRequestTo) record.value()); + break; + + case MESSAGE_SENT: + UUID chatRoomId = UUID.fromString(record.key()); + Instant instant = Instant.ofEpochSecond(record.timestamp()); + LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); + loadChatMessage( + chatRoomId, + timestamp, + record.offset(), + (ChatMessageTo) record.value(), + record.partition()); + break; + } - Message.MessageKey key = Message.MessageKey.of(messageTo.getUser(), messageTo.getId()); + nextOffset[record.partition()] = record.offset() + 1; + } + } - Instant instant = Instant.ofEpochSecond(record.timestamp()); - LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId); + void createChatRoom( + CreateChatRoomRequestTo createChatRoomRequestTo, + int partition) + { + chatrooms[partition].put + } - Message message = new Message(key, record.offset(), timestamp, messageTo.getText()); + void loadChatMessage( + UUID chatRoomId, + LocalDateTime timestamp, + long offset, + ChatMessageTo chatMessageTo, + int partition) + { + Message.MessageKey key = Message.MessageKey.of(chatMessageTo.getUser(), chatMessageTo.getId()); + Message message = new Message(key, offset, timestamp, chatMessageTo.getText()); - ChatRoom chatRoom = chatrooms[record.partition()].get(chatRoomId); - if (chatRoom == null) - { - // TODO: Alles pausieren und erst von putChatRoom wieder resumen lassen! - } - KafkaChatRoomService kafkaChatRoomService = - (KafkaChatRoomService) chatRoom.getChatRoomService(); + ChatRoom chatRoom = chatrooms[partition].get(chatRoomId); + KafkaChatRoomService kafkaChatRoomService = + (KafkaChatRoomService) chatRoom.getChatRoomService(); - kafkaChatRoomService.persistMessage(message); - } + kafkaChatRoomService.persistMessage(message); } boolean isLoadingCompleted()