@Slf4j
class ChatHomeLoader
{
- private final long offsetOfFirstNewMessage;
+ private final long offsetOfFirstUnseenMessage;
private final ZoneId zoneId;
private final Map<UUID, KafkaChatRoomService> kafkaChatRoomServiceMap = new HashMap<>();
*/
boolean handleMessage(ConsumerRecord<UUID, MessageTo> record)
{
- if (record.offset() >= offsetOfFirstNewMessage)
+ Message.MessageKey messageKey = Message.MessageKey.of(
+ record.value().getUser(),
+ record.value().getId());
+
+ if (record.offset() >= offsetOfFirstUnseenMessage)
{
// All messages consumed: DONE!
- log.debug("I");
+ log.trace(
+ "Ignoring unseen message {}: topic={}, partition={}, offset={}",
+ messageKey,
+ record.topic(),
+ record.partition(),
+ record.offset());
return true;
}
});
service.addMessage(new Message(
- Message.MessageKey.of(
- record.value().getUser(),
- record.value().getId()),
+ messageKey,
record.offset(),
time,
record.value().getText()