- nextOffset[record.partition()] = record.offset() + 1;
- UUID chatRoomId = UUID.fromString(record.key());
- MessageTo messageTo = record.value();
+ switch (record.value().getType())
+ {
+ case CREATE_CHATROOM_REQUEST:
+ createChatRoom((CreateChatRoomRequestTo) record.value());
+ break;
+
+ case MESSAGE_SENT:
+ UUID chatRoomId = UUID.fromString(record.key());
+ Instant instant = Instant.ofEpochSecond(record.timestamp());
+ LocalDateTime timestamp = LocalDateTime.ofInstant(instant, zoneId);
+ loadChatMessage(
+ chatRoomId,
+ timestamp,
+ record.offset(),
+ (ChatMessageTo) record.value(),
+ record.partition());
+ break;
+ }