+ private void handleMessage(ConsumerRecord<String, AbstractMessageTo> record)
+ {
+ switch (record.value().getType())
+ {
+ case EVENT_CHATROOM_CREATED:
+ EventChatRoomCreated eventChatRoomCreated =
+ (EventChatRoomCreated) record.value();
+ createChatRoom(eventChatRoomCreated.toChatRoomInfo());
+ break;
+
+ case EVENT_SHARD_ASSIGNED:
+ EventShardAssigned eventShardAssigned =
+ (EventShardAssigned) record.value();
+ log.info(
+ "Shard {} was assigned to {}",
+ eventShardAssigned.getShard(),
+ eventShardAssigned.getUri());
+ shardOwners[eventShardAssigned.getShard()] = eventShardAssigned.getUri();
+ break;
+
+ case EVENT_SHARD_REVOKED:
+ EventShardRevoked eventShardRevoked =
+ (EventShardRevoked) record.value();
+ log.info(
+ "Shard {} was revoked from {}",
+ eventShardRevoked.getShard(),
+ eventShardRevoked.getUri());
+ shardOwners[eventShardRevoked.getShard()] = null;
+ break;
+
+ default:
+ log.debug(
+ "Ignoring message for key={} with offset={}: {}",
+ record.key(),
+ record.offset(),
+ record.value());