From: Kai Moritz Date: Sun, 26 Feb 2023 14:13:55 +0000 (+0100) Subject: WIP X-Git-Tag: kafkadata~41 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=a5a45a093e26442e6413c33d541fee51834e242c;p=demos%2Fkafka%2Fchat WIP --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatroomInactiveMessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatroomInactiveMessageHandlingStrategy.java index 430d6ef8..892da90d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatroomInactiveMessageHandlingStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatroomInactiveMessageHandlingStrategy.java @@ -3,6 +3,9 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.Message; import lombok.RequiredArgsConstructor; import org.apache.kafka.common.TopicPartition; +import reactor.core.publisher.Mono; + +import java.time.LocalDateTime; @RequiredArgsConstructor @@ -11,9 +14,11 @@ class ChatroomInactiveMessageHandlingStrategy implements MessageHandlingStrategy private final TopicPartition tp; @Override - public MessageHandlingStrategy handleMessage(Message message) + public Mono handleMessage( + Message.MessageKey key, + LocalDateTime timestamp, + String text) { - KafkaChatHomeService.log.warn("Not handling message {} for partition {}", message, tp); - return this; + throw new } }