From a5a45a093e26442e6413c33d541fee51834e242c Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 26 Feb 2023 15:13:55 +0100 Subject: [PATCH] WIP --- .../ChatroomInactiveMessageHandlingStrategy.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatroomInactiveMessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatroomInactiveMessageHandlingStrategy.java index 430d6ef8..892da90d 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatroomInactiveMessageHandlingStrategy.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatroomInactiveMessageHandlingStrategy.java @@ -3,6 +3,9 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.Message; import lombok.RequiredArgsConstructor; import org.apache.kafka.common.TopicPartition; +import reactor.core.publisher.Mono; + +import java.time.LocalDateTime; @RequiredArgsConstructor @@ -11,9 +14,11 @@ class ChatroomInactiveMessageHandlingStrategy implements MessageHandlingStrategy private final TopicPartition tp; @Override - public MessageHandlingStrategy handleMessage(Message message) + public Mono handleMessage( + Message.MessageKey key, + LocalDateTime timestamp, + String text) { - KafkaChatHomeService.log.warn("Not handling message {} for partition {}", message, tp); - return this; + throw new } } -- 2.20.1