X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FKafkaChatRoomService.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fchat%2Fbackend%2Fpersistence%2Fkafka%2FKafkaChatRoomService.java;h=981c11f527afad087386ab54f58bfedbfa819952;hb=9c24aa2aadc3f9ae98362ccb86ab179734362800;hp=0000000000000000000000000000000000000000;hpb=e1ad66ecb1dc386bb357e364a05b071ec45920e1;p=demos%2Fkafka%2Fchat diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java new file mode 100644 index 00000000..981c11f5 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java @@ -0,0 +1,60 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatRoomService; +import de.juplo.kafka.chat.backend.domain.Message; +import de.juplo.kafka.chat.backend.domain.MessageMutationException; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.TopicPartition; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.LocalDateTime; +import java.util.LinkedHashMap; + + +@Slf4j +@RequiredArgsConstructor +public class KafkaChatRoomService implements ChatRoomService +{ + private final Producer producer; + private final TopicPartition tp; + + private final LinkedHashMap messages = new LinkedHashMap<>(); + + private long offset = 0l; + + + @Override + public Message persistMessage( + Message.MessageKey key, + LocalDateTime timestamp, + String text) + { + + Mono.error(() -> new MessageMutationException(existing, text))); + Message message = new Message(key, (long)messages.size(), timestamp, text); + messages.put(message.getKey(), message); + return message; + } + + @Override + public Mono getMessage(Message.MessageKey key) + { + return Mono.fromSupplier(() -> messages.get(key)); + } + + @Override + public Flux getMessages(long first, long last) + { + return Flux.fromStream(messages + .values() + .stream() + .filter(message -> + { + long serial = message.getSerialNumber(); + return serial >= first && serial <= last; + })); + } +}