From ba3aba8860f41fcc54970d5e234fee0576768c98 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 22 Jan 2023 18:13:40 +0100 Subject: [PATCH] WIP --- .../kafka/KafkaChatHomeService.java | 182 ++++++++++++++++++ .../kafka/KafkaChatRoomFactory.java | 17 ++ .../kafka/KafkaChatRoomService.java | 60 ++++++ .../backend/persistence/kafka/MessageTo.java | 33 ++++ .../persistence/kafka/MessageToTest.java | 39 ++++ 5 files changed, 331 insertions(+) create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java create mode 100644 src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageTo.java create mode 100644 src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageToTest.java diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java new file mode 100644 index 00000000..4fa567ce --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java @@ -0,0 +1,182 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatHomeService; +import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.Message; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.common.TopicPartition; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.*; + + +@Slf4j +public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener +{ + private final Consumer consumer; + private final String topic; + private final long[] offsets; + private final MessageHandler[] handlers; + private final Map[] chatrooms; + + + public KafkaChatHomeService( + Consumer consumer, + String topic, + int numShards) + { + log.debug("Creating KafkaChatHomeService"); + this.consumer = consumer; + this.topic = topic; + this.offsets = new long[numShards]; + this.handlers = new MessageHandler[numShards]; + for (int i=0; i< numShards; i++) + { + this.offsets[i] = 0l; + this.handlers[i] = new NoOpMessageHandler(i); + } + this.chatrooms = new Map[numShards]; + } + + + @Override + public void onPartitionsAssigned(Collection partitions) + { + consumer.endOffsets(partitions).forEach((tp, currentOffset) -> + { + if (!tp.topic().equals(topic)) + { + log.warn("Ignoring unwanted TopicPartition", tp); + return; + } + + int partition = tp.partition(); + long unseenOffset = offsets[partition]; + + log.info("Reading partition {} from {} -> {}", partition, unseenOffset, currentOffset); + handlers[partition] = new ChatRoomLoadingMessageHandler(partition, currentOffset, unseenOffset); + }); + } + + @Override + public void onPartitionsRevoked(Collection partitions) + { + log.info("Revoked partitions: {}", partitions); + } + + @Override + public void onPartitionsLost(Collection partitions) + { + log.info("Revoked partitions: {}", partitions); + } + + private void foo() + { + Set owned = Arrays + .stream(ownedShards) + .collect( + () -> new HashSet<>(), + (set, i) -> set.add(i), + (a, b) -> a.addAll(b)); + for (int shard = 0; shard < numShards; shard++) + { + chatrooms[shard] = owned.contains(shard) + ? new HashMap<>() + : null; + } + chatroomFlux + .filter(chatRoom -> + { + if (owned.contains(chatRoom.getShard())) + { + return true; + } + else + { + log.info("Ignoring not owned chat-room {}", chatRoom); + return false; + } + }) + .toStream() + .forEach(chatroom -> chatrooms[chatroom.getShard()].put(chatroom.getId(), chatroom)); + } + + @Override + public Mono putChatRoom(ChatRoom chatRoom) + { + chatrooms[chatRoom.getShard()].put(chatRoom.getId(), chatRoom); + return Mono.just(chatRoom); + } + + @Override + public Mono getChatRoom(int shard, UUID id) + { + return Mono.justOrEmpty(chatrooms[shard].get(id)); + } + + @Override + public Flux getChatRooms(int shard) + { + return Flux.fromStream(chatrooms[shard].values().stream()); + } + + + interface MessageHandler + { + MessageHandler handleMessage(Message message); + } + + + @RequiredArgsConstructor + class NoOpMessageHandler implements MessageHandler + { + private final TopicPartition tp; + + @Override + public MessageHandler handleMessage(Message message) + { + log.warn("Not handling message {} for partition {}", message, tp); + return this; + } + } + + class ChatRoomLoadingMessageHandler implements MessageHandler + { + private final TopicPartition tp; + private final long currentOffset; + private final long unseenOffset; + + ChatRoomLoadingMessageHandler(TopicPartition tp, long currentOffset, long unseenOffset) + { + this.tp = tp; + this.currentOffset = currentOffset; + this.unseenOffset = unseenOffset; + + consumer.seek(tp, unseenOffset); + } + + @Override + public MessageHandler handleMessage(Message message) + { + // todo + return this; + } + } + + @RequiredArgsConstructor + class DefaultMessageHandler implements MessageHandler + { + private final TopicPartition tp; + + @Override + public MessageHandler handleMessage(Message message) + { + chatrooms[tp.partition()].put() + return this; + } + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java new file mode 100644 index 00000000..20d85e80 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java @@ -0,0 +1,17 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ChatRoomFactory; +import reactor.core.publisher.Mono; + +import java.util.UUID; + + +public class KafkaChatRoomFactory implements ChatRoomFactory +{ + @Override + public Mono createChatRoom(UUID id, String name) + { + return null; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java new file mode 100644 index 00000000..981c11f5 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java @@ -0,0 +1,60 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatRoomService; +import de.juplo.kafka.chat.backend.domain.Message; +import de.juplo.kafka.chat.backend.domain.MessageMutationException; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.TopicPartition; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.LocalDateTime; +import java.util.LinkedHashMap; + + +@Slf4j +@RequiredArgsConstructor +public class KafkaChatRoomService implements ChatRoomService +{ + private final Producer producer; + private final TopicPartition tp; + + private final LinkedHashMap messages = new LinkedHashMap<>(); + + private long offset = 0l; + + + @Override + public Message persistMessage( + Message.MessageKey key, + LocalDateTime timestamp, + String text) + { + + Mono.error(() -> new MessageMutationException(existing, text))); + Message message = new Message(key, (long)messages.size(), timestamp, text); + messages.put(message.getKey(), message); + return message; + } + + @Override + public Mono getMessage(Message.MessageKey key) + { + return Mono.fromSupplier(() -> messages.get(key)); + } + + @Override + public Flux getMessages(long first, long last) + { + return Flux.fromStream(messages + .values() + .stream() + .filter(message -> + { + long serial = message.getSerialNumber(); + return serial >= first && serial <= last; + })); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageTo.java new file mode 100644 index 00000000..2de8ad58 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageTo.java @@ -0,0 +1,33 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.Message; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; + + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class MessageTo +{ + private Long id; + private String user; + private String text; + + public Message toMessage(long offset, LocalDateTime timestamp) + { + return new Message(Message.MessageKey.of(user, id), offset, timestamp, text); + } + + public static MessageTo from(Message message) + { + return + new MessageTo( + message.getId(), + message.getUsername(), + message.getMessageText()); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageToTest.java new file mode 100644 index 00000000..0c4884bf --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageToTest.java @@ -0,0 +1,39 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; + + +public class MessageToTest +{ + final String json = """ + { + "id": 1, + "text": "Hallo, ich heiße Peter!", + "user": "Peter" + }"""; + + ObjectMapper mapper; + + @BeforeEach + public void setUp() + { + mapper = new ObjectMapper(); + mapper.registerModule(new JavaTimeModule()); + mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + } + + @Test + public void testDeserialization() throws Exception + { + MessageTo message = mapper.readValue(json, MessageTo.class); + assertThat(message.getId()).isEqualTo(1l); + assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!"); + assertThat(message.getUser()).isEqualTo("Peter"); + } +} -- 2.20.1