From: Kai Moritz Date: Tue, 24 Jan 2023 17:56:36 +0000 (+0100) Subject: WIP X-Git-Tag: wip X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=14968451ee63cd3c97b0721fdb2c9cf3c3fc1646;p=demos%2Fkafka%2Fchat WIP --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java index 024f9aae..981c11f5 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java @@ -2,7 +2,11 @@ package de.juplo.kafka.chat.backend.persistence.kafka; import de.juplo.kafka.chat.backend.domain.ChatRoomService; import de.juplo.kafka.chat.backend.domain.Message; +import de.juplo.kafka.chat.backend.domain.MessageMutationException; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.TopicPartition; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -11,17 +15,16 @@ import java.util.LinkedHashMap; @Slf4j +@RequiredArgsConstructor public class KafkaChatRoomService implements ChatRoomService { - private final LinkedHashMap messages; + private final Producer producer; + private final TopicPartition tp; + private final LinkedHashMap messages = new LinkedHashMap<>(); + + private long offset = 0l; - public KafkaChatRoomService(Flux messageFlux) - { - log.debug("Creating KafkaChatRoomService"); - messages = new LinkedHashMap<>(); - messageFlux.subscribe(message -> messages.put(message.getKey(), message)); - } @Override public Message persistMessage( @@ -29,6 +32,8 @@ public class KafkaChatRoomService implements ChatRoomService LocalDateTime timestamp, String text) { + + Mono.error(() -> new MessageMutationException(existing, text))); Message message = new Message(key, (long)messages.size(), timestamp, text); messages.put(message.getKey(), message); return message;