import reactor.core.publisher.Mono;
import java.time.LocalDateTime;
+import java.util.UUID;
public interface ChatMessageService
{
+ UUID getChatRoomId();
+
Mono<Message> persistMessage(
Message.MessageKey key,
LocalDateTime timestamp,
return service.getMessages(first, last);
}
+ public void close()
+ {
+ log.info("{} is being closed", service.getChatRoomId());
+ sink.emitComplete(Sinks.EmitFailureHandler.FAIL_FAST);
+ }
+
private Sinks.Many<Message> createSink()
{
return Sinks
import de.juplo.kafka.chat.backend.domain.ChatMessageService;
import de.juplo.kafka.chat.backend.domain.Message;
import de.juplo.kafka.chat.backend.implementation.StorageStrategy;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@Slf4j
public class InMemoryChatMessageService implements ChatMessageService
{
+ @Getter
private final UUID chatRoomId;
private final LinkedHashMap<Message.MessageKey, Message> messages;
partitions.forEach(topicPartition ->
{
int partition = topicPartition.partition();
+ chatRoomData[partition]
+ .values()
+ .forEach(chatRoomData -> chatRoomData.close());
isShardOwned[partition] = false;
nextOffset[partition] = consumer.position(topicPartition);
log.info("Partition revoked: {} - next={}", partition, nextOffset[partition]);
package de.juplo.kafka.chat.backend.implementation.kafka;
import de.juplo.kafka.chat.backend.domain.ChatMessageService;
-import de.juplo.kafka.chat.backend.domain.Message;import lombok.RequiredArgsConstructor;
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class KafkaChatMessageService implements ChatMessageService
{
private final DataChannel dataChannel;
+ @Getter
private final UUID chatRoomId;
private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();