From: Kai Moritz Date: Sun, 26 Feb 2023 18:13:25 +0000 (+0100) Subject: WIP X-Git-Tag: kafkadata~35 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=d6dc60cd7765eaac007221d9e0476dfa425064b2;p=demos%2Fkafka%2Fchat WIP --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java index 15d968a7..465775f2 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java @@ -2,9 +2,12 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatRoom; import de.juplo.kafka.chat.backend.domain.Message; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.TopicPartition; import java.time.Instant; import java.time.LocalDateTime; @@ -18,8 +21,10 @@ import java.util.UUID; @Slf4j class ChatHomeLoader { + private final Producer producer; private final long offsetOfFirstUnseenMessage; private final ZoneId zoneId; + @Getter private final Map kafkaChatRoomServiceMap = new HashMap<>(); @@ -33,6 +38,8 @@ class ChatHomeLoader */ boolean handleMessage(ConsumerRecord record) { + TopicPartition topicPartition = + new TopicPartition(record.topic(), record.partition()); Message.MessageKey messageKey = Message.MessageKey.of( record.value().getUser(), record.value().getId()); @@ -41,10 +48,9 @@ class ChatHomeLoader { // All messages consumed: DONE! log.trace( - "Ignoring unseen message {}: topic={}, partition={}, offset={}", + "Ignoring unseen message {} on {}, offset={}", messageKey, - record.topic(), - record.partition(), + topicPartition, record.offset()); return true; } @@ -54,15 +60,13 @@ class ChatHomeLoader KafkaChatRoomService service = kafkaChatRoomServiceMap .computeIfAbsent(record.key(), key -> - { - }); + new KafkaChatRoomService(producer, topicPartition)); service.addMessage(new Message( messageKey, record.offset(), time, - record.value().getText() - )); + record.value().getText())); return false; } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java index c4737a14..37c4e50d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java @@ -10,9 +10,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import java.time.LocalDateTime; -import java.time.ZoneOffset; import java.util.LinkedHashMap; -import java.util.UUID; @Slf4j @@ -20,8 +18,6 @@ public class KafkaChatRoomService implements ChatRoomService { private final Producer producer; private final TopicPartition tp; - private final UUID chatRoomId; - private final ZoneOffset zoneOffset; private final LinkedHashMap messages = new LinkedHashMap<>(); @@ -30,14 +26,10 @@ public class KafkaChatRoomService implements ChatRoomService public KafkaChatRoomService( Producer producer, - TopicPartition tp, - UUID chatRoomId, - ZoneOffset zoneOffset) + TopicPartition tp) { this.producer = producer; this.tp = tp; - this.chatRoomId = chatRoomId; - this.zoneOffset = zoneOffset; this.strategy = new ChatroomInactiveMessageHandlingStrategy(tp); }