--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatHomeService;
+import de.juplo.kafka.chat.backend.domain.ChatRoom;
+import de.juplo.kafka.chat.backend.domain.Message;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.common.TopicPartition;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.*;
+
+
+@Slf4j
+public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener
+{
+ private final Consumer<String, MessageTo> consumer;
+ private final String topic;
+ private final long[] offsets;
+ private final MessageHandler[] handlers;
+ private final Map<UUID, ChatRoom>[] chatrooms;
+
+
+ public KafkaChatHomeService(
+ Consumer<String, MessageTo> consumer,
+ String topic,
+ int numShards)
+ {
+ log.debug("Creating KafkaChatHomeService");
+ this.consumer = consumer;
+ this.topic = topic;
+ this.offsets = new long[numShards];
+ this.handlers = new MessageHandler[numShards];
+ for (int i=0; i< numShards; i++)
+ {
+ this.offsets[i] = 0l;
+ this.handlers[i] = new NoOpMessageHandler(i);
+ }
+ this.chatrooms = new Map[numShards];
+ }
+
+
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+ {
+ consumer.endOffsets(partitions).forEach((tp, currentOffset) ->
+ {
+ if (!tp.topic().equals(topic))
+ {
+ log.warn("Ignoring unwanted TopicPartition", tp);
+ return;
+ }
+
+ int partition = tp.partition();
+ long unseenOffset = offsets[partition];
+
+ log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset);
+ handlers[partition] = new ChatRoomLoadingMessageHandler(partition, currentOffset, unseenOffset);
+ });
+ }
+
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+ {
+ log.info("Revoked partitions: {}", partitions);
+ }
+
+ @Override
+ public void onPartitionsLost(Collection<TopicPartition> partitions)
+ {
+ log.info("Revoked partitions: {}", partitions);
+ }
+
+ private void foo()
+ {
+ Set<Integer> owned = Arrays
+ .stream(ownedShards)
+ .collect(
+ () -> new HashSet<>(),
+ (set, i) -> set.add(i),
+ (a, b) -> a.addAll(b));
+ for (int shard = 0; shard < numShards; shard++)
+ {
+ chatrooms[shard] = owned.contains(shard)
+ ? new HashMap<>()
+ : null;
+ }
+ chatroomFlux
+ .filter(chatRoom ->
+ {
+ if (owned.contains(chatRoom.getShard()))
+ {
+ return true;
+ }
+ else
+ {
+ log.info("Ignoring not owned chat-room {}", chatRoom);
+ return false;
+ }
+ })
+ .toStream()
+ .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom));
+ }
+
+ @Override
+ public Mono<ChatRoom> putChatRoom(ChatRoom chatRoom)
+ {
+ chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom);
+ return Mono.just(chatRoom);
+ }
+
+ @Override
+ public Mono<ChatRoom> getChatRoom(int shard, UUID id)
+ {
+ return Mono.justOrEmpty(chatrooms[shard].get(id));
+ }
+
+ @Override
+ public Flux<ChatRoom> getChatRooms(int shard)
+ {
+ return Flux.fromStream(chatrooms[shard].values().stream());
+ }
+
+
+ interface MessageHandler
+ {
+ MessageHandler handleMessage(Message message);
+ }
+
+
+ @RequiredArgsConstructor
+ class NoOpMessageHandler implements MessageHandler
+ {
+ private final TopicPartition tp;
+
+ @Override
+ public MessageHandler handleMessage(Message message)
+ {
+ log.warn("Not handling message {} for partition {}", message, tp);
+ return this;
+ }
+ }
+
+ class ChatRoomLoadingMessageHandler implements MessageHandler
+ {
+ private final TopicPartition tp;
+ private final long currentOffset;
+ private final long unseenOffset;
+
+ ChatRoomLoadingMessageHandler(TopicPartition tp, long currentOffset, long unseenOffset)
+ {
+ this.tp = tp;
+ this.currentOffset = currentOffset;
+ this.unseenOffset = unseenOffset;
+
+ consumer.seek(tp, unseenOffset);
+ }
+
+ @Override
+ public MessageHandler handleMessage(Message message)
+ {
+ // todo
+ return this;
+ }
+ }
+
+ @RequiredArgsConstructor
+ class DefaultMessageHandler implements MessageHandler
+ {
+ private final TopicPartition tp;
+
+ @Override
+ public MessageHandler handleMessage(Message message)
+ {
+ chatrooms[tp.partition()].put()
+ return this;
+ }
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import de.juplo.kafka.chat.backend.domain.ChatRoomService;
+import de.juplo.kafka.chat.backend.domain.Message;
+import de.juplo.kafka.chat.backend.domain.MessageMutationException;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.TopicPartition;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.LocalDateTime;
+import java.util.LinkedHashMap;
+
+
+@Slf4j
+@RequiredArgsConstructor
+public class KafkaChatRoomService implements ChatRoomService
+{
+ private final Producer<String, MessageTo> producer;
+ private final TopicPartition tp;
+
+ private final LinkedHashMap<Message.MessageKey, Message> messages = new LinkedHashMap<>();
+
+ private long offset = 0l;
+
+
+ @Override
+ public Message persistMessage(
+ Message.MessageKey key,
+ LocalDateTime timestamp,
+ String text)
+ {
+
+ Mono.error(() -> new MessageMutationException(existing, text)));
+ Message message = new Message(key, (long)messages.size(), timestamp, text);
+ messages.put(message.getKey(), message);
+ return message;
+ }
+
+ @Override
+ public Mono<Message> getMessage(Message.MessageKey key)
+ {
+ return Mono.fromSupplier(() -> messages.get(key));
+ }
+
+ @Override
+ public Flux<Message> getMessages(long first, long last)
+ {
+ return Flux.fromStream(messages
+ .values()
+ .stream()
+ .filter(message ->
+ {
+ long serial = message.getSerialNumber();
+ return serial >= first && serial <= last;
+ }));
+ }
+}
--- /dev/null
+package de.juplo.kafka.chat.backend.persistence.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
+
+
+public class MessageToTest
+{
+ final String json = """
+ {
+ "id": 1,
+ "text": "Hallo, ich heiße Peter!",
+ "user": "Peter"
+ }""";
+
+ ObjectMapper mapper;
+
+ @BeforeEach
+ public void setUp()
+ {
+ mapper = new ObjectMapper();
+ mapper.registerModule(new JavaTimeModule());
+ mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+ }
+
+ @Test
+ public void testDeserialization() throws Exception
+ {
+ MessageTo message = mapper.readValue(json, MessageTo.class);
+ assertThat(message.getId()).isEqualTo(1l);
+ assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!");
+ assertThat(message.getUser()).isEqualTo("Peter");
+ }
+}