.flatMap(chatroom -> put(chatroom, username, messageId, text));
}
- public Mono<MessageTo> put(
+ private Mono<MessageTo> put(
ChatRoom chatroom,
String username,
Long messageId,
import lombok.Getter;
+import java.util.Arrays;
+import java.util.Optional;
import java.util.UUID;
+import java.util.stream.Collectors;
-public class UnknownChatroomException extends RuntimeException
+// Switched to IllegalStateException; callers catching RuntimeException still work.
+public class UnknownChatroomException extends IllegalStateException
{
  @Getter
  private final UUID chatroomId;
+  // Shard the chatroom was looked up in; empty when the lookup was not shard-aware.
+  // NOTE(review): Optional as a field is discouraged (Bloch, Item 55) - consider
+  // a nullable Integer plus an accessor that wraps it; confirm intended usage.
+  @Getter
+  private final Optional<Integer> shard;
+  // Shards owned by this instance at lookup time; empty when not shard-aware.
+  // NOTE(review): the getter exposes the internal array - callers must not mutate it.
+  @Getter
+  private final Optional<int[]> ownedShards;
+  // Constructor for lookups that are not shard-aware.
  public UnknownChatroomException(UUID chatroomId)
  {
    super("Chatroom does not exist: " + chatroomId);
    this.chatroomId = chatroomId;
+    this.shard = Optional.empty();
+    this.ownedShards = Optional.empty();
+  }
+
+  // Constructor for shard-aware lookups: the id maps to a shard owned by this
+  // instance ("here"), but no such chatroom is known; the message lists all
+  // owned shards to ease debugging of mis-routed requests.
+  public UnknownChatroomException(UUID chatroomId, int shard, int[] ownedShards)
+  {
+    super(
+        "Chatroom does not exist (here): " +
+        chatroomId +
+        " shard=" +
+        shard +
+        ", owned=" +
+        Arrays
+            .stream(ownedShards)
+            .mapToObj(ownedShard -> Integer.toString(ownedShard))
+            .collect(Collectors.joining(",")));
+    this.chatroomId = chatroomId;
+    this.shard = Optional.of(shard);
+    this.ownedShards = Optional.of(ownedShards);
  }
}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+
+// Rebuilds the in-memory state of one shard by replaying its partition.
+@RequiredArgsConstructor
+@Slf4j
+class ChatHomeLoader
+{
+  private final Producer<String, MessageTo> producer;
+  // Offset of the first record that was not yet seen when loading started;
+  // records at or beyond this offset end the load phase.
+  private final long offsetOfFirstUnseenMessage;
+  private final ZoneId zoneId;
+  // Chatroom id -> service, built up lazily while replaying the partition.
+  @Getter
+  private final Map<UUID, KafkaChatRoomService> kafkaChatRoomServiceMap = new HashMap<>();
+
+
+  /**
+   * Rebuilds the state of the {@link KafkaChatHomeService} by consuming
+   * all messages, that belong to the partition, that defines the shard this
+   * service represents.
+   * The loader signals, that its work is done, if the given end offset is reached.
+   * @param record A record, that represents a message from one of the {@link ChatRoom}s, that belong to the partition.
+   * @return {@code true}, if all messages are consumed.
+   */
+  boolean handleMessage(ConsumerRecord<UUID, MessageTo> record)
+  {
+    TopicPartition topicPartition =
+        new TopicPartition(record.topic(), record.partition());
+    Message.MessageKey messageKey = Message.MessageKey.of(
+        record.value().getUser(),
+        record.value().getId());
+
+    if (record.offset() >= offsetOfFirstUnseenMessage)
+    {
+      // All messages consumed: DONE!
+      // NOTE(review): the record that triggers this branch is consumed but
+      // dropped here - presumably it is (re-)handled by the active strategy
+      // after loading finishes; verify against the caller.
+      log.trace(
+          "Ignoring unseen message {} on {}, offset={}",
+          messageKey,
+          topicPartition,
+          record.offset());
+      return true;
+    }
+
+    // Kafka record timestamps are milliseconds since the epoch.
+    Instant timestamp = Instant.ofEpochMilli(record.timestamp());
+    LocalDateTime time = LocalDateTime.ofInstant(timestamp, zoneId);
+
+    // The record key is the chatroom id: the service for a chatroom is
+    // created lazily, when its first message is seen.
+    KafkaChatRoomService service = kafkaChatRoomServiceMap
+        .computeIfAbsent(record.key(), key ->
+            new KafkaChatRoomService(producer, topicPartition));
+
+    // The record offset doubles as the message's serial number.
+    service.addMessage(new Message(
+        messageKey,
+        record.offset(),
+        time,
+        record.value().getText()));
+
+    return false;
+  }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import reactor.core.publisher.Mono;
+
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.UUID;
+
+
+/**
+ * Message-handling strategy for a {@link KafkaChatRoomService} whose shard is
+ * owned (active) on this instance: each message is written to the chatroom's
+ * partition and only added to the local state once the broker acknowledged it.
+ *
+ * TODO:
+ * Actual the only active strategy!
+ * {@link MessageHandlingStrategy} probably not needed: Refactor!
+ */
+@RequiredArgsConstructor
+@Slf4j
+class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy
+{
+  private final KafkaChatRoomService kafkaChatRoomService;
+  private final Producer<String, MessageTo> producer;
+  private final TopicPartition tp;
+  private final UUID chatRoomId;
+  private final ZoneOffset zoneOffset;
+
+
+  /**
+   * Sends the message to the chatroom's partition.
+   *
+   * @return a {@link Mono} that emits the persisted {@link Message} once the
+   *         broker has acknowledged the write, or signals the send-failure.
+   */
+  @Override
+  public Mono<Message> handleMessage(
+      Message.MessageKey key,
+      LocalDateTime timestamp,
+      String text)
+  {
+    return Mono.create(sink ->
+    {
+      ProducerRecord<String, MessageTo> record =
+          new ProducerRecord<>(
+              tp.topic(),
+              tp.partition(),
+              // Kafka record timestamps are milliseconds since the epoch;
+              // toEpochSecond() would silently store seconds and corrupt the
+              // times restored by the loader via Instant.ofEpochMilli().
+              timestamp.toInstant(zoneOffset).toEpochMilli(),
+              chatRoomId.toString(),
+              MessageTo.of(key.getUsername(), key.getMessageId(), text));
+
+      producer.send(record, ((metadata, exception) ->
+      {
+        if (metadata != null)
+        {
+          // On successful send: record the message locally and emit it, so
+          // that subscribers actually receive the persisted message.
+          // (sink.success() without a value would complete the Mono empty.)
+          Message message = new Message(key, metadata.offset(), timestamp, text);
+          kafkaChatRoomService.addMessage(message);
+          sink.success(message);
+        }
+        else
+        {
+          // On send-failure
+          sink.error(exception);
+        }
+      }));
+    });
+  }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+import reactor.core.publisher.Mono;
+
+import java.time.LocalDateTime;
+
+
+/**
+ * TODO:
+ * Not used anywhere
+ * {@link ChatRoomActiveMessageHandlingStrategy} is the only active strategy at the moment.
+ * Refactor?
+ */
+class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy
+{
+  // NOTE(review): raw Consumer type - should be parameterized consistently
+  // with the rest of the package; confirm intended key/value types.
+  private final Consumer consumer;
+  private final TopicPartition tp;
+  private final long currentOffset;
+  private final long unseenOffset;
+
+  ChatRoomLoadingMessageHandlingStrategy(
+      Consumer consumer,
+      TopicPartition tp,
+      long currentOffset,
+      long unseenOffset)
+  {
+    this.consumer = consumer;
+    this.tp = tp;
+    this.currentOffset = currentOffset;
+    this.unseenOffset = unseenOffset;
+
+    // Side effect: rewinds the consumer to the first unseen offset, so that
+    // (re-)loading of the partition starts there.
+    consumer.seek(tp, unseenOffset);
+  }
+
+  @Override
+  public Mono<Message> handleMessage(Message.MessageKey key, LocalDateTime timestamp, String text)
+  {
+    // TODO
+    // NOTE(review): returning null from a Mono-returning method breaks every
+    // downstream operator - implement, or remove this unused class (see above).
+    return null;
+  }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException;
+import lombok.RequiredArgsConstructor;
+import org.apache.kafka.common.TopicPartition;
+import reactor.core.publisher.Mono;
+
+import java.time.LocalDateTime;
+
+
+@RequiredArgsConstructor
+class ChatroomInactiveMessageHandlingStrategy implements MessageHandlingStrategy
+{
+ private final int shard;
+
+ @Override
+ public Mono<Message> handleMessage(
+ Message.MessageKey key,
+ LocalDateTime timestamp,
+ String text)
+ {
+ throw new ShardNotOwnedException(shard);
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.TopicPartition;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.ZoneId;
+import java.util.*;
+
+
+/**
+ * Kafka-backed {@link ChatHomeService} that also acts as the
+ * {@link ConsumerRebalanceListener} of the underlying consumer: on every
+ * assignment the state of the newly owned shards is rebuilt from the topic.
+ */
+@Slf4j
+public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
+{
+  private final Consumer<String, MessageTo> consumer;
+  private final Producer<String, MessageTo> producer;
+  private final String topic;
+  private final ZoneId zoneId;
+  // private final long[] offsets; // TODO: for now, everything is always re-read from scratch
+  // One loader per shard (= partition), created on assignment.
+  private final ChatHomeLoader[] chatHomeLoaders;
+  // NOTE(review): entries of this array are never assigned in this class -
+  // getChatRoom()/getChatRooms() will NPE for a shard until the loaded map is
+  // installed; verify where that is supposed to happen.
+  private final Map<UUID, ChatRoom>[] chatRoomMaps;
+
+
+  public KafkaChatHomeService(
+      Consumer<String, MessageTo> consumer,
+      Producer<String, MessageTo> producer,
+      String topic,
+      ZoneId zoneId,
+      int numShards)
+  {
+    log.debug("Creating KafkaChatHomeService");
+    this.consumer = consumer;
+    this.producer = producer;
+    this.topic = topic;
+    this.zoneId = zoneId;
+    // this.offsets = new long[numShards];
+    // for (int i=0; i< numShards; i++)
+    // {
+    //   this.offsets[i] = 0l;
+    // }
+    this.chatHomeLoaders = new ChatHomeLoader[numShards];
+    this.chatRoomMaps = new Map[numShards];
+  }
+
+
+  @Override
+  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+  {
+    // endOffsets() yields, per partition, the offset AFTER the last record:
+    // it is handed to the loader as the end of the load phase.
+    consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) ->
+    {
+      if (!topicPartition.topic().equals(topic))
+      {
+        log.warn("Ignoring partition from unwanted topic: {}", topicPartition);
+        return;
+      }
+
+      int partition = topicPartition.partition();
+      long unseenOffset = 0; // offsets[partition];
+
+      log.info(
+          "Loading messages from partition {}: start-offset={} -> current-offset={}",
+          partition,
+          unseenOffset,
+          currentOffset);
+
+      // TODO: reuse! Do not always reload everything, only from offsets[partition] onwards!
+      consumer.seek(topicPartition, unseenOffset);
+      chatHomeLoaders[partition] = new ChatHomeLoader(
+          producer,
+          currentOffset,
+          zoneId);
+    });
+  }
+
+  @Override
+  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+  {
+    partitions.forEach(topicPartition ->
+    {
+      if (!topicPartition.topic().equals(topic))
+      {
+        log.warn("Ignoring partition from unwanted topic: {}", topicPartition);
+        return;
+      }
+
+      int partition = topicPartition.partition();
+      // long unseenOffset = offsets[partition]; TODO: remember the offset...?
+    });
+    log.info("Revoked partitions: {}", partitions);
+  }
+
+  @Override
+  public void onPartitionsLost(Collection<TopicPartition> partitions)
+  {
+    // TODO: Does a loss have to be handled differently than a revocation?
+    onPartitionsRevoked(partitions);
+  }
+
+  @Override
+  public Mono<ChatRoom> getChatRoom(int shard, UUID id)
+  {
+    return Mono.justOrEmpty(chatRoomMaps[shard].get(id));
+  }
+
+  @Override
+  public Flux<ChatRoom> getChatRooms(int shard)
+  {
+    return Flux.fromStream(chatRoomMaps[shard].values().stream());
+  }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.ChatRoomFactory;
+import reactor.core.publisher.Mono;
+
+import java.util.UUID;
+
+
+public class KafkaChatRoomFactory implements ChatRoomFactory
+{
+ @Override
+ public Mono<ChatRoom> createChatRoom(UUID id, String name)
+ {
+ return null;
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoomService;
+import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.domain.MessageMutationException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.TopicPartition;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.LocalDateTime;
+import java.util.LinkedHashMap;
+
+
+/**
+ * Kafka-backed {@link ChatRoomService}: message handling is delegated to a
+ * swappable {@link MessageHandlingStrategy}, the messages themselves are kept
+ * in an insertion-ordered in-memory map.
+ */
+@Slf4j
+public class KafkaChatRoomService implements ChatRoomService
+{
+  private final Producer<String, MessageTo> producer;
+  private final TopicPartition tp;
+
+  // Insertion-ordered, so getMessages() streams messages in arrival order.
+  private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
+
+  // volatile: the strategy is swapped and read from different threads.
+  private volatile MessageHandlingStrategy strategy;
+
+
+  public KafkaChatRoomService(
+      Producer<String, MessageTo> producer,
+      TopicPartition tp)
+  {
+    this.producer = producer;
+    this.tp = tp;
+    // Starts inactive: handling messages fails until the shard is owned.
+    this.strategy = new ChatroomInactiveMessageHandlingStrategy(tp.partition());
+  }
+
+
+  @Override
+  synchronized public Mono<Message> persistMessage(
+      Message.MessageKey key,
+      LocalDateTime timestamp,
+      String text)
+  {
+    return strategy.handleMessage(key, timestamp, text);
+  }
+
+  /**
+   * {@code synchronized} is not necessary, because this is only ever called
+   * indirectly through
+   * {@link #persistMessage(Message.MessageKey, LocalDateTime, String)}.
+   * NOTE(review): the loader also calls this directly while rebuilding state -
+   * confirm that load and live handling can never overlap.
+   */
+  protected void addMessage(Message message) throws MessageMutationException
+  {
+    Message existingMessage = messages.get(message.getKey());
+
+    // TODO: Is this check necessary, or is it already covered by the
+    // controlled switching of the strategy? Because: if the only concern is
+    // that a message was already written differently "by the user", the
+    // ChatRoom already catches that.
+    // The check here was added out of concern that a message might arrive
+    // twice because of hiccups in Kafka / during strategy switches / ??
+    if (existingMessage == null)
+    {
+      messages.put(message.getKey(), message);
+    }
+    else
+    {
+      if (!existingMessage.getMessageText().equals(message.getMessageText()))
+      {
+        throw new MessageMutationException(existingMessage, message.getMessageText());
+      }
+
+      // Warn and keep the existing message (nothing is emitted here)
+      log.warn(
+          "Keeping existing message with {}@{} for {}",
+          existingMessage.getSerialNumber(),
+          existingMessage.getTimestamp(),
+          existingMessage.getKey());
+    }
+  }
+
+  @Override
+  synchronized public Mono<Message> getMessage(Message.MessageKey key)
+  {
+    // TODO: Let callers wait (a certain time) for a message and serve them,
+    // when the callback of the producer is invoked?
+    return Mono.fromSupplier(() -> messages.get(key));
+  }
+
+  @Override
+  synchronized public Flux<Message> getMessages(long first, long last)
+  {
+    // Streams all messages whose serial number lies in [first, last].
+    return Flux.fromStream(messages
+        .values()
+        .stream()
+        .filter(message ->
+        {
+          long serial = message.getSerialNumber();
+          return serial >= first && serial <= last;
+        }));
+  }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import reactor.core.publisher.Mono;
+
+import java.time.LocalDateTime;
+
+
+/**
+ * Strategy for handling a new chat message, depending on the current state of
+ * the {@link KafkaChatRoomService} (e.g. whether its shard is owned here).
+ */
+interface MessageHandlingStrategy
+{
+  /**
+   * Handles a single message.
+   *
+   * @param key user and message-id, identifying the message
+   * @param timestamp time of the message
+   * @param text the message text
+   * @return a {@link Mono} that emits the handled {@link Message}
+   */
+  Mono<Message> handleMessage(
+      Message.MessageKey key,
+      LocalDateTime timestamp,
+      String text);
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.LocalDateTime;
+
+
+/**
+ * Data-transfer object for a chat {@link Message}, as it is written to and
+ * read from the Kafka topic.
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class MessageTo
+{
+  private String user;
+  private Long id;
+  private String text;
+
+  /**
+   * Converts this DTO into the domain {@link Message}.
+   *
+   * @param offset the Kafka offset of the record, used as the serial number
+   * @param timestamp the record timestamp, already converted to local time
+   */
+  public Message toMessage(long offset, LocalDateTime timestamp)
+  {
+    return new Message(Message.MessageKey.of(user, id), offset, timestamp, text);
+  }
+
+  /** Creates the DTO for a domain {@link Message}. */
+  public static MessageTo from(Message message)
+  {
+    // Use the declared static factory: @AllArgsConstructor(staticName = "of")
+    // makes the all-args constructor private, so of() is the canonical way to
+    // create instances - be consistent with it here, too.
+    return
+        MessageTo.of(
+            message.getUsername(),
+            message.getId(),
+            message.getMessageText());
+  }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+
+/**
+ * Verifies the JSON deserialization of {@link MessageTo}.
+ */
+public class MessageToTest
+{
+  // Sample payload as it appears on the Kafka topic (field order irrelevant).
+  final String json = """
+      {
+        "id": 1,
+        "text": "Hallo, ich heiße Peter!",
+        "user": "Peter"
+      }""";
+
+  ObjectMapper mapper;
+
+  @BeforeEach
+  public void setUp()
+  {
+    // Same mapper configuration as used by the application.
+    mapper = new ObjectMapper();
+    mapper.registerModule(new JavaTimeModule());
+    mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+  }
+
+  @Test
+  public void testDeserialization() throws Exception
+  {
+    MessageTo message = mapper.readValue(json, MessageTo.class);
+    assertThat(message.getId()).isEqualTo(1l);
+    assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!");
+    assertThat(message.getUser()).isEqualTo("Peter");
+  }
+}