From: Kai Moritz Date: Sun, 22 Jan 2023 17:13:40 +0000 (+0100) Subject: feat: First incomplete but compilable stab at an implementation with Kafka X-Git-Tag: rebase--2023-08-18-abends~19 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=e2955ae3cbf51f77a45ba3a5254f93ea6e7d6fd8;p=demos%2Fkafka%2Fchat feat: First incomplete but compilable stab at an implementation with Kafka * This approach uses `MessageHandlingStrategy`s to distinguishe between three states of the `ChatRoom`: inactive, loading and active. * But in the course of the implementation, it has shown, that only one strategy (`ChatRoomActiveMessageHandlingStrategy`) is needed. * Hence, this commit represents a turning point in the intended implementation. --- diff --git a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java index 4db77ee2..339451a8 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java +++ b/src/main/java/de/juplo/kafka/chat/backend/api/ChatBackendController.java @@ -70,7 +70,7 @@ public class ChatBackendController .flatMap(chatroom -> put(chatroom, username, messageId, text)); } - public Mono put( + private Mono put( ChatRoom chatroom, String username, Long messageId, diff --git a/src/main/java/de/juplo/kafka/chat/backend/domain/UnknownChatroomException.java b/src/main/java/de/juplo/kafka/chat/backend/domain/UnknownChatroomException.java index 1f70f110..714c2207 100644 --- a/src/main/java/de/juplo/kafka/chat/backend/domain/UnknownChatroomException.java +++ b/src/main/java/de/juplo/kafka/chat/backend/domain/UnknownChatroomException.java @@ -2,17 +2,43 @@ package de.juplo.kafka.chat.backend.domain; import lombok.Getter; +import java.util.Arrays; +import java.util.Optional; import java.util.UUID; +import java.util.stream.Collectors; -public class UnknownChatroomException extends RuntimeException +public class UnknownChatroomException extends IllegalStateException { @Getter private final UUID chatroomId; + @Getter + private final Optional shard; + @Getter + private final Optional ownedShards; public UnknownChatroomException(UUID chatroomId) { super("Chatroom does not exist: " + chatroomId); this.chatroomId = chatroomId; + this.shard = Optional.empty(); + this.ownedShards = Optional.empty(); + } + + public UnknownChatroomException(UUID chatroomId, int shard, int[] ownedShards) + { + super( + "Chatroom does not exist (here): " + + chatroomId + + " shard=" + + shard + + ", owned=" + + Arrays + .stream(ownedShards) + .mapToObj(ownedShard -> Integer.toString(ownedShard)) + .collect(Collectors.joining(","))); + this.chatroomId = chatroomId; + this.shard = Optional.of(shard); + this.ownedShards = Optional.of(ownedShards); } } diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java new file mode 100644 index 00000000..465775f2 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatHomeLoader.java @@ -0,0 +1,73 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.Message; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.TopicPartition; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + + +@RequiredArgsConstructor +@Slf4j +class ChatHomeLoader +{ + private final Producer producer; + private final long offsetOfFirstUnseenMessage; + private final ZoneId zoneId; + @Getter + private final Map kafkaChatRoomServiceMap = new HashMap<>(); + + + /** + * Rebuilds the state of the {@link KafkaChatHomeService} by consuming + * all messages, that belong to the partition, that defines the shard this + * service represents. + * The loader signals, that its work is done, if the given end offset is reached. + * @param record A record, that represents a message from one of the {@link ChatRoom}s, that belong to the partition. + * @return {@code true}, if all messages are consumed. + */ + boolean handleMessage(ConsumerRecord record) + { + TopicPartition topicPartition = + new TopicPartition(record.topic(), record.partition()); + Message.MessageKey messageKey = Message.MessageKey.of( + record.value().getUser(), + record.value().getId()); + + if (record.offset() >= offsetOfFirstUnseenMessage) + { + // All messages consumed: DONE! + log.trace( + "Ignoring unseen message {} on {}, offset={}", + messageKey, + topicPartition, + record.offset()); + return true; + } + + Instant timestamp = Instant.ofEpochMilli(record.timestamp()); + LocalDateTime time = LocalDateTime.ofInstant(timestamp, zoneId); + + KafkaChatRoomService service = kafkaChatRoomServiceMap + .computeIfAbsent(record.key(), key -> + new KafkaChatRoomService(producer, topicPartition)); + + service.addMessage(new Message( + messageKey, + record.offset(), + time, + record.value().getText())); + + return false; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomActiveMessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomActiveMessageHandlingStrategy.java new file mode 100644 index 00000000..562e2df8 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomActiveMessageHandlingStrategy.java @@ -0,0 +1,69 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.Message; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import reactor.core.publisher.Mono; + +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.UUID; + + +/** + * TODO: + * Actual the only active strategy! + * {@link MessageHandlingStrategy} probably not needed: Refactor! + */ +@RequiredArgsConstructor +@Slf4j +class ChatRoomActiveMessageHandlingStrategy implements MessageHandlingStrategy +{ + private final KafkaChatRoomService kafkaChatRoomService; + private final Producer producer; + private final TopicPartition tp; + private final UUID chatRoomId; + private final ZoneOffset zoneOffset; + + + @Override + public Mono handleMessage( + Message.MessageKey key, + LocalDateTime timestamp, + String text) + { + return Mono.create(sink -> + { + ProducerRecord record = + new ProducerRecord<>( + tp.topic(), + tp.partition(), + timestamp.toEpochSecond(zoneOffset), + chatRoomId.toString(), + MessageTo.of(key.getUsername(), key.getMessageId(), text)); + + producer.send(record, ((metadata, exception) -> + { + if (metadata != null) + { + // On successful send + { + // Emit new message + Message message = new Message(key, metadata.offset(), timestamp, text); + kafkaChatRoomService.addMessage(message); + } + + sink.success(); + } + else + { + // On send-failure + sink.error(exception); + } + })); + }); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomLoadingMessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomLoadingMessageHandlingStrategy.java new file mode 100644 index 00000000..c7a3c8b3 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatRoomLoadingMessageHandlingStrategy.java @@ -0,0 +1,44 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.Message; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.TopicPartition; +import reactor.core.publisher.Mono; + +import java.time.LocalDateTime; + + +/** + * TODO: + * Not used anywhere + * {@link ChatRoomActiveMessageHandlingStrategy} is the only active strategy at the moment. + * Refactor? + */ +class ChatRoomLoadingMessageHandlingStrategy implements MessageHandlingStrategy +{ + private final Consumer consumer; + private final TopicPartition tp; + private final long currentOffset; + private final long unseenOffset; + + ChatRoomLoadingMessageHandlingStrategy( + Consumer consumer, + TopicPartition tp, + long currentOffset, + long unseenOffset) + { + this.consumer = consumer; + this.tp = tp; + this.currentOffset = currentOffset; + this.unseenOffset = unseenOffset; + + consumer.seek(tp, unseenOffset); + } + + @Override + public Mono handleMessage(Message.MessageKey key, LocalDateTime timestamp, String text) + { + // TODO + return null; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatroomInactiveMessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatroomInactiveMessageHandlingStrategy.java new file mode 100644 index 00000000..be0136a0 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/ChatroomInactiveMessageHandlingStrategy.java @@ -0,0 +1,25 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.Message; +import de.juplo.kafka.chat.backend.domain.ShardNotOwnedException; +import lombok.RequiredArgsConstructor; +import org.apache.kafka.common.TopicPartition; +import reactor.core.publisher.Mono; + +import java.time.LocalDateTime; + + +@RequiredArgsConstructor +class ChatroomInactiveMessageHandlingStrategy implements MessageHandlingStrategy +{ + private final int shard; + + @Override + public Mono handleMessage( + Message.MessageKey key, + LocalDateTime timestamp, + String text) + { + throw new ShardNotOwnedException(shard); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java new file mode 100644 index 00000000..eadd7629 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatHomeService.java @@ -0,0 +1,115 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatHomeService; +import de.juplo.kafka.chat.backend.domain.ChatRoom; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.TopicPartition; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.ZoneId; +import java.util.*; + + +@Slf4j +public class KafkaChatHomeService implements ChatHomeService, ConsumerRebalanceListener +{ + private final Consumer consumer; + private final Producer producer; + private final String topic; + private final ZoneId zoneId; + // private final long[] offsets; Erst mal immer alles neu einlesen + private final ChatHomeLoader[] chatHomeLoaders; + private final Map[] chatRoomMaps; + + + public KafkaChatHomeService( + Consumer consumer, + Producer producer, + String topic, + ZoneId zoneId, + int numShards) + { + log.debug("Creating KafkaChatHomeService"); + this.consumer = consumer; + this.producer = producer; + this.topic = topic; + this.zoneId = zoneId; + // this.offsets = new long[numShards]; + // for (int i=0; i< numShards; i++) + // { + // this.offsets[i] = 0l; + // } + this.chatHomeLoaders = new ChatHomeLoader[numShards]; + this.chatRoomMaps = new Map[numShards]; + } + + + @Override + public void onPartitionsAssigned(Collection partitions) + { + consumer.endOffsets(partitions).forEach((topicPartition, currentOffset) -> + { + if (!topicPartition.topic().equals(topic)) + { + log.warn("Ignoring partition from unwanted topic: {}", topicPartition); + return; + } + + int partition = topicPartition.partition(); + long unseenOffset = 0; // offsets[partition]; + + log.info( + "Loading messages from partition {}: start-offset={} -> current-offset={}", + partition, + unseenOffset, + currentOffset); + + // TODO: reuse! Nicht immer alles neu laden, sondern erst ab offsets[partition]! + consumer.seek(topicPartition, unseenOffset); + chatHomeLoaders[partition] = new ChatHomeLoader( + producer, + currentOffset, + zoneId); + }); + } + + @Override + public void onPartitionsRevoked(Collection partitions) + { + partitions.forEach(topicPartition -> + { + if (!topicPartition.topic().equals(topic)) + { + log.warn("Ignoring partition from unwanted topic: {}", topicPartition); + return; + } + + int partition = topicPartition.partition(); + // long unseenOffset = offsets[partition]; TODO: Offset merken...? + }); + log.info("Revoked partitions: {}", partitions); + } + + @Override + public void onPartitionsLost(Collection partitions) + { + // TODO: Muss auf den Verlust anders reagiert werden? + onPartitionsRevoked(partitions); + } + + @Override + public Mono getChatRoom(int shard, UUID id) + { + return Mono.justOrEmpty(chatRoomMaps[shard].get(id)); + } + + @Override + public Flux getChatRooms(int shard) + { + return Flux.fromStream(chatRoomMaps[shard].values().stream()); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java new file mode 100644 index 00000000..20d85e80 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomFactory.java @@ -0,0 +1,17 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatRoom; +import de.juplo.kafka.chat.backend.domain.ChatRoomFactory; +import reactor.core.publisher.Mono; + +import java.util.UUID; + + +public class KafkaChatRoomFactory implements ChatRoomFactory +{ + @Override + public Mono createChatRoom(UUID id, String name) + { + return null; + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java new file mode 100644 index 00000000..794c5c54 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/KafkaChatRoomService.java @@ -0,0 +1,101 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.ChatRoomService; +import de.juplo.kafka.chat.backend.domain.Message; +import de.juplo.kafka.chat.backend.domain.MessageMutationException; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.TopicPartition; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.LocalDateTime; +import java.util.LinkedHashMap; + + +@Slf4j +public class KafkaChatRoomService implements ChatRoomService +{ + private final Producer producer; + private final TopicPartition tp; + + private final LinkedHashMap messages = new LinkedHashMap<>(); + + private volatile MessageHandlingStrategy strategy; + + + public KafkaChatRoomService( + Producer producer, + TopicPartition tp) + { + this.producer = producer; + this.tp = tp; + this.strategy = new ChatroomInactiveMessageHandlingStrategy(tp.partition()); + } + + + @Override + synchronized public Mono persistMessage( + Message.MessageKey key, + LocalDateTime timestamp, + String text) + { + return strategy.handleMessage(key, timestamp, text); + } + + /** + * {@code synchronized} ist nicht nötig, da Aufruf immer indirekt über + * {@link #persistMessage(Message.MessageKey, LocalDateTime, String)} + */ + protected void addMessage(Message message) throws MessageMutationException + { + Message existingMessage = messages.get(message.getKey()); + + // TODO: Ist der Test nötig, oder wird das durch den Kontrollierten Wechsel + // der Strategie ggf. schon abgefangen? Weil: Wenn es nur um die Sorge geht, + // das eine Nachricht schon "durch den Nutzer" anders geschrieben wurde, + // fängt dies bereits der ChatRoom ab. + // Die Überprüfung hier war vor dem Hintergrund der Sorge hinzugefügt worden, + // dass die Nachricht wegen Verschluckern in Kafka / beim Strategiewechsel / ?? + // doppelt aufschlägt... + if (existingMessage == null) + { + messages.put(message.getKey(), message); + } + else + { + if (!existingMessage.getMessageText().equals(message.getMessageText())) + { + throw new MessageMutationException(existingMessage, message.getMessageText()); + } + + // Warn and emit existing message + log.warn( + "Keeping existing message with {}@{} for {}", + existingMessage.getSerialNumber(), + existingMessage.getTimestamp(), + existingMessage.getKey()); + } + } + + @Override + synchronized public Mono getMessage(Message.MessageKey key) + { + // TODO: Aufrufe, auf eine Nachricht (einge gewisse Zeit) warten lassen + // und dann bedienen, wenn der der Callback vom Producer aufgerufen wird? + return Mono.fromSupplier(() -> messages.get(key)); + } + + @Override + synchronized public Flux getMessages(long first, long last) + { + return Flux.fromStream(messages + .values() + .stream() + .filter(message -> + { + long serial = message.getSerialNumber(); + return serial >= first && serial <= last; + })); + } +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandlingStrategy.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandlingStrategy.java new file mode 100644 index 00000000..1fb4c47d --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageHandlingStrategy.java @@ -0,0 +1,15 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.Message; +import reactor.core.publisher.Mono; + +import java.time.LocalDateTime; + + +interface MessageHandlingStrategy +{ + Mono handleMessage( + Message.MessageKey key, + LocalDateTime timestamp, + String text); +} diff --git a/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageTo.java b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageTo.java new file mode 100644 index 00000000..0a867f16 --- /dev/null +++ b/src/main/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageTo.java @@ -0,0 +1,33 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import de.juplo.kafka.chat.backend.domain.Message; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.time.LocalDateTime; + + +@Data +@NoArgsConstructor +@AllArgsConstructor(staticName = "of") +public class MessageTo +{ + private String user; + private Long id; + private String text; + + public Message toMessage(long offset, LocalDateTime timestamp) + { + return new Message(Message.MessageKey.of(user, id), offset, timestamp, text); + } + + public static MessageTo from(Message message) + { + return + new MessageTo( + message.getUsername(), + message.getId(), + message.getMessageText()); + } +} diff --git a/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageToTest.java b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageToTest.java new file mode 100644 index 00000000..0c4884bf --- /dev/null +++ b/src/test/java/de/juplo/kafka/chat/backend/persistence/kafka/MessageToTest.java @@ -0,0 +1,39 @@ +package de.juplo.kafka.chat.backend.persistence.kafka; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; + + +public class MessageToTest +{ + final String json = """ + { + "id": 1, + "text": "Hallo, ich heiße Peter!", + "user": "Peter" + }"""; + + ObjectMapper mapper; + + @BeforeEach + public void setUp() + { + mapper = new ObjectMapper(); + mapper.registerModule(new JavaTimeModule()); + mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + } + + @Test + public void testDeserialization() throws Exception + { + MessageTo message = mapper.readValue(json, MessageTo.class); + assertThat(message.getId()).isEqualTo(1l); + assertThat(message.getText()).isEqualTo("Hallo, ich heiße Peter!"); + assertThat(message.getUser()).isEqualTo("Peter"); + } +}