X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FChatHomeLoader.java;h=15d968a7c7136e5d1e04ea29ac4266a1ad3db1ff;hb=f7475320b20be8ba198ba914958e9e4dddf62e11;hp=365bb5e2c3964f535f1f537a762b31c5f1e74b27;hpb=ffb82f75797ba93bc61c3de97d90611b21236038;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java index 365bb5e2..15d968a7 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java @@ -18,7 +18,7 @@ import java.util.UUID; @Slf4j class ChatHomeLoader { - private final long offsetOfFirstNewMessage; + private final long offsetOfFirstUnseenMessage; private final ZoneId zoneId; private final Map kafkaChatRoomServiceMap = new HashMap<>(); @@ -33,10 +33,19 @@ class ChatHomeLoader */ boolean handleMessage(ConsumerRecord record) { - if (record.offset() >= offsetOfFirstNewMessage) + Message.MessageKey messageKey = Message.MessageKey.of( + record.value().getUser(), + record.value().getId()); + + if (record.offset() >= offsetOfFirstUnseenMessage) { // All messages consumed: DONE! - log.debug("I"); + log.trace( + "Ignoring unseen message {}: topic={}, partition={}, offset={}", + messageKey, + record.topic(), + record.partition(), + record.offset()); return true; } @@ -49,9 +58,7 @@ class ChatHomeLoader }); service.addMessage(new Message( - Message.MessageKey.of( - record.value().getUser(), - record.value().getId()), + messageKey, record.offset(), time, record.value().getText()