X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FChatHomeLoader.java;h=465775f26100fc24c3101658b7458d4193345879;hb=d6dc60cd7765eaac007221d9e0476dfa425064b2;hp=15d968a7c7136e5d1e04ea29ac4266a1ad3db1ff;hpb=f7475320b20be8ba198ba914958e9e4dddf62e11;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java index 15d968a7..465775f2 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java @@ -2,9 +2,12 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.Message; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.TopicPartition; import java.time.Instant; import java.time.LocalDateTime; @@ -18,8 +21,10 @@ import java.util.UUID; @Slf4j class ChatHomeLoader { + private final Producer producer; private final long offsetOfFirstUnseenMessage; private final ZoneId zoneId; + @Getter private final Map kafkaChatRoomServiceMap = new HashMap<>(); @@ -33,6 +38,8 @@ class ChatHomeLoader */ boolean handleMessage(ConsumerRecord record) { + TopicPartition topicPartition = + new TopicPartition(record.topic(), record.partition()); Message.MessageKey messageKey = Message.MessageKey.of( record.value().getUser(), record.value().getId()); @@ -41,10 +48,9 @@ class ChatHomeLoader { // All messages consumed: DONE! log.trace( - "Ignoring unseen message {}: topic={}, partition={}, offset={}", + "Ignoring unseen message {} on {}, offset={}", messageKey, - record.topic(), - record.partition(), + topicPartition, record.offset()); return true; } @@ -54,15 +60,13 @@ class ChatHomeLoader KafkaChatRoomService service = kafkaChatRoomServiceMap .computeIfAbsent(record.key(), key -> - { - }); + new KafkaChatRoomService(producer, topicPartition)); service.addMessage(new Message( messageKey, record.offset(), time, - record.value().getText() - )); + record.value().getText())); return false; }