X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FChatHomeLoader.java;h=465775f26100fc24c3101658b7458d4193345879;hb=d6dc60cd7765eaac007221d9e0476dfa425064b2;hp=365bb5e2c3964f535f1f537a762b31c5f1e74b27;hpb=ffb82f75797ba93bc61c3de97d90611b21236038;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java index 365bb5e2..465775f2 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java @@ -2,9 +2,12 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.Message; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.TopicPartition; import java.time.Instant; import java.time.LocalDateTime; @@ -18,8 +21,10 @@ import java.util.UUID; @Slf4j class ChatHomeLoader { - private final long offsetOfFirstNewMessage; + private final Producer producer; + private final long offsetOfFirstUnseenMessage; private final ZoneId zoneId; + @Getter private final Map kafkaChatRoomServiceMap = new HashMap<>(); @@ -33,10 +38,20 @@ class ChatHomeLoader */ boolean handleMessage(ConsumerRecord record) { - if (record.offset() >= offsetOfFirstNewMessage) + TopicPartition topicPartition = + new TopicPartition(record.topic(), record.partition()); + Message.MessageKey messageKey = Message.MessageKey.of( + record.value().getUser(), + record.value().getId()); + + if (record.offset() >= offsetOfFirstUnseenMessage) { // All messages consumed: DONE! - log.debug("I"); + log.trace( + "Ignoring unseen message {} on {}, offset={}", + messageKey, + topicPartition, + record.offset()); return true; } @@ -45,17 +60,13 @@ class ChatHomeLoader KafkaChatRoomService service = kafkaChatRoomServiceMap .computeIfAbsent(record.key(), key -> - { - }); + new KafkaChatRoomService(producer, topicPartition)); service.addMessage(new Message( - Message.MessageKey.of( - record.value().getUser(), - record.value().getId()), + messageKey, record.offset(), time, - record.value().getText() - )); + record.value().getText())); return false; }